Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:19 UTC

[02/12] incubator-ranger git commit: RANGER-276 Add support for aggregating audit logs at source

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index ec5e9a8..ab6a74a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 
 public abstract class BufferedAuditProvider extends BaseAuditProvider {
-	private LogBuffer<AuditEventBase>      mBuffer      = null;
+	private LogBuffer<AuditEventBase> mBuffer = null;
 	private LogDestination<AuditEventBase> mDestination = null;
 
 	@Override
@@ -34,34 +34,39 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean log(AuditEventBase event) {
-		if(event instanceof AuthzAuditEvent) {
-			AuthzAuditEvent authzEvent = (AuthzAuditEvent)event;
+		if (event instanceof AuthzAuditEvent) {
+			AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
 
-			if(authzEvent.getAgentHostname() == null) {
+			if (authzEvent.getAgentHostname() == null) {
 				authzEvent.setAgentHostname(MiscUtil.getHostname());
 			}
 
-			if(authzEvent.getLogType() == null) {
+			if (authzEvent.getLogType() == null) {
 				authzEvent.setLogType("RangerAudit");
 			}
 
-			if(authzEvent.getEventId() == null) {
+			if (authzEvent.getEventId() == null) {
 				authzEvent.setEventId(MiscUtil.generateUniqueId());
 			}
 		}
 
-		if(! mBuffer.add(event)) {
+		if (!mBuffer.add(event)) {
 			logFailedEvent(event);
+			return false;
 		}
 		return true;
 	}
 
 	@Override
 	public boolean log(Collection<AuditEventBase> events) {
+		boolean ret = true;
 		for (AuditEventBase event : events) {
-			log(event);
+			ret = log(event);
+			if (!ret) {
+				break;
+			}
 		}
-		return true;
+		return ret;
 	}
 
 	@Override
@@ -73,8 +78,12 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 
 	@Override
 	public boolean logJSON(Collection<String> events) {
+		boolean ret = true;
 		for (String event : events) {
-			logJSON(event);
+			ret = logJSON(event);
+			if (!ret) {
+				break;
+			}
 		}
 		return false;
 	}
@@ -93,7 +102,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 	public void waitToComplete() {
 	}
 
-	
 	@Override
 	public void waitToComplete(long timeout) {
 	}
@@ -120,9 +128,9 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 		return mDestination;
 	}
 
-	protected void setBufferAndDestination(LogBuffer<AuditEventBase>      buffer,
-										   LogDestination<AuditEventBase> destination) {
-		mBuffer      = buffer;
+	protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer,
+			LogDestination<AuditEventBase> destination) {
+		mBuffer = buffer;
 		mDestination = destination;
 	}
 }
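
The net effect of this hunk is that BufferedAuditProvider.log(AuditEventBase) now reports a failure when the buffer rejects the event, and the collection overload stops at the first failed event instead of unconditionally returning true. A minimal, self-contained sketch of that fail-fast pattern (illustration only, not the committed class; the bounded buffer and the names below are hypothetical):

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;

class FailFastAuditLogger {
    private final Queue<String> buffer = new ArrayDeque<String>();
    private final int maxSize;

    FailFastAuditLogger(int maxSize) {
        this.maxSize = maxSize;
    }

    // Mirrors log(AuditEventBase): return false when the bounded buffer rejects the event.
    boolean log(String event) {
        if (buffer.size() >= maxSize) {
            return false; // caller can then spool, count, or drop the failed event
        }
        return buffer.add(event);
    }

    // Mirrors log(Collection): stop at the first failure and surface it to the caller.
    boolean log(Collection<String> events) {
        for (String event : events) {
            if (!log(event)) {
                return false;
            }
        }
        return true;
    }
}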

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f4976fb..f4bd90c 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -31,6 +31,7 @@ import javax.persistence.Persistence;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
@@ -120,10 +121,14 @@ public class DbAuditProvider extends AuditDestination {
 
 	@Override
 	public boolean log(Collection<AuditEventBase> events) {
+		boolean ret = true;
 		for (AuditEventBase event : events) {
-			log(event);
+			ret = log(event);
+			if(!ret) {
+				break;
+			}
 		}
-		return true;
+		return ret;
 	}
 
 	@Override
@@ -135,10 +140,14 @@ public class DbAuditProvider extends AuditDestination {
 
 	@Override
 	public boolean logJSON(Collection<String> events) {
+		boolean ret = true;
 		for (String event : events) {
-			logJSON(event);
+			ret = logJSON(event);
+			if( !ret ) {
+				break;
+			}
 		}
-		return false;
+		return ret;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
deleted file mode 100644
index 62ecab1..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-/**
- * This class write the logs to local file
- */
-public class FileAuditDestination extends AuditDestination {
-	private static final Log logger = LogFactory
-			.getLog(FileAuditDestination.class);
-
-	public static final String PROP_FILE_LOCAL_DIR = "dir";
-	public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format";
-	public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec";
-
-	String baseFolder = null;
-	String fileFormat = null;
-	int fileRolloverSec = 24 * 60 * 60; // In seconds
-	private String logFileNameFormat;
-
-	boolean initDone = false;
-
-	private File logFolder;
-	PrintWriter logWriter = null;
-
-	private Date fileCreateTime = null;
-
-	private String currentFileName;
-
-	private boolean isStopped = false;
-
-	@Override
-	public void init(Properties prop, String propPrefix) {
-		super.init(prop, propPrefix);
-
-		// Initialize properties for this class
-		// Initial folder and file properties
-		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
-				+ "." + PROP_FILE_LOCAL_DIR);
-		logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
-				+ PROP_FILE_LOCAL_FILE_NAME_FORMAT);
-		fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_FILE_FILE_ROLLOVER, fileRolloverSec);
-
-		if (logFolderProp == null || logFolderProp.isEmpty()) {
-			logger.error("File destination folder is not configured. Please set "
-					+ propPrefix
-					+ "."
-					+ PROP_FILE_LOCAL_DIR
-					+ ". name="
-					+ getName());
-			return;
-		}
-		logFolder = new File(logFolderProp);
-		if (!logFolder.isDirectory()) {
-			logFolder.mkdirs();
-			if (!logFolder.isDirectory()) {
-				logger.error("FileDestination folder not found and can't be created. folder="
-						+ logFolder.getAbsolutePath() + ", name=" + getName());
-				return;
-			}
-		}
-		logger.info("logFolder=" + logFolder + ", name=" + getName());
-
-		if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
-			logFileNameFormat = "%app-type%_ranger_audit.log";
-		}
-
-		logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
-				+ getName());
-
-		initDone = true;
-	}
-
-	@Override
-	public boolean logJSON(Collection<String> events) {
-		try {
-			PrintWriter out = getLogFileStream();
-			for (String event : events) {
-				out.println(event);
-			}
-			out.flush();
-		} catch (Throwable t) {
-			logError("Error writing to log file.", t);
-			return false;
-		}
-		return true;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
-	 */
-	@Override
-	synchronized public boolean log(Collection<AuditEventBase> events) {
-		if (isStopped) {
-			logError("log() called after stop was requested. name=" + getName());
-			return false;
-		}
-		List<String> jsonList = new ArrayList<String>();
-		for (AuditEventBase event : events) {
-			try {
-				jsonList.add(MiscUtil.stringify(event));
-			} catch (Throwable t) {
-				logger.error("Error converting to JSON. event=" + event);
-			}
-		}
-		return logJSON(jsonList);
-
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
-	 */
-	@Override
-	public void start() {
-		// Nothing to do here. We will open the file when the first log request
-		// comes
-	}
-
-	@Override
-	synchronized public void stop() {
-		if (logWriter != null) {
-			logWriter.flush();
-			logWriter.close();
-			logWriter = null;
-			isStopped = true;
-		}
-	}
-
-	// Helper methods in this class
-	synchronized private PrintWriter getLogFileStream() throws Exception {
-		closeFileIfNeeded();
-
-		// Either there are no open log file or the previous one has been rolled
-		// over
-		if (logWriter == null) {
-			Date currentTime = new Date();
-			// Create a new file
-			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
-					currentTime.getTime());
-			File outLogFile = new File(logFolder, fileName);
-			if (outLogFile.exists()) {
-				// Let's try to get the next available file
-				int i = 0;
-				while (true) {
-					i++;
-					int lastDot = fileName.lastIndexOf('.');
-					String baseName = fileName.substring(0, lastDot);
-					String extension = fileName.substring(lastDot);
-					String newFileName = baseName + "." + i + extension;
-					File newLogFile = new File(logFolder, newFileName);
-					if (!newLogFile.exists()) {
-						// Move the file
-						if (!outLogFile.renameTo(newLogFile)) {
-							logger.error("Error renameing file. " + outLogFile
-									+ " to " + newLogFile);
-						}
-						break;
-					}
-				}
-			}
-			if (!outLogFile.exists()) {
-				logger.info("Creating new file. destName=" + getName()
-						+ ", fileName=" + fileName);
-				// Open the file
-				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
-						outLogFile)));
-			} else {
-				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
-						outLogFile, true)));
-			}
-			fileCreateTime = new Date();
-			currentFileName = outLogFile.getPath();
-		}
-		return logWriter;
-	}
-
-	private void closeFileIfNeeded() throws FileNotFoundException, IOException {
-		if (logWriter == null) {
-			return;
-		}
-		if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
-			logger.info("Closing file. Rolling over. name=" + getName()
-					+ ", fileName=" + currentFileName);
-			logWriter.flush();
-			logWriter.close();
-			logWriter = null;
-			currentFileName = null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
deleted file mode 100644
index a36c40f..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-/**
- * This class write the logs to local file
- */
-public class HDFSAuditDestination extends AuditDestination {
-	private static final Log logger = LogFactory
-			.getLog(HDFSAuditDestination.class);
-
-	public static final String PROP_HDFS_DIR = "dir";
-	public static final String PROP_HDFS_SUBDIR = "subdir";
-	public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format";
-	public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec";
-
-	String baseFolder = null;
-	String fileFormat = null;
-	int fileRolloverSec = 24 * 60 * 60; // In seconds
-	private String logFileNameFormat;
-
-	boolean initDone = false;
-
-	private String logFolder;
-	PrintWriter logWriter = null;
-
-	private Date fileCreateTime = null;
-
-	private String currentFileName;
-
-	private boolean isStopped = false;
-
-	@Override
-	public void init(Properties prop, String propPrefix) {
-		super.init(prop, propPrefix);
-
-		// Initialize properties for this class
-		// Initial folder and file properties
-		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
-				+ "." + PROP_HDFS_DIR);
-		String logSubFolder = MiscUtil.getStringProperty(props, propPrefix
-				+ "." + PROP_HDFS_SUBDIR);
-		if (logSubFolder == null || logSubFolder.isEmpty()) {
-			logSubFolder = "%app-type%/%time:yyyyMMdd%";
-		}
-
-		logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
-				+ PROP_HDFS_FILE_NAME_FORMAT);
-		fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_HDFS_ROLLOVER, fileRolloverSec);
-
-		if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
-			logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log";
-		}
-
-		if (logFolderProp == null || logFolderProp.isEmpty()) {
-			logger.fatal("File destination folder is not configured. Please set "
-					+ propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
-			return;
-		}
-
-		logFolder = logFolderProp + "/" + logSubFolder;
-		logger.info("logFolder=" + logFolder + ", destName=" + getName());
-		logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
-				+ getName());
-
-		initDone = true;
-	}
-
-	@Override
-	public boolean logJSON(Collection<String> events) {
-		try {
-			PrintWriter out = getLogFileStream();
-			for (String event : events) {
-				out.println(event);
-			}
-			out.flush();
-		} catch (Throwable t) {
-			logError("Error writing to log file.", t);
-			return false;
-		}
-		return true;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
-	 */
-	@Override
-	synchronized public boolean log(Collection<AuditEventBase> events) {
-		if (isStopped) {
-			logError("log() called after stop was requested. name=" + getName());
-			return false;
-		}
-		List<String> jsonList = new ArrayList<String>();
-		for (AuditEventBase event : events) {
-			try {
-				jsonList.add(MiscUtil.stringify(event));
-			} catch (Throwable t) {
-				logger.error("Error converting to JSON. event=" + event);
-			}
-		}
-		return logJSON(jsonList);
-
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
-	 */
-	@Override
-	public void start() {
-		// Nothing to do here. We will open the file when the first log request
-		// comes
-	}
-
-	@Override
-	synchronized public void stop() {
-		try {
-			if (logWriter != null) {
-				logWriter.flush();
-				logWriter.close();
-				logWriter = null;
-				isStopped = true;
-			}
-		} catch (Throwable t) {
-			logger.error("Error closing HDFS file.", t);
-		}
-	}
-
-	// Helper methods in this class
-	synchronized private PrintWriter getLogFileStream() throws Throwable {
-		closeFileIfNeeded();
-
-		// Either there are no open log file or the previous one has been rolled
-		// over
-		if (logWriter == null) {
-			Date currentTime = new Date();
-			// Create a new file
-			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
-					currentTime.getTime());
-			String parentFolder = MiscUtil.replaceTokens(logFolder,
-					currentTime.getTime());
-			Configuration conf = new Configuration();
-
-			String fullPath = parentFolder
-					+ org.apache.hadoop.fs.Path.SEPARATOR + fileName;
-			String defaultPath = fullPath;
-			URI uri = URI.create(fullPath);
-			FileSystem fileSystem = FileSystem.get(uri, conf);
-
-			Path hdfPath = new Path(fullPath);
-			logger.info("Checking whether log file exists. hdfPath=" + fullPath);
-			int i = 0;
-			while (fileSystem.exists(hdfPath)) {
-				i++;
-				int lastDot = defaultPath.lastIndexOf('.');
-				String baseName = defaultPath.substring(0, lastDot);
-				String extension = defaultPath.substring(lastDot);
-				fullPath = baseName + "." + i + extension;
-				hdfPath = new Path(fullPath);
-				logger.info("Checking whether log file exists. hdfPath=" + fullPath);
-			}
-			logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
-			// Create parent folders
-			createParents(hdfPath, fileSystem);
-
-			// Create the file to write
-			logger.info("Creating new log file. hdfPath=" + fullPath);
-			FSDataOutputStream ostream = fileSystem.create(hdfPath);
-			logWriter = new PrintWriter(ostream);
-			fileCreateTime = new Date();
-			currentFileName = fullPath;
-		}
-		return logWriter;
-	}
-
-	private void createParents(Path pathLogfile, FileSystem fileSystem)
-			throws Throwable {
-		logger.info("Creating parent folder for " + pathLogfile);
-		Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;
-
-		if (parentPath != null && fileSystem != null
-				&& !fileSystem.exists(parentPath)) {
-			fileSystem.mkdirs(parentPath);
-		}
-	}
-
-	private void closeFileIfNeeded() throws FileNotFoundException, IOException {
-		if (logWriter == null) {
-			return;
-		}
-		// TODO: Close the file on absolute time. Currently it is implemented as
-		// relative time
-		if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
-			logger.info("Closing file. Rolling over. name=" + getName()
-					+ ", fileName=" + currentFileName);
-			logWriter.flush();
-			logWriter.close();
-			logWriter = null;
-			currentFileName = null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index a5a52a0..040a045 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 57ac0a0..876fa5b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -69,6 +69,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 	public void addAuditProviders(List<AuditProvider> providers) {
 		if (providers != null) {
 			for (AuditProvider provider : providers) {
+				LOG.info("Adding " + provider.getName()
+						+ " as consumer to MultiDestination " + getName());
 				addAuditProvider(provider);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
new file mode 100644
index 0000000..a6f291d
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+
+/**
+ * This is a non-blocking queue with no limit on capacity.
+ */
+public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+	private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
+
+	LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+	Thread consumerThread = null;
+
+	static final int MAX_DRAIN = 1000;
+	static int threadCount = 0;
+	static final String DEFAULT_NAME = "async";
+
+	public AuditAsyncQueue() {
+		setName(DEFAULT_NAME);
+	}
+
+	public AuditAsyncQueue(AuditProvider consumer) {
+		super(consumer);
+		setName(DEFAULT_NAME);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+	 * audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		// Add to the queue and return ASAP
+		if (queue.size() >= getMaxQueueSize()) {
+			return false;
+		}
+		queue.add(event);
+		addLifeTimeInLogCount(1);
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		boolean ret = true;
+		for (AuditEventBase event : events) {
+			ret = log(event);
+			if (!ret) {
+				break;
+			}
+		}
+		return ret;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	public void start() {
+		if (consumer != null) {
+			consumer.start();
+		}
+
+		consumerThread = new Thread(this, this.getClass().getName()
+				+ (threadCount++));
+		consumerThread.setDaemon(true);
+		consumerThread.start();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+	 */
+	@Override
+	public void stop() {
+		setDrain(true);
+		try {
+			if (consumerThread != null) {
+				consumerThread.interrupt();
+			}
+			consumerThread = null;
+		} catch (Throwable t) {
+			// ignore any exception
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		if (queue.isEmpty()) {
+			return consumer.isFlushPending();
+		}
+		return true;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				AuditEventBase event = null;
+				if (!isDrain()) {
+					// For Transfer queue take() is blocking
+					event = queue.take();
+				} else {
+					// For Transfer queue poll() is non blocking
+					event = queue.poll();
+				}
+				if (event != null) {
+					Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+					eventList.add(event);
+					queue.drainTo(eventList, MAX_DRAIN - 1);
+					consumer.log(eventList);
+				}
+			} catch (InterruptedException e) {
+				logger.info(
+						"Caught exception in consumer thread. Mostly to abort loop",
+						e);
+			} catch (Throwable t) {
+				logger.error("Caught error during processing request.", t);
+			}
+			if (isDrain() && queue.isEmpty()) {
+				break;
+			}
+		}
+		try {
+			// Call stop on the consumer
+			consumer.stop();
+		} catch (Throwable t) {
+			logger.error("Error while calling stop on consumer.", t);
+		}
+	}
+
+}
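
AuditAsyncQueue above hands events to a daemon consumer thread through a LinkedTransferQueue: the thread blocks on take() while running, switches to non-blocking poll() once drain is requested, and uses drainTo() to batch up to MAX_DRAIN events per pass. A stripped-down sketch of that consume loop (assumptions: the class name and the handleBatch callback are hypothetical, and the interrupt is only used to wake the thread, as in stop() above):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Consumer;

class AsyncDrainLoop {
    private final LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
    private volatile boolean drain = false;
    private static final int MAX_DRAIN = 1000;

    void submit(String event) { queue.add(event); }
    void requestDrain()       { drain = true; }

    // take() while running, poll() while draining, drainTo() for batching.
    void runLoop(Consumer<List<String>> handleBatch) {
        while (true) {
            try {
                String first = drain ? queue.poll() : queue.take();
                if (first != null) {
                    List<String> batch = new ArrayList<String>();
                    batch.add(first);
                    queue.drainTo(batch, MAX_DRAIN - 1); // grab whatever else is already queued
                    handleBatch.accept(batch);
                }
            } catch (InterruptedException e) {
                // the interrupt only wakes the thread, typically right after requestDrain()
            }
            if (drain && queue.isEmpty()) {
                break; // queue fully flushed; exit like run() above
            }
        }
    }
}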

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
new file mode 100644
index 0000000..5e21efc
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+
+public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
+	private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
+
+	private BlockingQueue<AuditEventBase> queue = null;
+	private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>();
+
+	Thread consumerThread = null;
+	static int threadCount = 0;
+
+	public AuditBatchQueue() {
+	}
+
+	public AuditBatchQueue(AuditProvider consumer) {
+		super(consumer);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+	 * audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		// Add to batchQueue. Block if full
+		queue.add(event);
+		addLifeTimeInLogCount(1);
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		boolean ret = true;
+		for (AuditEventBase event : events) {
+			ret = log(event);
+			if (!ret) {
+				break;
+			}
+		}
+		return ret;
+	}
+
+	@Override
+	public void init(Properties prop, String basePropertyName) {
+		String propPrefix = "xasecure.audit.batch";
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+
+		super.init(prop, propPrefix);
+
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	synchronized public void start() {
+		if (consumerThread != null) {
+			logger.error("Provider is already started. name=" + getName());
+			return;
+		}
+		logger.info("Creating ArrayBlockingQueue with maxSize="
+				+ getMaxQueueSize());
+		queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
+
+		// Start the consumer first
+		consumer.start();
+
+		// Then the FileSpooler
+		if (fileSpoolerEnabled) {
+			fileSpooler.start();
+		}
+
+		// Finally the queue listener
+		consumerThread = new Thread(this, this.getClass().getName()
+				+ (threadCount++));
+		consumerThread.setDaemon(true);
+		consumerThread.start();
+
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+	 */
+	@Override
+	public void stop() {
+		setDrain(true);
+		flush();
+		try {
+			if (consumerThread != null) {
+				consumerThread.interrupt();
+			}
+			consumerThread = null;
+		} catch (Throwable t) {
+			// ignore any exception
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+	 */
+	@Override
+	public void waitToComplete() {
+		int defaultTimeOut = -1;
+		waitToComplete(defaultTimeOut);
+		consumer.waitToComplete(defaultTimeOut);
+	}
+
+	@Override
+	public void waitToComplete(long timeout) {
+		setDrain(true);
+		flush();
+		long sleepTime = 1000;
+		long startTime = System.currentTimeMillis();
+		int prevQueueSize = -1;
+		int staticLoopCount = 0;
+		while ((queue.size() > 0 || localBatchBuffer.size() > 0)) {
+			if (prevQueueSize == queue.size()) {
+				logger.error("Queue size is not changing. " + getName()
+						+ ".size=" + queue.size());
+				staticLoopCount++;
+				if (staticLoopCount > 5) {
+					logger.error("Aborting writing to consumer. Some logs will be discarded."
+							+ getName() + ".size=" + queue.size());
+				}
+			} else {
+				staticLoopCount = 0;
+			}
+			if (consumerThread != null) {
+				consumerThread.interrupt();
+			}
+			try {
+				Thread.sleep(sleepTime);
+				if (timeout > 0
+						&& (System.currentTimeMillis() - startTime > timeout)) {
+					break;
+				}
+			} catch (InterruptedException e) {
+				break;
+			}
+		}
+		consumer.waitToComplete(timeout);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		if (queue.isEmpty()) {
+			return consumer.isFlushPending();
+		}
+		return true;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+	 */
+	@Override
+	public void flush() {
+		if (fileSpoolerEnabled) {
+			fileSpooler.flush();
+		}
+		consumer.flush();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		long lastDispatchTime = System.currentTimeMillis();
+		boolean isDestActive = true;
+		while (true) {
+			// Time to next dispatch
+			long nextDispatchDuration = lastDispatchTime
+					- System.currentTimeMillis() + getMaxBatchInterval();
+
+			boolean isToSpool = false;
+			boolean fileSpoolDrain = false;
+			try {
+				if (fileSpoolerEnabled && fileSpooler.isPending()) {
+					int percentUsed = (getMaxQueueSize() - queue.size()) * 100
+							/ getMaxQueueSize();
+					long lastAttemptDelta = fileSpooler
+							.getLastAttemptTimeDelta();
+
+					fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
+					// If we should even read from queue?
+					if (!isDrain() && !fileSpoolDrain
+							&& percentUsed < fileSpoolDrainThresholdPercent) {
+						// Since some files are still under progress and it is
+						// not in drain mode, lets wait and retry
+						if (nextDispatchDuration > 0) {
+							Thread.sleep(nextDispatchDuration);
+							lastDispatchTime = System.currentTimeMillis();
+						}
+						continue;
+					}
+					isToSpool = true;
+				}
+
+				AuditEventBase event = null;
+
+				if (!isToSpool && !isDrain() && !fileSpoolDrain
+						&& nextDispatchDuration > 0) {
+					event = queue.poll(nextDispatchDuration,
+							TimeUnit.MILLISECONDS);
+				} else {
+					// For poll() is non blocking
+					event = queue.poll();
+				}
+
+				if (event != null) {
+					localBatchBuffer.add(event);
+					if (getMaxBatchSize() >= localBatchBuffer.size()) {
+						queue.drainTo(localBatchBuffer, getMaxBatchSize()
+								- localBatchBuffer.size());
+					}
+				} else {
+					// poll returned due to timeout, so resetting the clock
+					nextDispatchDuration = lastDispatchTime
+							- System.currentTimeMillis()
+							+ getMaxBatchInterval();
+
+					lastDispatchTime = System.currentTimeMillis();
+				}
+			} catch (InterruptedException e) {
+				logger.info(
+						"Caught exception in consumer thread. Mostly to abort loop",
+						e);
+				setDrain(true);
+			} catch (Throwable t) {
+				logger.error("Caught error during processing request.", t);
+			}
+
+			if (localBatchBuffer.size() > 0 && isToSpool) {
+				// Let spool to the file directly
+				if (isDestActive) {
+					logger.info("Switching to file spool. Queue=" + getName()
+							+ ", dest=" + consumer.getName());
+				}
+				isDestActive = false;
+				// Just before stashing
+				lastDispatchTime = System.currentTimeMillis();
+				fileSpooler.stashLogs(localBatchBuffer);
+				localBatchBuffer.clear();
+			} else if (localBatchBuffer.size() > 0
+					&& (isDrain()
+							|| localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+				if (fileSpoolerEnabled && !isDestActive) {
+					logger.info("Switching to writing to destination. Queue="
+							+ getName() + ", dest=" + consumer.getName());
+				}
+				// Reset time just before sending the logs
+				lastDispatchTime = System.currentTimeMillis();
+				boolean ret = consumer.log(localBatchBuffer);
+				if (!ret) {
+					if (fileSpoolerEnabled) {
+						logger.info("Switching to file spool. Queue="
+								+ getName() + ", dest=" + consumer.getName());
+						// Transient error. Stash and move on
+						fileSpooler.stashLogs(localBatchBuffer);
+						isDestActive = false;
+					} else {
+						// We need to drop this event
+						logFailedEvent(localBatchBuffer, null);
+					}
+				} else {
+					isDestActive = true;
+				}
+				localBatchBuffer.clear();
+			}
+
+			if (isDrain()) {
+				if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
+					logger.info("Queue is not empty. Will retry. queue.size()="
+							+ queue.size() + ", localBatchBuffer.size()="
+							+ localBatchBuffer.size());
+				} else {
+					break;
+				}
+			}
+		}
+
+		logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+				+ consumer.getName());
+		try {
+			// Call stop on the consumer
+			consumer.stop();
+			if (fileSpoolerEnabled) {
+				fileSpooler.stop();
+			}
+		} catch (Throwable t) {
+			logger.error("Error while calling stop on consumer.", t);
+		}
+	}
+}
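
The run() loop above flushes localBatchBuffer to the destination when the queue is draining, when the buffer reaches the configured batch size, or when the batch interval since the last dispatch has elapsed; if consumer.log() fails and file spooling is enabled, the batch is stashed to disk instead of being dropped. The timing rule reduces to a small predicate (a sketch only; maxBatchSize and maxBatchIntervalMs stand in for the values returned by getMaxBatchSize() and getMaxBatchInterval()):

final class BatchDispatchRule {
    private BatchDispatchRule() {
    }

    // True when buffered events should be pushed to the destination now.
    static boolean shouldDispatch(int bufferedCount, long lastDispatchTimeMs,
            boolean draining, int maxBatchSize, long maxBatchIntervalMs) {
        if (bufferedCount == 0) {
            return false; // nothing to send yet
        }
        // Time remaining until the batch interval expires, measured from the last dispatch.
        long dueInMs = lastDispatchTimeMs - System.currentTimeMillis() + maxBatchIntervalMs;
        return draining || bufferedCount >= maxBatchSize || dueInMs <= 0;
    }
}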

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
new file mode 100644
index 0000000..66d1573
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -0,0 +1,884 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * This class temporarily stores logs in file system if the destination is
+ * overloaded or down
+ */
+public class AuditFileSpool implements Runnable {
+	private static final Log logger = LogFactory.getLog(AuditFileSpool.class);
+
+	public enum SPOOL_FILE_STATUS {
+		pending, write_inprogress, read_inprogress, done
+	}
+
+	public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
+	public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format";
+	public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir";
+	public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files";
+	public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix";
+	public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec";
+	public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename";
+	// public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
+	// "filespool.index.done_filename";
+	public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
+
+	AuditProvider queueProvider = null;
+	AuditProvider consumerProvider = null;
+
+	BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+
+	// Folder and File attributes
+	File logFolder = null;
+	String logFileNameFormat = null;
+	File archiveFolder = null;
+	String fileNamePrefix = null;
+	String indexFileName = null;
+	File indexFile = null;
+	String indexDoneFileName = null;
+	File indexDoneFile = null;
+	int retryDestinationMS = 30 * 1000; // Default 30 seconds
+	int fileRolloverSec = 24 * 60 * 60; // In seconds
+	int maxArchiveFiles = 100;
+
+	int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+	long lastErrorLogMS = 0;
+
+	List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
+
+	boolean isPending = false;
+	long lastAttemptTime = 0;
+	boolean initDone = false;
+
+	PrintWriter logWriter = null;
+	AuditIndexRecord currentWriterIndexRecord = null;
+	AuditIndexRecord currentConsumerIndexRecord = null;
+
+	BufferedReader logReader = null;
+
+	Thread destinationThread = null;
+
+	boolean isWriting = true;
+	boolean isDrain = false;
+	boolean isDestDown = true;
+
+	private static Gson gson = null;
+
+	public AuditFileSpool(AuditProvider queueProvider,
+			AuditProvider consumerProvider) {
+		this.queueProvider = queueProvider;
+		this.consumerProvider = consumerProvider;
+	}
+
+	public void init(Properties prop) {
+		init(prop, null);
+	}
+
+	public void init(Properties props, String basePropertyName) {
+		if (initDone) {
+			logger.error("init() called more than once. queueProvider="
+					+ queueProvider.getName() + ", consumerProvider="
+					+ consumerProvider.getName());
+			return;
+		}
+		String propPrefix = "xasecure.audit.filespool";
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+
+		try {
+			gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+					.create();
+
+			// Initial folder and file properties
+			String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_LOCAL_DIR);
+			logFileNameFormat = MiscUtil.getStringProperty(props,
+					basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
+			String archiveFolderProp = MiscUtil.getStringProperty(props,
+					propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
+			fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_FILENAME_PREFIX);
+			indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_INDEX_FILE);
+			retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
+			fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
+			maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
+
+			logger.info("retryDestinationMS=" + retryDestinationMS
+					+ ", queueName=" + queueProvider.getName());
+			logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
+					+ queueProvider.getName());
+			logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
+					+ queueProvider.getName());
+
+			if (logFolderProp == null || logFolderProp.isEmpty()) {
+				logger.error("Audit spool folder is not configured. Please set "
+						+ propPrefix
+						+ "."
+						+ PROP_FILE_SPOOL_LOCAL_DIR
+						+ ". queueName=" + queueProvider.getName());
+				return;
+			}
+			logFolder = new File(logFolderProp);
+			if (!logFolder.isDirectory()) {
+				logFolder.mkdirs();
+				if (!logFolder.isDirectory()) {
+					logger.error("File Spool folder not found and can't be created. folder="
+							+ logFolder.getAbsolutePath()
+							+ ", queueName="
+							+ queueProvider.getName());
+					return;
+				}
+			}
+			logger.info("logFolder=" + logFolder + ", queueName="
+					+ queueProvider.getName());
+
+			if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+				logFileNameFormat = "spool_" + "%app-type%" + "_"
+						+ "%time:yyyyMMdd-HHmm.ss%.log";
+			}
+			logger.info("logFileNameFormat=" + logFileNameFormat
+					+ ", queueName=" + queueProvider.getName());
+
+			if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
+				archiveFolder = new File(logFolder, "archive");
+			} else {
+				archiveFolder = new File(archiveFolderProp);
+			}
+			if (!archiveFolder.isDirectory()) {
+				archiveFolder.mkdirs();
+				if (!archiveFolder.isDirectory()) {
+					logger.error("File Spool archive folder not found and can't be created. folder="
+							+ archiveFolder.getAbsolutePath()
+							+ ", queueName="
+							+ queueProvider.getName());
+					return;
+				}
+			}
+			logger.info("archiveFolder=" + archiveFolder + ", queueName="
+					+ queueProvider.getName());
+
+			if (indexFileName == null || indexFileName.isEmpty()) {
+				if (fileNamePrefix == null || fileNamePrefix.isEmpty()) {
+					fileNamePrefix = queueProvider.getName() + "_"
+							+ consumerProvider.getName();
+				}
+				indexFileName = "index_" + fileNamePrefix + ".json";
+			}
+
+			indexFile = new File(logFolder, indexFileName);
+			if (!indexFile.exists()) {
+				indexFile.createNewFile();
+			}
+			logger.info("indexFile=" + indexFile + ", queueName="
+					+ queueProvider.getName());
+
+			int lastDot = indexFileName.lastIndexOf('.');
+			indexDoneFileName = indexFileName.substring(0, lastDot)
+					+ "_closed.json";
+			indexDoneFile = new File(logFolder, indexDoneFileName);
+			if (!indexDoneFile.exists()) {
+				indexDoneFile.createNewFile();
+			}
+			logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+					+ queueProvider.getName());
+
+			// Load index file
+			loadIndexFile();
+			for (AuditIndexRecord auditIndexRecord : indexRecords) {
+				if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
+					isPending = true;
+				}
+				if (auditIndexRecord.status
+						.equals(SPOOL_FILE_STATUS.write_inprogress)) {
+					currentWriterIndexRecord = auditIndexRecord;
+					logger.info("currentWriterIndexRecord="
+							+ currentWriterIndexRecord.filePath
+							+ ", queueName=" + queueProvider.getName());
+				}
+				if (auditIndexRecord.status
+						.equals(SPOOL_FILE_STATUS.read_inprogress)) {
+					indexQueue.add(auditIndexRecord);
+				}
+			}
+			printIndex();
+			// One more loop to add the rest of the pending records in reverse
+			// order
+			for (int i = 0; i < indexRecords.size(); i++) {
+				AuditIndexRecord auditIndexRecord = indexRecords.get(i);
+				if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
+					File consumerFile = new File(auditIndexRecord.filePath);
+					if (!consumerFile.exists()) {
+						logger.error("INIT: Consumer file="
+								+ consumerFile.getPath() + " not found.");
+						System.exit(1);
+					}
+					indexQueue.add(auditIndexRecord);
+				}
+			}
+
+		} catch (Throwable t) {
+			logger.fatal("Error initializing File Spooler. queue="
+					+ queueProvider.getName(), t);
+			return;
+		}
+		initDone = true;
+	}
+
+	/**
+	 * Start looking for outstanding logs and update status accordingly.
+	 */
+	public void start() {
+		if (!initDone) {
+			logger.error("Cannot start Audit File Spooler. Initialization not done yet. queueName="
+					+ queueProvider.getName());
+			return;
+		}
+
+		logger.info("Starting writerThread, queueName="
+				+ queueProvider.getName() + ", consumer="
+				+ consumerProvider.getName());
+
+		// Let's start the thread to read
+		destinationThread = new Thread(this, queueProvider.getName()
+				+ "_destWriter");
+		destinationThread.setDaemon(true);
+		destinationThread.start();
+	}
+
+	public void stop() {
+		if (!initDone) {
+			logger.error("Cannot stop Audit File Spooler. Initialization not done. queueName="
+					+ queueProvider.getName());
+			return;
+		}
+		logger.info("Stop called, queueName=" + queueProvider.getName()
+				+ ", consumer=" + consumerProvider.getName());
+
+		isDrain = true;
+		flush();
+
+		PrintWriter out = getOpenLogFileStream();
+		if (out != null) {
+			// If write is still going on, then let's give it enough time to
+			// complete
+			for (int i = 0; i < 3; i++) {
+				if (isWriting) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						// ignore
+					}
+					continue;
+				}
+				try {
+					logger.info("Closing open file, queueName="
+							+ queueProvider.getName() + ", consumer="
+							+ consumerProvider.getName());
+
+					out.flush();
+					out.close();
+				} catch (Throwable t) {
+					logger.debug("Error closing spool out file.", t);
+				}
+			}
+		}
+		try {
+			if (destinationThread != null) {
+				destinationThread.interrupt();
+			}
+			destinationThread = null;
+		} catch (Throwable e) {
+			// ignore
+		}
+	}
+
+	public void flush() {
+		if (!initDone) {
+			logger.error("Cannot flush Audit File Spooler. Initialization not done. queueName="
+					+ queueProvider.getName());
+			return;
+		}
+		PrintWriter out = getOpenLogFileStream();
+		if (out != null) {
+			out.flush();
+		}
+	}
+
+	/**
+	 * Returns true if any files are still not processed, for example when the
+	 * destination is not reachable.
+	 * 
+	 * @return
+	 */
+	public boolean isPending() {
+		if (!initDone) {
+			logError("isPending(): File Spooler not initialized. queueName="
+					+ queueProvider.getName());
+			return false;
+		}
+
+		return isPending;
+	}
+
+	/**
+	 * Milliseconds from last attempt time
+	 * 
+	 * @return
+	 */
+	public long getLastAttemptTimeDelta() {
+		if (lastAttemptTime == 0) {
+			return 0;
+		}
+		return System.currentTimeMillis() - lastAttemptTime;
+	}
+
+	synchronized public void stashLogs(AuditEventBase event) {
+		if (isDrain) {
+			// Stop has been called, so this method shouldn't be called
+			logger.error("stashLogs() is called after stop is called. event="
+					+ event);
+			return;
+		}
+		try {
+			isWriting = true;
+			PrintWriter logOut = getLogFileStream();
+			// Convert event to json
+			String jsonStr = MiscUtil.stringify(event);
+			logOut.println(jsonStr);
+			isPending = true;
+		} catch (Exception ex) {
+			logger.error("Error writing to file. event=" + event, ex);
+		} finally {
+			isWriting = false;
+		}
+
+	}
+
+	synchronized public void stashLogs(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			stashLogs(event);
+		}
+		flush();
+	}
+
+	synchronized public void stashLogsString(String event) {
+		if (isDrain) {
+			// Stop has been called, so this method shouldn't be called
+			logger.error("stashLogs() is called after stop is called. event="
+					+ event);
+			return;
+		}
+		try {
+			isWriting = true;
+			PrintWriter logOut = getLogFileStream();
+			logOut.println(event);
+		} catch (Exception ex) {
+			logger.error("Error writing to file. event=" + event, ex);
+		} finally {
+			isWriting = false;
+		}
+
+	}
+
+	synchronized public void stashLogsString(Collection<String> events) {
+		for (String event : events) {
+			stashLogsString(event);
+		}
+		flush();
+	}
+
+	/**
+	 * Returns the currently open log file stream. If no output file is
+	 * currently open, it returns null.
+	 * 
+	 * @return
+	 * @throws Exception
+	 */
+	synchronized private PrintWriter getOpenLogFileStream() {
+		return logWriter;
+	}
+
+	/**
+	 * @return
+	 * @throws Exception
+	 */
+	synchronized private PrintWriter getLogFileStream() throws Exception {
+		closeFileIfNeeded();
+
+		// Either there are no open log file or the previous one has been rolled
+		// over
+		if (currentWriterIndexRecord == null) {
+			Date currentTime = new Date();
+			// Create a new file
+			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+					currentTime.getTime());
+			String newFileName = fileName;
+			File outLogFile = null;
+			int i = 0;
+			while (true) {
+				outLogFile = new File(logFolder, newFileName);
+				File archiveLogFile = new File(archiveFolder, newFileName);
+				if (!outLogFile.exists() && !archiveLogFile.exists()) {
+					break;
+				}
+				i++;
+				int lastDot = fileName.lastIndexOf('.');
+				String baseName = fileName.substring(0, lastDot);
+				String extension = fileName.substring(lastDot);
+				newFileName = baseName + "." + i + extension;
+			}
+			fileName = newFileName;
+			logger.info("Creating new file. queueName="
+					+ queueProvider.getName() + ", fileName=" + fileName);
+			// Open the file
+			logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+					outLogFile)));
+
+			AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();
+
+			tmpIndexRecord.id = MiscUtil.generateUniqueId();
+			tmpIndexRecord.filePath = outLogFile.getPath();
+			tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
+			tmpIndexRecord.fileCreateTime = currentTime;
+			tmpIndexRecord.lastAttempt = true;
+			currentWriterIndexRecord = tmpIndexRecord;
+			indexRecords.add(currentWriterIndexRecord);
+			saveIndexFile();
+
+		} else {
+			if (logWriter == null) {
+				// This means the process just started. We need to open the file
+				// in append mode.
+				logger.info("Opening existing file for append. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+						currentWriterIndexRecord.filePath, true)));
+			}
+		}
+		return logWriter;
+	}
+
+	synchronized private void closeFileIfNeeded() throws FileNotFoundException,
+			IOException {
+		// If a file is currently open for writing, check whether it needs to be
+		// closed (rolled over) and handed off to the consumer queue
+		if (currentWriterIndexRecord != null) {
+			// Check whether the file needs to rolled
+			boolean closeFile = false;
+			if (indexRecords.size() == 1) {
+				closeFile = true;
+				logger.info("Closing file. Only one open file. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+			} else if (System.currentTimeMillis()
+					- currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) {
+				closeFile = true;
+				logger.info("Closing file. Rolling over. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+			}
+			if (closeFile) {
+				// Roll the file
+				if (logWriter != null) {
+					logWriter.flush();
+					logWriter.close();
+					logWriter = null;
+				}
+				currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
+				currentWriterIndexRecord.writeCompleteTime = new Date();
+				saveIndexFile();
+				logger.info("Adding file to queue. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+				indexQueue.add(currentWriterIndexRecord);
+				currentWriterIndexRecord = null;
+			}
+		}
+	}
+
+	/**
+	 * Load the index file
+	 * 
+	 * @throws IOException
+	 */
+	void loadIndexFile() throws IOException {
+		logger.info("Loading index file. fileName=" + indexFile.getPath());
+		BufferedReader br = new BufferedReader(new FileReader(indexFile));
+		indexRecords.clear();
+		String line;
+		while ((line = br.readLine()) != null) {
+			if (!line.isEmpty() && !line.startsWith("#")) {
+				AuditIndexRecord record = gson.fromJson(line,
+						AuditIndexRecord.class);
+				indexRecords.add(record);
+			}
+		}
+		br.close();
+	}
+
+	synchronized void printIndex() {
+		logger.info("INDEX printIndex() ==== START");
+		Iterator<AuditIndexRecord> iter = indexRecords.iterator();
+		while (iter.hasNext()) {
+			AuditIndexRecord record = iter.next();
+			logger.info("INDEX=" + record + ", isFileExist="
+					+ (new File(record.filePath).exists()));
+		}
+		logger.info("INDEX printIndex() ==== END");
+	}
+
+	synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
+			throws FileNotFoundException, IOException {
+		Iterator<AuditIndexRecord> iter = indexRecords.iterator();
+		while (iter.hasNext()) {
+			AuditIndexRecord record = iter.next();
+			if (record.id.equals(indexRecord.id)) {
+				logger.info("Removing file from index. file=" + record.filePath
+						+ ", queueName=" + queueProvider.getName()
+						+ ", consumer=" + consumerProvider.getName());
+
+				iter.remove();
+				appendToDoneFile(record);
+			}
+		}
+		saveIndexFile();
+	}
+
+	synchronized void saveIndexFile() throws FileNotFoundException, IOException {
+		PrintWriter out = new PrintWriter(indexFile);
+		for (AuditIndexRecord auditIndexRecord : indexRecords) {
+			out.println(gson.toJson(auditIndexRecord));
+		}
+		out.close();
+		// printIndex();
+
+	}
+
+	void appendToDoneFile(AuditIndexRecord indexRecord)
+			throws FileNotFoundException, IOException {
+		logger.info("Moving to done file. " + indexRecord.filePath
+				+ ", queueName=" + queueProvider.getName() + ", consumer="
+				+ consumerProvider.getName());
+		String line = gson.toJson(indexRecord);
+		PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
+				indexDoneFile, true)));
+		out.println(line);
+		out.flush();
+		out.close();
+
+		// Move to archive folder
+		File logFile = null;
+		File archiveFile = null;
+		try {
+			logFile = new File(indexRecord.filePath);
+			String fileName = logFile.getName();
+			archiveFile = new File(archiveFolder, fileName);
+			logger.info("Moving logFile " + logFile + " to " + archiveFile);
+			logFile.renameTo(archiveFile);
+		} catch (Throwable t) {
+			logger.error("Error moving log file to archive folder. logFile="
+					+ logFile + ", archiveFile=" + archiveFile, t);
+		}
+
+		archiveFile = null;
+		try {
+			// Remove old files
+			File[] logFiles = archiveFolder.listFiles(new FileFilter() {
+				public boolean accept(File pathname) {
+					return pathname.getName().toLowerCase().endsWith(".log");
+				}
+			});
+
+			if (logFiles != null && logFiles.length > maxArchiveFiles) {
+				int filesToDelete = logFiles.length - maxArchiveFiles;
+				BufferedReader br = new BufferedReader(new FileReader(
+						indexDoneFile));
+				try {
+					int filesDeletedCount = 0;
+					while ((line = br.readLine()) != null) {
+						if (!line.isEmpty() && !line.startsWith("#")) {
+							AuditIndexRecord record = gson.fromJson(line,
+									AuditIndexRecord.class);
+							logFile = new File(record.filePath);
+							String fileName = logFile.getName();
+							archiveFile = new File(archiveFolder, fileName);
+							if (archiveFile.exists()) {
+								logger.info("Deleting archive file "
+										+ archiveFile);
+								boolean ret = archiveFile.delete();
+								if (!ret) {
+									logger.error("Error deleting archive file. archiveFile="
+											+ archiveFile);
+								}
+								filesDeletedCount++;
+								if (filesDeletedCount >= filesToDelete) {
+									logger.info("Deleted " + filesDeletedCount
+											+ " files");
+									break;
+								}
+							}
+						}
+					}
+				} finally {
+					br.close();
+				}
+			}
+		} catch (Throwable t) {
+			logger.error("Error deleting older archive file. archiveFile="
+					+ archiveFile, t);
+		}
+
+	}
+
+	void logError(String msg) {
+		long currTimeMS = System.currentTimeMillis();
+		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+			logger.error(msg);
+			lastErrorLogMS = currTimeMS;
+		}
+	}
+
+	class AuditIndexRecord {
+		String id;
+		String filePath;
+		int linePosition = 0;
+		SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
+		Date fileCreateTime;
+		Date writeCompleteTime;
+		Date doneCompleteTime;
+		Date lastSuccessTime;
+		Date lastFailedTime;
+		int failedAttemptCount = 0;
+		boolean lastAttempt = false;
+
+		@Override
+		public String toString() {
+			return "AuditIndexRecord [id=" + id + ", filePath=" + filePath
+					+ ", linePosition=" + linePosition + ", status=" + status
+					+ ", fileCreateTime=" + fileCreateTime
+					+ ", writeCompleteTime=" + writeCompleteTime
+					+ ", doneCompleteTime=" + doneCompleteTime
+					+ ", lastSuccessTime=" + lastSuccessTime
+					+ ", lastFailedTime=" + lastFailedTime
+					+ ", failedAttemptCount=" + failedAttemptCount
+					+ ", lastAttempt=" + lastAttempt + "]";
+		}
+
+	}
+
+	class AuditFileSpoolAttempt {
+		Date attemptTime;
+		String status;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				// Let's pause between each iteration
+				if (currentConsumerIndexRecord == null) {
+					currentConsumerIndexRecord = indexQueue.poll(
+							retryDestinationMS, TimeUnit.MILLISECONDS);
+				} else {
+					Thread.sleep(retryDestinationMS);
+				}
+
+				if (isDrain) {
+					// Need to exit
+					break;
+				}
+				if (currentConsumerIndexRecord == null) {
+					closeFileIfNeeded();
+					continue;
+				}
+
+				boolean isRemoveIndex = false;
+				File consumerFile = new File(
+						currentConsumerIndexRecord.filePath);
+				if (!consumerFile.exists()) {
+					logger.error("Consumer file=" + consumerFile.getPath()
+							+ " not found.");
+					printIndex();
+					isRemoveIndex = true;
+				} else {
+					// Let's open the spool file for reading
+					BufferedReader br = new BufferedReader(new FileReader(
+							currentConsumerIndexRecord.filePath));
+					try {
+						int startLine = currentConsumerIndexRecord.linePosition;
+						String line;
+						int currLine = 0;
+						boolean isResumed = false;
+						List<String> lines = new ArrayList<String>();
+						while ((line = br.readLine()) != null) {
+							currLine++;
+							if (currLine < startLine) {
+								continue;
+							}
+							lines.add(line);
+							if (lines.size() == queueProvider.getMaxBatchSize()) {
+								boolean ret = sendEvent(lines,
+										currentConsumerIndexRecord, currLine);
+								if (!ret) {
+									throw new Exception("Destination down");
+								} else {
+									if (!isResumed) {
+										logger.info("Started writing to destination. file="
+												+ currentConsumerIndexRecord.filePath
+												+ ", queueName="
+												+ queueProvider.getName()
+												+ ", consumer="
+												+ consumerProvider.getName());
+									}
+								}
+								lines.clear();
+							}
+						}
+						if (lines.size() > 0) {
+							boolean ret = sendEvent(lines,
+									currentConsumerIndexRecord, currLine);
+							if (!ret) {
+								throw new Exception("Destination down");
+							} else {
+								if (!isResumed) {
+									logger.info("Started writing to destination. file="
+											+ currentConsumerIndexRecord.filePath
+											+ ", queueName="
+											+ queueProvider.getName()
+											+ ", consumer="
+											+ consumerProvider.getName());
+								}
+							}
+							lines.clear();
+						}
+						logger.info("Done reading file. file="
+								+ currentConsumerIndexRecord.filePath
+								+ ", queueName=" + queueProvider.getName()
+								+ ", consumer=" + consumerProvider.getName());
+						// The entire file is read
+						currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
+						currentConsumerIndexRecord.doneCompleteTime = new Date();
+						currentConsumerIndexRecord.lastAttempt = true;
+
+						isRemoveIndex = true;
+					} catch (Exception ex) {
+						isDestDown = true;
+						logError("Destination down. queueName="
+								+ queueProvider.getName() + ", consumer="
+								+ consumerProvider.getName());
+						lastAttemptTime = System.currentTimeMillis();
+						// Update the index file
+						currentConsumerIndexRecord.lastFailedTime = new Date();
+						currentConsumerIndexRecord.failedAttemptCount++;
+						currentConsumerIndexRecord.lastAttempt = false;
+						saveIndexFile();
+					} finally {
+						br.close();
+					}
+				}
+				if (isRemoveIndex) {
+					// Remove this entry from index
+					removeIndexRecord(currentConsumerIndexRecord);
+					currentConsumerIndexRecord = null;
+					closeFileIfNeeded();
+				}
+			} catch (Throwable t) {
+				logger.error("Exception in destination writing thread.", t);
+			}
+		}
+		logger.info("Exiting file spooler. provider=" + queueProvider.getName()
+				+ ", consumer=" + consumerProvider.getName());
+	}
+
+	private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
+			int currLine) {
+		boolean ret = true;
+		try {
+			ret = consumerProvider.logJSON(lines);
+			if (!ret) {
+				// Log the error at most once per fixed interval to avoid flooding the log
+				logError("Error sending logs to consumer. provider="
+						+ queueProvider.getName() + ", consumer="
+						+ consumerProvider.getName());
+			} else {
+				// Update index and save
+				indexRecord.linePosition = currLine;
+				indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
+				indexRecord.lastSuccessTime = new Date();
+				indexRecord.lastAttempt = true;
+				saveIndexFile();
+
+				if (isDestDown) {
+					isDestDown = false;
+					logger.info("Destination up now. " + indexRecord.filePath
+							+ ", queueName=" + queueProvider.getName()
+							+ ", consumer=" + consumerProvider.getName());
+				}
+			}
+		} catch (Throwable t) {
+			logger.error("Error while sending logs to consumer. provider="
+					+ queueProvider.getName() + ", consumer="
+					+ consumerProvider.getName() + ", log=" + lines, t);
+		}
+
+		return ret;
+	}
+
+}
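
The file-name rollover loop at the top of this hunk picks the next unused spool file name by probing both the spool and the archive folders. As a rough illustration (not part of the patch), the scheme can be restated as the standalone sketch below; the class, method, folder paths and file name here are hypothetical, and like the loop above it assumes the file name contains an extension.

import java.io.File;

// Hypothetical illustration of the rollover naming scheme used by the spooler:
// if "ranger_audit.log" already exists in either the spool or the archive
// folder, try "ranger_audit.1.log", "ranger_audit.2.log", ... until an unused
// name is found. Assumes the file name contains a '.' extension.
public class RolloverNameSketch {
	static String nextAvailableName(File logFolder, File archiveFolder, String fileName) {
		String candidate = fileName;
		int i = 0;
		while (new File(logFolder, candidate).exists()
				|| new File(archiveFolder, candidate).exists()) {
			i++;
			int lastDot = fileName.lastIndexOf('.');
			candidate = fileName.substring(0, lastDot) + "." + i
					+ fileName.substring(lastDot);
		}
		return candidate;
	}

	public static void main(String[] args) {
		// Prints "ranger_audit.log", or "ranger_audit.1.log" if that name is taken, etc.
		System.out.println(nextAvailableName(new File("/tmp/spool"),
				new File("/tmp/spool/archive"), "ranger_audit.log"));
	}
}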

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
new file mode 100644
index 0000000..e102d8b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * A non-blocking queue that aggregates (summarizes) audit events by event key
+ * and flushes the summaries to the consumer at a configurable interval.
+ */
+public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
+	private static final Log logger = LogFactory
+			.getLog(AuditSummaryQueue.class);
+
+	public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms";
+
+	LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+	Thread consumerThread = null;
+
+	static int threadCount = 0;
+	static final String DEFAULT_NAME = "summary";
+
+	private static final int MAX_DRAIN = 100000;
+
+	private int maxSummaryInterval = 5000;
+
+	HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>();
+
+	public AuditSummaryQueue() {
+		setName(DEFAULT_NAME);
+	}
+
+	public AuditSummaryQueue(AuditProvider consumer) {
+		super(consumer);
+		setName(DEFAULT_NAME);
+	}
+
+	@Override
+	public void init(Properties props, String propPrefix) {
+		super.init(props, propPrefix);
+		maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_SUMMARY_INTERVAL, maxSummaryInterval);
+		logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name="
+				+ getName());
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+	 * audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		// Add to the queue and return ASAP
+		if (queue.size() >= getMaxQueueSize()) {
+			return false;
+		}
+		queue.add(event);
+		addLifeTimeInLogCount(1);
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		boolean ret = true;
+		for (AuditEventBase event : events) {
+			ret = log(event);
+			if (!ret) {
+				break;
+			}
+		}
+		return ret;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	public void start() {
+		if (consumer != null) {
+			consumer.start();
+		}
+
+		consumerThread = new Thread(this, this.getClass().getName()
+				+ (threadCount++));
+		consumerThread.setDaemon(true);
+		consumerThread.start();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+	 */
+	@Override
+	public void stop() {
+		setDrain(true);
+		try {
+			if (consumerThread != null) {
+				consumerThread.interrupt();
+			}
+			consumerThread = null;
+		} catch (Throwable t) {
+			// ignore any exception
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		if (queue.isEmpty()) {
+			return consumer.isFlushPending();
+		}
+		return true;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		long lastDispatchTime = System.currentTimeMillis();
+
+		while (true) {
+			// Time to next dispatch
+			long nextDispatchDuration = lastDispatchTime
+					- System.currentTimeMillis() + maxSummaryInterval;
+
+			Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+
+			try {
+				AuditEventBase event = null;
+				if (!isDrain() && nextDispatchDuration > 0) {
+					event = queue.poll(nextDispatchDuration,
+							TimeUnit.MILLISECONDS);
+				} else {
+					// Here poll() is non-blocking: we are draining or past the dispatch time
+					event = queue.poll();
+				}
+
+				if (event != null) {
+					eventList.add(event);
+					queue.drainTo(eventList, MAX_DRAIN - 1);
+				} else {
+					// poll() returned due to timeout, so reset the clock
+					nextDispatchDuration = lastDispatchTime
+							- System.currentTimeMillis() + maxSummaryInterval;
+					lastDispatchTime = System.currentTimeMillis();
+				}
+			} catch (InterruptedException e) {
+				logger.info(
+						"Caught exception in consumer thread. Most likely the thread was interrupted to abort the loop",
+						e);
+			} catch (Throwable t) {
+				logger.error("Caught error during processing request.", t);
+			}
+
+			for (AuditEventBase event : eventList) {
+				// Add to hash map
+				String key = event.getEventKey();
+				AuditSummary auditSummary = summaryMap.get(key);
+				if (auditSummary == null) {
+					auditSummary = new AuditSummary();
+					auditSummary.event = event;
+					auditSummary.startTime = event.getEventTime();
+					auditSummary.endTime = event.getEventTime();
+					auditSummary.count = 1;
+					summaryMap.put(key, auditSummary);
+				} else {
+					auditSummary.endTime = event.getEventTime();
+					auditSummary.count++;
+				}
+			}
+
+			if (isDrain() || nextDispatchDuration <= 0) {
+				for (Map.Entry<String, AuditSummary> entry : summaryMap
+						.entrySet()) {
+					AuditSummary auditSummary = entry.getValue();
+					auditSummary.event.setEventCount(auditSummary.count);
+					long timeDiff = auditSummary.endTime.getTime()
+							- auditSummary.startTime.getTime();
+					timeDiff = timeDiff > 0 ? timeDiff : 1;
+					auditSummary.event.setEventDurationMS(timeDiff);
+
+					// Reset time just before sending the logs
+					lastDispatchTime = System.currentTimeMillis();
+					boolean ret = consumer.log(auditSummary.event);
+					if (!ret) {
+						// We need to drop this event
+						logFailedEvent(auditSummary.event, null);
+					}
+				}
+				summaryMap.clear();
+			}
+
+			if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
+				break;
+			}
+		}
+
+		try {
+			// Call stop on the consumer
+			consumer.stop();
+		} catch (Throwable t) {
+			logger.error("Error while calling stop on consumer.", t);
+		}
+	}
+
+	class AuditSummary {
+		Date startTime = null;
+		Date endTime = null;
+		int count = 0;
+		AuditEventBase event;
+	}
+}
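
The summarization step in AuditSummaryQueue.run() collapses drained events that share an event key into a single entry carrying a count and the first/last event times seen during the interval; the flush loop then copies the count and the (at least 1 ms) duration onto a representative event before handing it to the consumer. As a rough, self-contained illustration (hypothetical class and names, plain strings in place of AuditEventBase):

import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the aggregation performed by AuditSummaryQueue:
// events with the same key collapse into one Summary with a count and the
// time span they cover.
public class SummarySketch {
	static class Summary {
		Date startTime;
		Date endTime;
		int count;
	}

	static Map<String, Summary> summarize(List<String> keys, List<Date> times) {
		Map<String, Summary> summaryMap = new LinkedHashMap<String, Summary>();
		for (int i = 0; i < keys.size(); i++) {
			Summary s = summaryMap.get(keys.get(i));
			if (s == null) {
				s = new Summary();
				s.startTime = times.get(i);
				summaryMap.put(keys.get(i), s);
			}
			s.endTime = times.get(i);
			s.count++;
		}
		return summaryMap;
	}

	public static void main(String[] args) {
		List<String> keys = new ArrayList<String>();
		List<Date> times = new ArrayList<Date>();
		long now = System.currentTimeMillis();
		// Three events with the same (hypothetical) key and one with a different key
		keys.add("user1|read|tableA");  times.add(new Date(now));
		keys.add("user1|read|tableA");  times.add(new Date(now + 10));
		keys.add("user1|read|tableA");  times.add(new Date(now + 20));
		keys.add("user2|write|tableB"); times.add(new Date(now + 5));
		for (Map.Entry<String, Summary> e : summarize(keys, times).entrySet()) {
			Summary s = e.getValue();
			long durationMS = Math.max(s.endTime.getTime() - s.startTime.getTime(), 1);
			System.out.println(e.getKey() + " count=" + s.count
					+ " durationMS=" + durationMS);
		}
	}
}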