You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:29 UTC

[12/12] incubator-ranger git commit: RANGER-397 - Implement reliable streaming audits to configurable destinations - Incorporate Review Feedback

RANGER-397 - Implement reliable streaming audits to configurable
destinations - Incorporate Review Feedback

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4f3cea22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4f3cea22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4f3cea22

Branch: refs/heads/master
Commit: 4f3cea223b9bb717577732bf050bc78f16e94a69
Parents: 42a0e25
Author: Don Bosco Durai <bo...@apache.org>
Authored: Wed Apr 22 09:49:01 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Wed Apr 22 09:49:01 2015 -0700

----------------------------------------------------------------------
 .../audit/destination/AuditDestination.java     |  32 +-
 .../audit/destination/FileAuditDestination.java |  33 +-
 .../audit/destination/HDFSAuditDestination.java |  50 ++-
 .../ranger/audit/model/AuditEventBase.java      |   4 +-
 .../audit/provider/AsyncAuditProvider.java      |  60 +--
 .../ranger/audit/provider/AuditHandler.java     |  46 ++
 .../ranger/audit/provider/AuditProvider.java    |  56 ---
 .../audit/provider/AuditProviderFactory.java    | 142 +++---
 .../ranger/audit/provider/BaseAuditHandler.java | 271 ++++++++++++
 .../audit/provider/BaseAuditProvider.java       | 432 -------------------
 .../audit/provider/BufferedAuditProvider.java   |  12 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  10 -
 .../audit/provider/DummyAuditProvider.java      |  35 +-
 .../audit/provider/Log4jAuditProvider.java      |   2 -
 .../audit/provider/MultiDestAuditProvider.java  |  59 +--
 .../provider/kafka/KafkaAuditProvider.java      |  22 +-
 .../audit/provider/solr/SolrAuditProvider.java  |  33 +-
 .../ranger/audit/queue/AuditAsyncQueue.java     |  34 +-
 .../ranger/audit/queue/AuditBatchQueue.java     |  26 +-
 .../ranger/audit/queue/AuditFileSpool.java      |  57 ++-
 .../apache/ranger/audit/queue/AuditQueue.java   | 174 ++++++++
 .../ranger/audit/queue/AuditSummaryQueue.java   |  49 +--
 .../apache/ranger/audit/test/TestEvents.java    |   4 +-
 .../org/apache/ranger/audit/TestAuditQueue.java |  98 +++--
 .../org/apache/ranger/audit/TestConsumer.java   |  46 +-
 25 files changed, 815 insertions(+), 972 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
index 25c0220..9db8937 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
@@ -23,13 +23,13 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
 
 /**
  * This class needs to be extended by anyone who wants to build custom
  * destination
  */
-public abstract class AuditDestination extends BaseAuditProvider {
+public abstract class AuditDestination extends BaseAuditHandler {
 	private static final Log logger = LogFactory.getLog(AuditDestination.class);
 
 	public AuditDestination() {
@@ -51,21 +51,31 @@ public abstract class AuditDestination extends BaseAuditProvider {
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 * @see org.apache.ranger.audit.provider.AuditHandler#flush()
 	 */
 	@Override
-	public boolean isFlushPending() {
-		return false;
+	public void flush() {
+
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-	 */
 	@Override
-	public void flush() {
+	public void start() {
+		
+	}
+
+	@Override
+	public void stop() {
+		
+	}
 
+	@Override
+	public void waitToComplete() {
+		
 	}
 
+	@Override
+	public void waitToComplete(long timeout) {
+		
+	}
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
index 1ccfd5f..a132cdf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
@@ -21,9 +21,7 @@ package org.apache.ranger.audit.destination;
 
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -107,7 +105,12 @@ public class FileAuditDestination extends AuditDestination {
 	}
 
 	@Override
-	public boolean logJSON(Collection<String> events) {
+	synchronized public boolean logJSON(Collection<String> events) {
+		if (isStopped) {
+			logError("log() called after stop was requested. name=" + getName());
+			return false;
+		}
+
 		try {
 			PrintWriter out = getLogFileStream();
 			for (String event : events) {
@@ -128,7 +131,7 @@ public class FileAuditDestination extends AuditDestination {
 	 * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
 	 */
 	@Override
-	synchronized public boolean log(Collection<AuditEventBase> events) {
+	public boolean log(Collection<AuditEventBase> events) {
 		if (isStopped) {
 			logError("log() called after stop was requested. name=" + getName());
 			return false;
@@ -158,11 +161,16 @@ public class FileAuditDestination extends AuditDestination {
 
 	@Override
 	synchronized public void stop() {
+		isStopped = true;
 		if (logWriter != null) {
-			logWriter.flush();
-			logWriter.close();
+			try {
+				logWriter.flush();
+				logWriter.close();
+			} catch (Throwable t) {
+				logger.error("Error on closing log writter. Exception will be ignored. name="
+						+ getName() + ", fileName=" + currentFileName);
+			}
 			logWriter = null;
-			isStopped = true;
 		}
 	}
 
@@ -214,15 +222,20 @@ public class FileAuditDestination extends AuditDestination {
 		return logWriter;
 	}
 
-	private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+	private void closeFileIfNeeded() {
 		if (logWriter == null) {
 			return;
 		}
 		if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
 			logger.info("Closing file. Rolling over. name=" + getName()
 					+ ", fileName=" + currentFileName);
-			logWriter.flush();
-			logWriter.close();
+			try {
+				logWriter.flush();
+				logWriter.close();
+			} catch (Throwable t) {
+				logger.error("Error on closing log writter. Exception will be ignored. name="
+						+ getName() + ", fileName=" + currentFileName);
+			}
 			logWriter = null;
 			currentFileName = null;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 706eb8e..6ca4fce 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -74,6 +74,12 @@ public class HDFSAuditDestination extends AuditDestination {
 		// Initial folder and file properties
 		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
 				+ "." + PROP_HDFS_DIR);
+		if (logFolderProp == null || logFolderProp.isEmpty()) {
+			logger.fatal("File destination folder is not configured. Please set "
+					+ propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
+			return;
+		}
+
 		String logSubFolder = MiscUtil.getStringProperty(props, propPrefix
 				+ "." + PROP_HDFS_SUBDIR);
 		if (logSubFolder == null || logSubFolder.isEmpty()) {
@@ -89,12 +95,6 @@ public class HDFSAuditDestination extends AuditDestination {
 			logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log";
 		}
 
-		if (logFolderProp == null || logFolderProp.isEmpty()) {
-			logger.fatal("File destination folder is not configured. Please set "
-					+ propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
-			return;
-		}
-
 		logFolder = logFolderProp + "/" + logSubFolder;
 		logger.info("logFolder=" + logFolder + ", destName=" + getName());
 		logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
@@ -104,7 +104,12 @@ public class HDFSAuditDestination extends AuditDestination {
 	}
 
 	@Override
-	public boolean logJSON(Collection<String> events) {
+	synchronized public boolean logJSON(Collection<String> events) {
+		if (isStopped) {
+			logError("log() called after stop was requested. name=" + getName());
+			return false;
+		}
+
 		try {
 			PrintWriter out = getLogFileStream();
 			for (String event : events) {
@@ -125,7 +130,7 @@ public class HDFSAuditDestination extends AuditDestination {
 	 * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
 	 */
 	@Override
-	synchronized public boolean log(Collection<AuditEventBase> events) {
+	public boolean log(Collection<AuditEventBase> events) {
 		if (isStopped) {
 			logError("log() called after stop was requested. name=" + getName());
 			return false;
@@ -155,15 +160,16 @@ public class HDFSAuditDestination extends AuditDestination {
 
 	@Override
 	synchronized public void stop() {
-		try {
-			if (logWriter != null) {
+		isStopped = true;
+		if (logWriter != null) {
+			try {
 				logWriter.flush();
 				logWriter.close();
-				logWriter = null;
-				isStopped = true;
+			} catch (Throwable t) {
+				logger.error("Error on closing log writter. Exception will be ignored. name="
+						+ getName() + ", fileName=" + currentFileName);
 			}
-		} catch (Throwable t) {
-			logger.error("Error closing HDFS file.", t);
+			logWriter = null;
 		}
 	}
 
@@ -198,9 +204,11 @@ public class HDFSAuditDestination extends AuditDestination {
 				String extension = defaultPath.substring(lastDot);
 				fullPath = baseName + "." + i + extension;
 				hdfPath = new Path(fullPath);
-				logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+				logger.info("Checking whether log file exists. hdfPath="
+						+ fullPath);
 			}
-			logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
+			logger.info("Log file doesn't exists. Will create and use it. hdfPath="
+					+ fullPath);
 			// Create parent folders
 			createParents(hdfPath, fileSystem);
 
@@ -234,8 +242,14 @@ public class HDFSAuditDestination extends AuditDestination {
 		if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
 			logger.info("Closing file. Rolling over. name=" + getName()
 					+ ", fileName=" + currentFileName);
-			logWriter.flush();
-			logWriter.close();
+			try {
+				logWriter.flush();
+				logWriter.close();
+			} catch (Throwable t) {
+				logger.error("Error on closing log writter. Exception will be ignored. name="
+						+ getName() + ", fileName=" + currentFileName);
+			}
+
 			logWriter = null;
 			currentFileName = null;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 39a2578..2c6a87f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -32,8 +32,8 @@ public abstract class AuditEventBase {
 	
 	public abstract String getEventKey();
 	public abstract Date getEventTime ();
-	public abstract void setEventCount(long frequencyCount);
-	public abstract void setEventDurationMS(long frequencyDurationMS);
+	public abstract void setEventCount(long eventCount);
+	public abstract void setEventDurationMS(long eventDurationMS);
 	
 	protected String trim(String str, int len) {
 		String ret = str;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 53adc86..c3a0c78 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -68,7 +68,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize);
 	}
 
-	public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditProvider provider) {
+	public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider) {
 		this(name, maxQueueSize, maxFlushInterval);
 
 		addAuditProvider(provider);
@@ -174,21 +174,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		while(ret == null) {
 			logSummaryIfRequired();
 
-			if (mMaxFlushInterval > 0 && isFlushPending()) {
-				long timeTillNextFlush = getTimeTillNextFlush();
-
-				if (timeTillNextFlush <= 0) {
-					break; // force flush
-				}
-
-				ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
-			} else {
+//			if (mMaxFlushInterval > 0 && isFlushPending()) {
+//				long timeTillNextFlush = getTimeTillNextFlush();
+//
+//				if (timeTillNextFlush <= 0) {
+//					break; // force flush
+//				}
+//
+//				ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
+//			} else {
 				// Let's wake up for summary logging
 				long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime);
 				waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime;
 
 				ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS);
-			}
+//			}
 		}
 
 		if(ret != null) {
@@ -246,23 +246,23 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		LOG.debug("<== AsyncAuditProvider.waitToComplete()");
 	}
 
-	private long getTimeTillNextFlush() {
-		long timeTillNextFlush = mMaxFlushInterval;
-
-		if (mMaxFlushInterval > 0) {
-			long lastFlushTime = getLastFlushTime();
-
-			if (lastFlushTime != 0) {
-				long timeSinceLastFlush = System.currentTimeMillis()
-						- lastFlushTime;
-
-				if (timeSinceLastFlush >= mMaxFlushInterval)
-					timeTillNextFlush = 0;
-				else
-					timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush;
-			}
-		}
-
-		return timeTillNextFlush;
-	}
+//	private long getTimeTillNextFlush() {
+//		long timeTillNextFlush = mMaxFlushInterval;
+//
+//		if (mMaxFlushInterval > 0) {
+//			long lastFlushTime = getLastFlushTime();
+//
+//			if (lastFlushTime != 0) {
+//				long timeSinceLastFlush = System.currentTimeMillis()
+//						- lastFlushTime;
+//
+//				if (timeSinceLastFlush >= mMaxFlushInterval)
+//					timeTillNextFlush = 0;
+//				else
+//					timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush;
+//			}
+//		}
+//
+//		return timeTillNextFlush;
+//	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
new file mode 100644
index 0000000..7b51f1d
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.ranger.audit.model.AuditEventBase;
+
+public interface AuditHandler {
+	public boolean log(AuditEventBase event);
+	public boolean log(Collection<AuditEventBase> events);	
+
+	public boolean logJSON(String event);
+	public boolean logJSON(Collection<String> events);	
+
+    public void init(Properties prop);
+    public void init(Properties prop, String basePropertyName);
+    public void start();
+    public void stop();
+    public void waitToComplete();
+    public void waitToComplete(long timeout);
+
+    /**
+     * Name for this provider. Used only during logging. Uniqueness is not guaranteed
+     */
+    public String getName();
+
+    public void    flush();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
deleted file mode 100644
index 0e38624..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.Collection;
-import java.util.Properties;
-
-import org.apache.ranger.audit.model.AuditEventBase;
-
-public interface AuditProvider {
-	public boolean log(AuditEventBase event);
-	public boolean log(Collection<AuditEventBase> events);	
-
-	public boolean logJSON(String event);
-	public boolean logJSON(Collection<String> events);	
-
-    public void init(Properties prop);
-    public void init(Properties prop, String basePropertyName);
-    public void start();
-    public void stop();
-    public void waitToComplete();
-    public void waitToComplete(long timeout);
-
-    /**
-     * Name for this provider. Used only during logging. Uniqueness is not guaranteed
-     */
-    public String getName();
-
-    /**
-     * If this AuditProvider in the state of shutdown
-     * @return
-     */
-    public boolean isDrain();
-    
-    public int getMaxBatchSize();
-    public int getMaxBatchInterval();
-	public boolean isFlushPending();
-	public long    getLastFlushTime();
-    public void    flush();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index a67f7e0..7b2b52b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -31,6 +31,7 @@ import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
 import org.apache.ranger.audit.queue.AuditAsyncQueue;
 import org.apache.ranger.audit.queue.AuditBatchQueue;
+import org.apache.ranger.audit.queue.AuditQueue;
 import org.apache.ranger.audit.queue.AuditSummaryQueue;
 
 /*
@@ -58,7 +59,7 @@ public class AuditProviderFactory {
 
 	private static AuditProviderFactory sFactory;
 
-	private AuditProvider mProvider = null;
+	private AuditHandler mProvider = null;
 	private boolean mInitDone = false;
 
 	private AuditProviderFactory() {
@@ -79,11 +80,11 @@ public class AuditProviderFactory {
 		return sFactory;
 	}
 
-	public static AuditProvider getAuditProvider() {
+	public static AuditHandler getAuditProvider() {
 		return AuditProviderFactory.getInstance().getProvider();
 	}
 
-	public AuditProvider getProvider() {
+	public AuditHandler getProvider() {
 		return mProvider;
 	}
 
@@ -118,7 +119,7 @@ public class AuditProviderFactory {
 		boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props,
 				AUDIT_SOLR_IS_ENABLED_PROP, false);
 
-		List<AuditProvider> providers = new ArrayList<AuditProvider>();
+		List<AuditHandler> providers = new ArrayList<AuditHandler>();
 
 		// TODO: Delete me
 		for (Object propNameObj : props.keySet()) {
@@ -150,17 +151,16 @@ public class AuditProviderFactory {
 
 		for (String destName : destNameList) {
 			String destPropPrefix = AUDIT_DEST_BASE + "." + destName;
-			AuditProvider destProvider = getProviderFromConfig(props,
-					destPropPrefix, destName);
+			AuditHandler destProvider = getProviderFromConfig(props,
+					destPropPrefix, destName, null);
 
 			if (destProvider != null) {
 				destProvider.init(props, destPropPrefix);
 
 				String queueName = MiscUtil.getStringProperty(props,
-						destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE);
+						destPropPrefix + "." + AuditQueue.PROP_QUEUE);
 				if (queueName == null || queueName.isEmpty()) {
-					LOG.info(destPropPrefix + "."
-							+ BaseAuditProvider.PROP_QUEUE
+					LOG.info(destPropPrefix + "." + AuditQueue.PROP_QUEUE
 							+ " is not set. Setting queue to batch for "
 							+ destName);
 					queueName = "batch";
@@ -169,16 +169,15 @@ public class AuditProviderFactory {
 				if (queueName != null && !queueName.isEmpty()
 						&& !queueName.equalsIgnoreCase("none")) {
 					String queuePropPrefix = destPropPrefix + "." + queueName;
-					AuditProvider queueProvider = getProviderFromConfig(props,
-							queuePropPrefix, queueName);
+					AuditHandler queueProvider = getProviderFromConfig(props,
+							queuePropPrefix, queueName, destProvider);
 					if (queueProvider != null) {
-						if (queueProvider instanceof BaseAuditProvider) {
-							BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider;
-							qProvider.setConsumer(destProvider);
+						if (queueProvider instanceof AuditQueue) {
+							AuditQueue qProvider = (AuditQueue) queueProvider;
 							qProvider.init(props, queuePropPrefix);
 							providers.add(queueProvider);
 						} else {
-							LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination "
+							LOG.fatal("Provider queue doesn't extend AuditQueue. Destination="
 									+ destName
 									+ " can't be created. queueName="
 									+ queueName);
@@ -196,51 +195,51 @@ public class AuditProviderFactory {
 		}
 		if (providers.size() > 0) {
 			LOG.info("Using v3 audit configuration");
-			AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
-			String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
-					+ "async";
-			asyncQueue.init(props, propPrefix);
+			AuditHandler consumer = providers.get(0);
+
+			// Possible pipeline is:
+			// async_queue -> summary_queue -> multidestination -> batch_queue
+			// -> hdfs_destination
+			// -> batch_queue -> solr_destination
+			// -> batch_queue -> kafka_destination
+			// Above, up to multidestination, the providers are same, then it
+			// branches out in parallel.
+
+			// Set the providers in the reverse order: destination side first, async queue last.
+
+			if (providers.size() > 1) {
+				// If there are more than one destination, then we need multi
+				// destination to process it in parallel
+				LOG.info("MultiDestAuditProvider is used. Destination count="
+						+ providers.size());
+				MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+				multiDestProvider.init(props);
+				multiDestProvider.addAuditProviders(providers);
+				consumer = multiDestProvider;
+			}
 
-			propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX;
+			// Let's see if Summary is enabled, then summarize before sending it
+			// downstream
+			String propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX;
 			boolean summaryEnabled = MiscUtil.getBooleanProperty(props,
 					propPrefix + "." + "summary" + "." + "enabled", false);
 			AuditSummaryQueue summaryQueue = null;
 			if (summaryEnabled) {
 				LOG.info("AuditSummaryQueue is enabled");
-				summaryQueue = new AuditSummaryQueue();
+				summaryQueue = new AuditSummaryQueue(consumer);
 				summaryQueue.init(props, propPrefix);
-				asyncQueue.setConsumer(summaryQueue);
+				consumer = summaryQueue;
 			} else {
 				LOG.info("AuditSummaryQueue is disabled");
 			}
 
-			if (providers.size() == 1) {
-				if (summaryEnabled) {
-					LOG.info("Setting " + providers.get(0).getName()
-							+ " as consumer to AuditSummaryQueue");
-					summaryQueue.setConsumer(providers.get(0));
-				} else {
-					LOG.info("Setting " + providers.get(0).getName()
-							+ " as consumer to " + asyncQueue.getName());
-					asyncQueue.setConsumer(providers.get(0));
-				}
-			} else {
-				MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
-				multiDestProvider.init(props);
-				multiDestProvider.addAuditProviders(providers);
-				if (summaryEnabled) {
-					LOG.info("Setting " + multiDestProvider.getName()
-							+ " as consumer to AuditSummaryQueue");
-					summaryQueue.setConsumer(multiDestProvider);
-				} else {
-					LOG.info("Setting " + multiDestProvider.getName()
-							+ " as consumer to " + asyncQueue.getName());
-					asyncQueue.setConsumer(multiDestProvider);
-				}
-			}
+			// Create the AsyncQueue
+			AuditAsyncQueue asyncQueue = new AuditAsyncQueue(consumer);
+			propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "async";
+			asyncQueue.init(props, propPrefix);
 
 			mProvider = asyncQueue;
-			LOG.info("Starting " + mProvider.getName());
+			LOG.info("Starting audit queue " + mProvider.getName());
 			mProvider.start();
 		} else {
 			LOG.info("No v3 audit configuration found. Trying v2 audit configurations");
@@ -315,9 +314,7 @@ public class AuditProviderFactory {
 
 				if (kafkaProvider.isAsync()) {
 					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
-							"MyKafkaAuditProvider",
-							kafkaProvider.getMaxQueueSize(),
-							kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+							"MyKafkaAuditProvider", 1000, 1000, kafkaProvider);
 					providers.add(asyncProvider);
 				} else {
 					providers.add(kafkaProvider);
@@ -331,9 +328,7 @@ public class AuditProviderFactory {
 
 				if (solrProvider.isAsync()) {
 					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
-							"MySolrAuditProvider",
-							solrProvider.getMaxQueueSize(),
-							solrProvider.getMaxBatchInterval(), solrProvider);
+							"MySolrAuditProvider", 1000, 1000, solrProvider);
 					providers.add(asyncProvider);
 				} else {
 					providers.add(solrProvider);
@@ -387,18 +382,26 @@ public class AuditProviderFactory {
 		Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
 	}
 
-	private AuditProvider getProviderFromConfig(Properties props,
-			String propPrefix, String providerName) {
-		AuditProvider provider = null;
+	private AuditHandler getProviderFromConfig(Properties props,
+			String propPrefix, String providerName, AuditHandler consumer) {
+		AuditHandler provider = null;
 		String className = MiscUtil.getStringProperty(props, propPrefix + "."
-				+ BaseAuditProvider.PROP_CLASS_NAME);
+				+ BaseAuditHandler.PROP_CLASS_NAME);
 		if (className != null && !className.isEmpty()) {
 			try {
-				provider = (AuditProvider) Class.forName(className)
-						.newInstance();
+				Class<?> handlerClass = Class.forName(className);
+				if (handlerClass.isAssignableFrom(AuditQueue.class)) {
+					// Queue class needs consumer
+					handlerClass.getDeclaredConstructor(AuditHandler.class)
+							.newInstance(consumer);
+				} else {
+					provider = (AuditHandler) Class.forName(className)
+							.newInstance();
+				}
 			} catch (Exception e) {
 				LOG.fatal("Can't instantiate audit class for providerName="
-						+ providerName + ", className=" + className, e);
+						+ providerName + ", className=" + className
+						+ ", propertyPrefix=" + propPrefix, e);
 			}
 		} else {
 			if (providerName.equals("file")) {
@@ -414,25 +417,32 @@ public class AuditProviderFactory {
 			} else if (providerName.equals("log4j")) {
 				provider = new Log4jAuditProvider();
 			} else if (providerName.equals("batch")) {
-				provider = new AuditBatchQueue();
+				provider = new AuditBatchQueue(consumer);
 			} else if (providerName.equals("async")) {
-				provider = new AuditAsyncQueue();
+				provider = new AuditAsyncQueue(consumer);
 			} else {
 				LOG.error("Provider name doesn't have any class associated with it. providerName="
-						+ providerName);
+						+ providerName + ", propertyPrefix=" + propPrefix);
+			}
+		}
+		if (provider != null && provider instanceof AuditQueue) {
+			if (consumer == null) {
+				LOG.fatal("consumer can't be null for AuditQueue. queue="
+						+ provider.getName() + ", propertyPrefix=" + propPrefix);
+				provider = null;
 			}
 		}
 		return provider;
 	}
 
-	private AuditProvider getDefaultProvider() {
+	private AuditHandler getDefaultProvider() {
 		return new DummyAuditProvider();
 	}
 
 	private static class JVMShutdownHook extends Thread {
-		AuditProvider mProvider;
+		AuditHandler mProvider;
 
-		public JVMShutdownHook(AuditProvider provider) {
+		public JVMShutdownHook(AuditHandler provider) {
 			mProvider = provider;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
new file mode 100644
index 0000000..601650e
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import com.google.gson.GsonBuilder;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public abstract class BaseAuditHandler implements AuditHandler {
+	private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class);
+
+	private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+
+	private int mLogFailureReportMinIntervalInMs = 60 * 1000;
+
+	private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
+	private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
+	private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
+
+	public static final String PROP_NAME = "name";
+	public static final String PROP_CLASS_NAME = "classname";
+
+	public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
+
+	protected String propPrefix = PROP_DEFAULT_PREFIX;
+
+	protected String providerName = null;
+
+	protected int failedRetryTimes = 3;
+	protected int failedRetrySleep = 3 * 1000;
+
+	int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+	long lastErrorLogMS = 0;
+
+	protected Properties props = null;
+
+	@Override
+	public void init(Properties props) {
+		init(props, null);
+	}
+
+	/**
+	 * Initializes this handler from props. basePropertyName, when non-null,
+	 * replaces the default property prefix. The handler name is read from
+	 * "prefix.name" or, if no name is set, is the last token of the prefix.
+	 */
+	@Override
+	public void init(Properties props, String basePropertyName) {
+		LOG.info("BaseAuditHandler.init()");
+		this.props = props;
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+		LOG.info("propPrefix=" + propPrefix);
+		// Get final token
+		List<String> tokens = MiscUtil.toArray(propPrefix, ".");
+		String finalToken = tokens.get(tokens.size() - 1);
+		// Use propPrefix: basePropertyName may be null and would yield "null.name"
+		String name = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_NAME);
+		if (name != null && !name.isEmpty()) {
+			providerName = name;
+		}
+		if (providerName == null) {
+			providerName = finalToken;
+			LOG.info("Using providerName from property prefix. providerName=" + providerName);
+		}
+		LOG.info("providerName=" + providerName);
+		try {
+			new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+		} catch (Throwable excp) {
+			LOG.warn(
+					"BaseAuditHandler.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
+					excp);
+		}
+		mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
+				AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+	}
+
+	/*
+	 * Wraps the single event in a one-element list and delegates to the
+	 * collection overload of log(), returning that call's result.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditHandler#log(org.apache.
+	 * ranger.audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+		eventList.add(event);
+		return log(eventList);
+	}
+
+	/*
+	 * Converts the string with MiscUtil.fromJson(event, AuthzAuditEvent.class)
+	 * and passes the result to log(AuditEventBase), returning its result.
+	 * NOTE(review): the fromJson result is not null-checked here - confirm.
+	 * @see org.apache.ranger.audit.provider.AuditHandler#logJSON(java.lang.String)
+	 */
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	/*
+	 * Logs each JSON event in iteration order via logJSON(String) and stops
+	 * at the first one that returns false. Returns the last result, or true
+	 * when the collection is empty.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditHandler#logJSON(java.util.Collection)
+	 */
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		boolean ret = true;
+		for (String event : events) {
+			ret = logJSON(event);
+			if (!ret) {
+				break;
+			}
+		}
+		return ret;
+	}
+
+	public void setName(String name) {
+		providerName = name;
+	}
+
+	@Override
+	public String getName() {
+		return providerName;
+	}
+
+	public void logFailedEvent(AuditEventBase event) {
+		logFailedEvent(event, null);
+	}
+
+	public void logError(String msg) {
+		long currTimeMS = System.currentTimeMillis();
+		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+			LOG.error(msg);
+			lastErrorLogMS = currTimeMS;
+		}
+	}
+
+	public void logError(String msg, Throwable ex) {
+		long currTimeMS = System.currentTimeMillis();
+		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+			LOG.error(msg, ex);
+			lastErrorLogMS = currTimeMS;
+		}
+	}
+
+	public String getTimeDiffStr(long time1, long time2) {
+		long timeInMs = Math.abs(time1 - time2);
+		return formatIntervalForLog(timeInMs);
+	}
+
+	public String formatIntervalForLog(long timeInMs) {
+		long hours = timeInMs / (60 * 60 * 1000);
+		long minutes = (timeInMs / (60 * 1000)) % 60;
+		long seconds = (timeInMs % (60 * 1000)) / 1000;
+		long mSeconds = (timeInMs % (1000));
+
+		if (hours > 0)
+			return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
+					seconds, mSeconds);
+		else if (minutes > 0)
+			return String.format("%02d:%02d.%03d minutes", minutes, seconds,
+					mSeconds);
+		else if (seconds > 0)
+			return String.format("%02d.%03d seconds", seconds, mSeconds);
+		else
+			return String.format("%03d milli-seconds", mSeconds);
+	}
+
+	/**
+	 * Counts a failed audit event and, at most once per configured minimum
+	 * interval, warns with the event, optional cause and failure counts.
+	 */
+	public void logFailedEvent(AuditEventBase event, Throwable excp) {
+		long now = System.currentTimeMillis();
+		long lastReportTime = mFailedLogLastReportTime.get();
+		long timeSinceLastReport = now - lastReportTime;
+		mFailedLogCountSinceLastReport.incrementAndGet();
+		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+		// compareAndSet lets only one of several racing threads emit the report
+		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs
+				&& mFailedLogLastReportTime.compareAndSet(lastReportTime, now)) {
+			// getAndSet(0) reads and resets atomically so no failure is dropped
+			long countSinceLastReport = mFailedLogCountSinceLastReport.getAndSet(0);
+			String msg = "failed to log audit event: " + MiscUtil.stringify(event);
+			if (excp != null) {
+				LOG.warn(msg, excp);
+			} else {
+				LOG.warn(msg);
+			}
+			if (countLifeTime > 1) { // no stats to print for the 1st failure
+				LOG.warn("Log failure count: " + countSinceLastReport
+						+ " in past "
+						+ formatIntervalForLog(timeSinceLastReport) + "; "
+						+ countLifeTime + " during process lifetime");
+			}
+		}
+	}
+
+	public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
+		for (AuditEventBase event : events) {
+			logFailedEvent(event, excp);
+		}
+	}
+
+	/**
+	 * Counts a failed JSON audit event and, at most once per configured minimum
+	 * interval, warns with the event, optional cause and failure counts.
+	 */
+	public void logFailedEventJSON(String event, Throwable excp) {
+		long now = System.currentTimeMillis();
+		long lastReportTime = mFailedLogLastReportTime.get();
+		long timeSinceLastReport = now - lastReportTime;
+		mFailedLogCountSinceLastReport.incrementAndGet();
+		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs
+				&& mFailedLogLastReportTime.compareAndSet(lastReportTime, now)) {
+			// CAS admits one reporter; getAndSet(0) resets without losing counts
+			long countSinceLastReport = mFailedLogCountSinceLastReport.getAndSet(0);
+			if (excp != null) {
+				LOG.warn("failed to log audit event: " + event, excp);
+			} else {
+				LOG.warn("failed to log audit event: " + event);
+			}
+			if (countLifeTime > 1) { // no stats to print for the 1st failure
+				LOG.warn("Log failure count: " + countSinceLastReport + " in past "
+						+ formatIntervalForLog(timeSinceLastReport) + "; "
+						+ countLifeTime + " during process lifetime");
+			}
+		}
+	}
+
+	public void logFailedEventJSON(Collection<String> events, Throwable excp) {
+		for (String event : events) {
+			logFailedEventJSON(event, excp);
+		}
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
deleted file mode 100644
index 85c207b..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ranger.audit.provider;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.queue.AuditFileSpool;
-
-import com.google.gson.GsonBuilder;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-public abstract class BaseAuditProvider implements AuditProvider {
-	private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class);
-
-	private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
-	public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
-	public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
-	public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
-
-	private AtomicLong lifeTimeInLogCount = new AtomicLong(0);
-
-	private int mLogFailureReportMinIntervalInMs = 60 * 1000;
-
-	private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
-	private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
-	private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
-
-	public static final String PROP_NAME = "name";
-	public static final String PROP_CLASS_NAME = "classname";
-	public static final String PROP_QUEUE = "queue";
-
-	public static final String PROP_BATCH_SIZE = "batch.size";
-	public static final String PROP_QUEUE_SIZE = "queue.size";
-	public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
-
-	public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
-	public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
-	public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
-
-	public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
-
-	private boolean isDrain = false;
-	private String providerName = null;
-
-	private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
-	private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
-	private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
-
-	protected int failedRetryTimes = 3;
-	protected int failedRetrySleep = 3 * 1000;
-
-	protected AuditProvider consumer = null;
-	protected AuditFileSpool fileSpooler = null;
-
-	protected boolean fileSpoolerEnabled = false;
-	protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
-	protected int fileSpoolDrainThresholdPercent = 80;
-
-	int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
-	long lastErrorLogMS = 0;
-
-	protected Properties props = null;
-
-	public BaseAuditProvider() {
-	}
-
-	public BaseAuditProvider(AuditProvider consumer) {
-		this.consumer = consumer;
-	}
-
-	@Override
-	public void init(Properties props) {
-		init(props, null);
-	}
-
-	@Override
-	public void init(Properties props, String basePropertyName) {
-		LOG.info("BaseAuditProvider.init()");
-		this.props = props;
-		String propPrefix = PROP_DEFAULT_PREFIX;
-		if (basePropertyName != null) {
-			propPrefix = basePropertyName;
-		}
-		LOG.info("propPrefix=" + propPrefix);
-		// Get final token
-		List<String> tokens = MiscUtil.toArray(propPrefix, ".");
-		String finalToken = tokens.get(tokens.size() - 1);
-
-		String name = MiscUtil.getStringProperty(props, basePropertyName + "."
-				+ PROP_NAME);
-		if (name != null && !name.isEmpty()) {
-			providerName = name;
-		}
-		if (providerName == null) {
-			providerName = finalToken;
-			LOG.info("Using providerName from property prefix. providerName="
-					+ providerName);
-		}
-		LOG.info("providerName=" + providerName);
-
-		setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_BATCH_SIZE, getMaxBatchSize()));
-		setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_QUEUE_SIZE, getMaxQueueSize()));
-		setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_BATCH_INTERVAL, getMaxBatchInterval()));
-
-		fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
-				+ "." + PROP_FILE_SPOOL_ENABLE, false);
-		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
-				+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
-		if (fileSpoolerEnabled || logFolderProp != null) {
-			LOG.info("File spool is enabled for " + getName()
-					+ ", logFolderProp=" + logFolderProp + ", " + propPrefix
-					+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "="
-					+ fileSpoolerEnabled);
-			fileSpoolerEnabled = true;
-			fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
-					+ "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
-					fileSpoolMaxWaitTime);
-			fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
-					propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
-					fileSpoolDrainThresholdPercent);
-			fileSpooler = new AuditFileSpool(this, consumer);
-			fileSpooler.init(props, basePropertyName);
-		} else {
-			LOG.info("File spool is disabled for " + getName());
-		}
-
-		try {
-			new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
-		} catch (Throwable excp) {
-			LOG.warn(
-					"Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
-					excp);
-		}
-
-		mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
-				AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
-
-	}
-
-	public AuditProvider getConsumer() {
-		return consumer;
-	}
-
-	public void setConsumer(AuditProvider consumer) {
-		this.consumer = consumer;
-	}
-
-	public void logFailedEvent(AuditEventBase event) {
-		logFailedEvent(event, null);
-	}
-
-	public void logFailedEvent(AuditEventBase event, Throwable excp) {
-		long now = System.currentTimeMillis();
-
-		long timeSinceLastReport = now - mFailedLogLastReportTime.get();
-		long countSinceLastReport = mFailedLogCountSinceLastReport
-				.incrementAndGet();
-		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
-		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
-			mFailedLogLastReportTime.set(now);
-			mFailedLogCountSinceLastReport.set(0);
-
-			if (excp != null) {
-				LOG.warn(
-						"failed to log audit event: "
-								+ MiscUtil.stringify(event), excp);
-			} else {
-				LOG.warn("failed to log audit event: "
-						+ MiscUtil.stringify(event));
-			}
-
-			if (countLifeTime > 1) { // no stats to print for the 1st failure
-				LOG.warn("Log failure count: " + countSinceLastReport
-						+ " in past "
-						+ formatIntervalForLog(timeSinceLastReport) + "; "
-						+ countLifeTime + " during process lifetime");
-			}
-		}
-	}
-
-	public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
-		for (AuditEventBase event : events) {
-			logFailedEvent(event, excp);
-		}
-	}
-
-	public void logFailedEventJSON(String event, Throwable excp) {
-		long now = System.currentTimeMillis();
-
-		long timeSinceLastReport = now - mFailedLogLastReportTime.get();
-		long countSinceLastReport = mFailedLogCountSinceLastReport
-				.incrementAndGet();
-		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
-		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
-			mFailedLogLastReportTime.set(now);
-			mFailedLogCountSinceLastReport.set(0);
-
-			if (excp != null) {
-				LOG.warn("failed to log audit event: " + event, excp);
-			} else {
-				LOG.warn("failed to log audit event: " + event);
-			}
-
-			if (countLifeTime > 1) { // no stats to print for the 1st failure
-				LOG.warn("Log failure count: " + countSinceLastReport
-						+ " in past "
-						+ formatIntervalForLog(timeSinceLastReport) + "; "
-						+ countLifeTime + " during process lifetime");
-			}
-		}
-	}
-
-	public void logFailedEventJSON(Collection<String> events, Throwable excp) {
-		for (String event : events) {
-			logFailedEventJSON(event, excp);
-		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
-	 * audit.model.AuditEventBase)
-	 */
-	@Override
-	public boolean log(AuditEventBase event) {
-		List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
-		eventList.add(event);
-		return log(eventList);
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String)
-	 */
-	@Override
-	public boolean logJSON(String event) {
-		AuditEventBase eventObj = MiscUtil.fromJson(event,
-				AuthzAuditEvent.class);
-		return log(eventObj);
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection
-	 * )
-	 */
-	@Override
-	public boolean logJSON(Collection<String> events) {
-		boolean ret = true;
-		for (String event : events) {
-			ret = logJSON(event);
-			if (!ret) {
-				break;
-			}
-		}
-		return ret;
-	}
-
-	public void setName(String name) {
-		providerName = name;
-	}
-
-	@Override
-	public String getName() {
-		return providerName;
-	}
-
-	@Override
-	public boolean isDrain() {
-		return isDrain;
-	}
-
-	public void setDrain(boolean isDrain) {
-		this.isDrain = isDrain;
-	}
-
-	public int getMaxQueueSize() {
-		return maxQueueSize;
-	}
-
-	public void setMaxQueueSize(int maxQueueSize) {
-		this.maxQueueSize = maxQueueSize;
-	}
-
-	@Override
-	public int getMaxBatchInterval() {
-		return maxBatchInterval;
-	}
-
-	public void setMaxBatchInterval(int maxBatchInterval) {
-		this.maxBatchInterval = maxBatchInterval;
-	}
-
-	@Override
-	public int getMaxBatchSize() {
-		return maxBatchSize;
-	}
-
-	public void setMaxBatchSize(int maxBatchSize) {
-		this.maxBatchSize = maxBatchSize;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
-	 */
-	@Override
-	public void waitToComplete() {
-		if (consumer != null) {
-			consumer.waitToComplete(-1);
-		}
-	}
-
-	@Override
-	public void waitToComplete(long timeout) {
-		if (consumer != null) {
-			consumer.waitToComplete(timeout);
-		}
-	}
-
-	@Override
-	public boolean isFlushPending() {
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-	 */
-	@Override
-	public long getLastFlushTime() {
-		if (consumer != null) {
-			return consumer.getLastFlushTime();
-		}
-		return 0;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-	 */
-	@Override
-	public void flush() {
-		if (consumer != null) {
-			consumer.flush();
-		}
-	}
-
-	public AtomicLong getLifeTimeInLogCount() {
-		return lifeTimeInLogCount;
-	}
-
-	public long addLifeTimeInLogCount(long count) {
-		return lifeTimeInLogCount.addAndGet(count);
-	}
-
-	public void logError(String msg) {
-		long currTimeMS = System.currentTimeMillis();
-		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
-			LOG.error(msg);
-			lastErrorLogMS = currTimeMS;
-		}
-	}
-
-	public void logError(String msg, Throwable ex) {
-		long currTimeMS = System.currentTimeMillis();
-		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
-			LOG.error(msg, ex);
-			lastErrorLogMS = currTimeMS;
-		}
-	}
-
-	public String getTimeDiffStr(long time1, long time2) {
-		long timeInMs = Math.abs(time1 - time2);
-		return formatIntervalForLog(timeInMs);
-	}
-
-	public String formatIntervalForLog(long timeInMs) {
-		long hours = timeInMs / (60 * 60 * 1000);
-		long minutes = (timeInMs / (60 * 1000)) % 60;
-		long seconds = (timeInMs % (60 * 1000)) / 1000;
-		long mSeconds = (timeInMs % (1000));
-
-		if (hours > 0)
-			return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
-					seconds, mSeconds);
-		else if (minutes > 0)
-			return String.format("%02d:%02d.%03d minutes", minutes, seconds,
-					mSeconds);
-		else if (seconds > 0)
-			return String.format("%02d.%03d seconds", seconds, mSeconds);
-		else
-			return String.format("%03d milli-seconds", mSeconds);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index ab6a74a..ca842f3 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -23,7 +23,7 @@ import java.util.Properties;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
-public abstract class BufferedAuditProvider extends BaseAuditProvider {
+public abstract class BufferedAuditProvider extends BaseAuditHandler {
 	private LogBuffer<AuditEventBase> mBuffer = null;
 	private LogDestination<AuditEventBase> mDestination = null;
 
@@ -107,16 +107,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public boolean isFlushPending() {
-		return false;
-	}
-
-	@Override
-	public long getLastFlushTime() {
-		return 0;
-	}
-
-	@Override
 	public void flush() {
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f4bd90c..d475f89 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -177,16 +177,6 @@ public class DbAuditProvider extends AuditDestination {
 	}
 
 	@Override
-	public boolean isFlushPending() {
-		return mUncommitted.size() > 0;
-	}
-	
-	@Override
-	public long getLastFlushTime() {
-		return mLastCommitTime;
-	}
-
-	@Override
 	public void flush() {
 		if(mUncommitted.size() > 0) {
 			boolean isSuccess = commitTransaction();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
index 619a99d..05f882f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
 
-public class DummyAuditProvider implements AuditProvider {
+public class DummyAuditProvider implements AuditHandler {
 	@Override
 	public void init(Properties prop) {
 		// intentionally left empty
@@ -74,23 +74,6 @@ public class DummyAuditProvider implements AuditProvider {
 		// intentionally left empty
 	}
 
-	
-	@Override
-	public int getMaxBatchSize() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public boolean isFlushPending() {
-		return false;
-	}
-	
-	@Override
-	public long getLastFlushTime() {
-		return 0;
-	}
-
 	@Override
 	public void flush() {
 		// intentionally left empty
@@ -120,20 +103,4 @@ public class DummyAuditProvider implements AuditProvider {
 		return this.getClass().getName();
 	}
 
-	/* (non-Javadoc)
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
-	 */
-	@Override
-	public boolean isDrain() {
-		return false;
-	}
-
-	/* (non-Javadoc)
-	 * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval()
-	 */
-	@Override
-	public int getMaxBatchInterval() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index 040a045..0402de2 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -27,8 +27,6 @@ import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
-import com.sun.tools.hat.internal.util.Misc;
-
 
 public class Log4jAuditProvider extends AuditDestination {
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 876fa5b..4c1593a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -26,18 +26,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 
-public class MultiDestAuditProvider extends BaseAuditProvider {
+public class MultiDestAuditProvider extends BaseAuditHandler {
 
 	private static final Log LOG = LogFactory
 			.getLog(MultiDestAuditProvider.class);
 
-	protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
+	protected List<AuditHandler> mProviders = new ArrayList<AuditHandler>();
 
 	public MultiDestAuditProvider() {
 		LOG.info("MultiDestAuditProvider: creating..");
 	}
 
-	public MultiDestAuditProvider(AuditProvider provider) {
+	public MultiDestAuditProvider(AuditHandler provider) {
 		addAuditProvider(provider);
 	}
 
@@ -47,7 +47,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 		super.init(props);
 
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.init(props);
 			} catch (Throwable excp) {
@@ -57,7 +57,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 		}
 	}
 
-	public void addAuditProvider(AuditProvider provider) {
+	public void addAuditProvider(AuditHandler provider) {
 		if (provider != null) {
 			LOG.info("MultiDestAuditProvider.addAuditProvider(providerType="
 					+ provider.getClass().getCanonicalName() + ")");
@@ -66,9 +66,9 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 		}
 	}
 
-	public void addAuditProviders(List<AuditProvider> providers) {
+	public void addAuditProviders(List<AuditHandler> providers) {
 		if (providers != null) {
-			for (AuditProvider provider : providers) {
+			for (AuditHandler provider : providers) {
 				LOG.info("Adding " + provider.getName()
 						+ " as consumer to MultiDestination " + getName());
 				addAuditProvider(provider);
@@ -78,7 +78,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean log(AuditEventBase event) {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.log(event);
 			} catch (Throwable excp) {
@@ -90,7 +90,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean log(Collection<AuditEventBase> events) {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.log(events);
 			} catch (Throwable excp) {
@@ -102,7 +102,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean logJSON(String event) {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.logJSON(event);
 			} catch (Throwable excp) {
@@ -114,7 +114,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean logJSON(Collection<String> events) {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.logJSON(events);
 			} catch (Throwable excp) {
@@ -126,7 +126,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public void start() {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.start();
 			} catch (Throwable excp) {
@@ -138,7 +138,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public void stop() {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.stop();
 			} catch (Throwable excp) {
@@ -150,7 +150,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public void waitToComplete() {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.waitToComplete();
 			} catch (Throwable excp) {
@@ -163,7 +163,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public void waitToComplete(long timeout) {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.waitToComplete(timeout);
 			} catch (Throwable excp) {
@@ -175,35 +175,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public boolean isFlushPending() {
-		for (AuditProvider provider : mProviders) {
-			if (provider.isFlushPending()) {
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-	@Override
-	public long getLastFlushTime() {
-		long lastFlushTime = 0;
-		for (AuditProvider provider : mProviders) {
-			long flushTime = provider.getLastFlushTime();
-
-			if (flushTime != 0) {
-				if (lastFlushTime == 0 || lastFlushTime > flushTime) {
-					lastFlushTime = flushTime;
-				}
-			}
-		}
-
-		return lastFlushTime;
-	}
-
-	@Override
 	public void flush() {
-		for (AuditProvider provider : mProviders) {
+		for (AuditHandler provider : mProviders) {
 			try {
 				provider.flush();
 			} catch (Throwable excp) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 5f39e69..2c77b40 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -25,12 +25,12 @@ import kafka.producer.ProducerConfig;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
 import org.apache.ranger.audit.provider.MiscUtil;
 
-public class KafkaAuditProvider extends BaseAuditProvider {
+public class KafkaAuditProvider extends AuditDestination {
 	private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);
 
 	public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size";
@@ -47,11 +47,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 		LOG.info("init() called");
 		super.init(props);
 
-		setMaxQueueSize(MiscUtil.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
-		setMaxBatchInterval(MiscUtil.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP,
-				AUDIT_BATCH_INTERVAL_DEFAULT_MS));
 		topic = MiscUtil.getStringProperty(props,
 				AUDIT_KAFKA_TOPIC_NAME);
 		if (topic == null || topic.isEmpty()) {
@@ -176,19 +171,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public boolean isFlushPending() {
-		LOG.info("isFlushPending() called");
-		return false;
-	}
-
-	@Override
-	public long getLastFlushTime() {
-		LOG.info("getLastFlushTime() called");
-
-		return 0;
-	}
-
-	@Override
 	public void flush() {
 		LOG.info("flush() called");
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 9ee4ec0..53e4348 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -25,16 +25,16 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 
-public class SolrAuditProvider extends BaseAuditProvider {
+public class SolrAuditProvider extends AuditDestination {
 	private static final Log LOG = LogFactory.getLog(SolrAuditProvider.class);
 
 	public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.solr.async.max.queue.size";
@@ -56,11 +56,6 @@ public class SolrAuditProvider extends BaseAuditProvider {
 		LOG.info("init() called");
 		super.init(props);
 
-		setMaxQueueSize(MiscUtil.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
-		setMaxBatchInterval(MiscUtil.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP,
-				AUDIT_BATCH_INTERVAL_DEFAULT_MS));
 		retryWaitTime = MiscUtil.getIntProperty(props,
 				AUDIT_RETRY_WAIT_PROP, retryWaitTime);
 	}
@@ -241,29 +236,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
 	public void waitToComplete(long timeout) {
 		
 	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-	 */
-	@Override
-	public boolean isFlushPending() {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-	 */
-	@Override
-	public long getLastFlushTime() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
+	
 	/*
 	 * (non-Javadoc)
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index a6f291d..d16fff9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -21,32 +21,27 @@ package org.apache.ranger.audit.queue;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 
 /**
  * This is a non-blocking queue with no limit on capacity.
  */
-public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+public class AuditAsyncQueue extends AuditQueue implements Runnable {
 	private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
 
-	LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+	LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>();
 	Thread consumerThread = null;
 
 	static final int MAX_DRAIN = 1000;
 	static int threadCount = 0;
 	static final String DEFAULT_NAME = "async";
 
-	public AuditAsyncQueue() {
-		setName(DEFAULT_NAME);
-	}
-
-	public AuditAsyncQueue(AuditProvider consumer) {
+	public AuditAsyncQueue(AuditHandler consumer) {
 		super(consumer);
 		setName(DEFAULT_NAME);
 	}
@@ -65,7 +60,6 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
 			return false;
 		}
 		queue.add(event);
-		addLifeTimeInLogCount(1);
 		return true;
 	}
 
@@ -90,6 +84,9 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
 	public void start() {
 		if (consumer != null) {
 			consumer.start();
+		} else {
+			logger.error("consumer is not set. Nothing will be sent to any consumer. name="
+					+ getName());
 		}
 
 		consumerThread = new Thread(this, this.getClass().getName()
@@ -110,23 +107,10 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
 			if (consumerThread != null) {
 				consumerThread.interrupt();
 			}
-			consumerThread = null;
 		} catch (Throwable t) {
 			// ignore any exception
 		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-	 */
-	@Override
-	public boolean isFlushPending() {
-		if (queue.isEmpty()) {
-			return consumer.isFlushPending();
-		}
-		return true;
+		consumerThread = null;
 	}
 
 	/*

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 5e21efc..8ed07bd 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -29,10 +29,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 
-public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
+public class AuditBatchQueue extends AuditQueue implements Runnable {
 	private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
 
 	private BlockingQueue<AuditEventBase> queue = null;
@@ -41,10 +40,7 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
 	Thread consumerThread = null;
 	static int threadCount = 0;
 
-	public AuditBatchQueue() {
-	}
-
-	public AuditBatchQueue(AuditProvider consumer) {
+	public AuditBatchQueue(AuditHandler consumer) {
 		super(consumer);
 	}
 
@@ -59,7 +55,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
 	public boolean log(AuditEventBase event) {
 		// Add to batchQueue. Block if full
 		queue.add(event);
-		addLifeTimeInLogCount(1);
 		return true;
 	}
 
@@ -130,10 +125,10 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
 			if (consumerThread != null) {
 				consumerThread.interrupt();
 			}
-			consumerThread = null;
 		} catch (Throwable t) {
 			// ignore any exception
 		}
+		consumerThread = null;
 	}
 
 	/*
@@ -187,19 +182,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-	 */
-	@Override
-	public boolean isFlushPending() {
-		if (queue.isEmpty()) {
-			return consumer.isFlushPending();
-		}
-		return true;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
 	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
 	 */
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
index 66d1573..a1c32b9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -35,13 +35,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.MiscUtil;
 
 import com.google.gson.Gson;
@@ -69,10 +69,10 @@ public class AuditFileSpool implements Runnable {
 	// "filespool.index.done_filename";
 	public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
 
-	AuditProvider queueProvider = null;
-	AuditProvider consumerProvider = null;
+	AuditQueue queueProvider = null;
+	AuditHandler consumerProvider = null;
 
-	BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+	BlockingQueue<AuditIndexRecord> indexQueue = new LinkedBlockingQueue<AuditIndexRecord>();
 
 	// Folder and File attributes
 	File logFolder = null;
@@ -108,10 +108,10 @@ public class AuditFileSpool implements Runnable {
 	boolean isDrain = false;
 	boolean isDestDown = true;
 
-	private static Gson gson = null;
+	private Gson gson = null;
 
-	public AuditFileSpool(AuditProvider queueProvider,
-			AuditProvider consumerProvider) {
+	public AuditFileSpool(AuditQueue queueProvider,
+			AuditHandler consumerProvider) {
 		this.queueProvider = queueProvider;
 		this.consumerProvider = consumerProvider;
 	}
@@ -120,12 +120,12 @@ public class AuditFileSpool implements Runnable {
 		init(prop, null);
 	}
 
-	public void init(Properties props, String basePropertyName) {
+	public boolean init(Properties props, String basePropertyName) {
 		if (initDone) {
 			logger.error("init() called more than once. queueProvider="
 					+ queueProvider.getName() + ", consumerProvider="
 					+ consumerProvider.getName());
-			return;
+			return true;
 		}
 		String propPrefix = "xasecure.audit.filespool";
 		if (basePropertyName != null) {
@@ -162,22 +162,22 @@ public class AuditFileSpool implements Runnable {
 					+ queueProvider.getName());
 
 			if (logFolderProp == null || logFolderProp.isEmpty()) {
-				logger.error("Audit spool folder is not configured. Please set "
+				logger.fatal("Audit spool folder is not configured. Please set "
 						+ propPrefix
 						+ "."
 						+ PROP_FILE_SPOOL_LOCAL_DIR
 						+ ". queueName=" + queueProvider.getName());
-				return;
+				return false;
 			}
 			logFolder = new File(logFolderProp);
 			if (!logFolder.isDirectory()) {
 				logFolder.mkdirs();
 				if (!logFolder.isDirectory()) {
-					logger.error("File Spool folder not found and can't be created. folder="
+					logger.fatal("File Spool folder not found and can't be created. folder="
 							+ logFolder.getAbsolutePath()
 							+ ", queueName="
 							+ queueProvider.getName());
-					return;
+					return false;
 				}
 			}
 			logger.info("logFolder=" + logFolder + ", queueName="
@@ -202,7 +202,7 @@ public class AuditFileSpool implements Runnable {
 							+ archiveFolder.getAbsolutePath()
 							+ ", queueName="
 							+ queueProvider.getName());
-					return;
+					return false;
 				}
 			}
 			logger.info("archiveFolder=" + archiveFolder + ", queueName="
@@ -218,17 +218,30 @@ public class AuditFileSpool implements Runnable {
 
 			indexFile = new File(logFolder, indexFileName);
 			if (!indexFile.exists()) {
-				indexFile.createNewFile();
+				boolean ret = indexFile.createNewFile();
+				if (!ret) {
+					logger.fatal("Error creating index file. fileName="
+							+ indexDoneFile.getPath());
+					return false;
+				}
 			}
 			logger.info("indexFile=" + indexFile + ", queueName="
 					+ queueProvider.getName());
 
 			int lastDot = indexFileName.lastIndexOf('.');
+			if (lastDot < 0) {
+				lastDot = indexFileName.length() - 1;
+			}
 			indexDoneFileName = indexFileName.substring(0, lastDot)
 					+ "_closed.json";
 			indexDoneFile = new File(logFolder, indexDoneFileName);
 			if (!indexDoneFile.exists()) {
-				indexDoneFile.createNewFile();
+				boolean ret = indexDoneFile.createNewFile();
+				if (!ret) {
+					logger.fatal("Error creating index done file. fileName="
+							+ indexDoneFile.getPath());
+					return false;
+				}
 			}
 			logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
 					+ queueProvider.getName());
@@ -252,8 +265,6 @@ public class AuditFileSpool implements Runnable {
 				}
 			}
 			printIndex();
-			// One more loop to add the rest of the pending records in reverse
-			// order
 			for (int i = 0; i < indexRecords.size(); i++) {
 				AuditIndexRecord auditIndexRecord = indexRecords.get(i);
 				if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
@@ -261,18 +272,19 @@ public class AuditFileSpool implements Runnable {
 					if (!consumerFile.exists()) {
 						logger.error("INIT: Consumer file="
 								+ consumerFile.getPath() + " not found.");
-						System.exit(1);
+					} else {
+						indexQueue.add(auditIndexRecord);
 					}
-					indexQueue.add(auditIndexRecord);
 				}
 			}
 
 		} catch (Throwable t) {
 			logger.fatal("Error initializing File Spooler. queue="
 					+ queueProvider.getName(), t);
-			return;
+			return false;
 		}
 		initDone = true;
+		return true;
 	}
 
 	/**
@@ -328,6 +340,7 @@ public class AuditFileSpool implements Runnable {
 
 					out.flush();
 					out.close();
+					break;
 				} catch (Throwable t) {
 					logger.debug("Error closing spool out file.", t);
 				}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
new file mode 100644
index 0000000..4c3ac5f
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.provider.AuditHandler;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * Base class for audit queues that hand events to a downstream
+ * {@link AuditHandler} consumer. It holds the queue/batch sizing settings
+ * and the optional file spooler configuration loaded by init().
+ */
+public abstract class AuditQueue extends BaseAuditHandler {
+	private static final Log LOG = LogFactory.getLog(AuditQueue.class);
+
+	public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
+	public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
+	public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+	private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
+	private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
+	private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
+
+	public static final String PROP_QUEUE = "queue";
+	// Suffixes that init() appends to propPrefix + "." to read each setting.
+	public static final String PROP_BATCH_SIZE = "batch.size";
+	public static final String PROP_QUEUE_SIZE = "queue.size";
+	public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
+	public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
+	public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
+	public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
+
+	protected final AuditHandler consumer;
+	protected AuditFileSpool fileSpooler = null;
+	private boolean isDrain = false;
+	protected boolean fileSpoolerEnabled = false;
+	protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
+	protected int fileSpoolDrainThresholdPercent = 80;
+
+	/**
+	 * @param consumer downstream handler this queue delegates to
+	 */
+	public AuditQueue(AuditHandler consumer) {
+		this.consumer = consumer;
+	}
+
+	/**
+	 * Loads batch size, queue size and batch interval (current values are the
+	 * defaults) and creates the file spooler when it is enabled or a spool
+	 * folder is configured; a spooler that fails to init is disabled.
+	 */
+	@Override
+	public void init(Properties props, String basePropertyName) {
+		LOG.info("AuditQueue.init()");
+		super.init(props, basePropertyName);
+
+		setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_BATCH_SIZE, getMaxBatchSize()));
+		setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_QUEUE_SIZE, getMaxQueueSize()));
+		setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_BATCH_INTERVAL, getMaxBatchInterval()));
+
+		fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+				+ "." + PROP_FILE_SPOOL_ENABLE, false);
+		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+				+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
+		if (fileSpoolerEnabled || logFolderProp != null) {
+			LOG.info("File spool is enabled for " + getName()
+					+ ", logFolderProp=" + logFolderProp + ", " + propPrefix
+					+ "." + PROP_FILE_SPOOL_ENABLE + "="
+					+ fileSpoolerEnabled);
+			fileSpoolerEnabled = true;
+			fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
+					fileSpoolMaxWaitTime);
+			fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
+					propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
+					fileSpoolDrainThresholdPercent);
+			fileSpooler = new AuditFileSpool(this, consumer);
+			if (!fileSpooler.init(props, basePropertyName)) {
+				fileSpoolerEnabled = false;
+				// consumer may be null here; the other methods null-check it too.
+				LOG.fatal("Couldn't initialize file spooler. Disabling it. queue="
+						+ getName() + ", consumer="
+						+ (consumer != null ? consumer.getName() : null));
+			}
+		} else {
+			LOG.info("File spool is disabled for " + getName());
+		}
+	}
+
+	public AuditHandler getConsumer() {
+		return consumer;
+	}
+
+	public boolean isDrain() {
+		return isDrain;
+	}
+
+	public void setDrain(boolean isDrain) {
+		this.isDrain = isDrain;
+	}
+
+	public int getMaxQueueSize() {
+		return maxQueueSize;
+	}
+
+	public void setMaxQueueSize(int maxQueueSize) {
+		this.maxQueueSize = maxQueueSize;
+	}
+
+	public int getMaxBatchInterval() {
+		return maxBatchInterval;
+	}
+
+	public void setMaxBatchInterval(int maxBatchInterval) {
+		this.maxBatchInterval = maxBatchInterval;
+	}
+
+	public int getMaxBatchSize() {
+		return maxBatchSize;
+	}
+
+	public void setMaxBatchSize(int maxBatchSize) {
+		this.maxBatchSize = maxBatchSize;
+	}
+
+	/** Waits on the consumer, if one is set, passing a timeout of -1. */
+	@Override
+	public void waitToComplete() {
+		if (consumer != null) {
+			consumer.waitToComplete(-1);
+		}
+	}
+
+	/** Waits on the consumer, if one is set, passing the timeout through. */
+	@Override
+	public void waitToComplete(long timeout) {
+		if (consumer != null) {
+			consumer.waitToComplete(timeout);
+		}
+	}
+
+	/** Flushes the consumer, if one is set. */
+	@Override
+	public void flush() {
+		if (consumer != null) {
+			consumer.flush();
+		}
+	}
+}