You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/20 00:11:55 UTC

[1/4] incubator-ranger git commit: RANGER-397 - Implement reliable streaming audits to configurable destinations

Repository: incubator-ranger
Updated Branches:
  refs/heads/master 4a20a9dd9 -> 44f403ead


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
new file mode 100644
index 0000000..a023b9a
--- /dev/null
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.AuditAsyncQueue;
+import org.apache.ranger.audit.provider.AuditBatchProcessor;
+import org.apache.ranger.audit.provider.AuditDestination;
+import org.apache.ranger.audit.provider.AuditFileSpool;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditProviderFactory;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.FileAuditDestination;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.provider.MultiDestAuditProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAuditProcessor {
+
+	private static final Log logger = LogFactory
+			.getLog(TestAuditProcessor.class);
+
+	/**
+	 * Class-level set-up hook; currently empty.
+	 */
+	@BeforeClass
+	public static void setUpBeforeClass() throws Exception {
+	}
+
+	/**
+	 * Class-level tear-down hook; currently empty.
+	 */
+	@AfterClass
+	public static void tearDownAfterClass() throws Exception {
+	}
+
+	static private int seqNum = 0;
+
+	/**
+	 * Sends a fixed number of events through an {@link AuditAsyncQueue} that
+	 * wraps a single {@link TestConsumer}, then checks the consumer's event
+	 * count, frequency sum and sequence report.
+	 */
+	@Test
+	public void testAuditAsyncQueue() {
+		logger.debug("testAuditAsyncQueue()...");
+
+		final int eventsToSend = 10;
+		TestConsumer sink = new TestConsumer();
+
+		AuditAsyncQueue asyncQueue = new AuditAsyncQueue(sink);
+		asyncQueue.init(new Properties());
+		asyncQueue.start();
+
+		int sent = 0;
+		while (sent < eventsToSend) {
+			asyncQueue.log(createEvent());
+			sent++;
+		}
+
+		asyncQueue.stop();
+		asyncQueue.waitToComplete();
+
+		// Pause for one second before inspecting the consumer.
+		try {
+			Thread.sleep(1000);
+		} catch (InterruptedException ignored) {
+			// an interrupted pause is not treated as a test failure
+		}
+
+		assertEquals(eventsToSend, sink.getCountTotal());
+		assertEquals(eventsToSend, sink.getSumTotal());
+		assertNull("Event not in sequnce", sink.isInSequence());
+	}
+
+	/**
+	 * Registers several {@link TestConsumer}s with a
+	 * {@link MultiDestAuditProvider}, places that provider behind one
+	 * {@link AuditAsyncQueue}, and verifies that every consumer reports the full
+	 * event count and frequency sum.
+	 */
+	@Test
+	public void testMultipleQueue() {
+		// Log this test's own name (it used to report testAuditAsyncQueue()).
+		logger.debug("testMultipleQueue()...");
+		int destCount = 3;
+		TestConsumer[] testConsumer = new TestConsumer[destCount];
+
+		MultiDestAuditProvider multiQueue = new MultiDestAuditProvider();
+		for (int i = 0; i < destCount; i++) {
+			testConsumer[i] = new TestConsumer();
+			multiQueue.addAuditProvider(testConsumer[i]);
+		}
+
+		AuditAsyncQueue queue = new AuditAsyncQueue(multiQueue);
+		Properties props = new Properties();
+		queue.init(props);
+		queue.start();
+
+		int messageToSend = 10;
+		for (int i = 0; i < messageToSend; i++) {
+			queue.log(createEvent());
+		}
+		queue.stop();
+		queue.waitToComplete();
+		// Pause for one second before inspecting the consumers.
+		try {
+			Thread.sleep(1000);
+		} catch (InterruptedException e) {
+			// an interrupted pause is not treated as a test failure
+		}
+		for (int i = 0; i < destCount; i++) {
+			assertEquals("consumer" + i, messageToSend,
+					testConsumer[i].getCountTotal());
+			assertEquals("consumer" + i, messageToSend,
+					testConsumer[i].getSumTotal());
+
+		}
+	}
+
+	/**
+	 * Configures an {@link AuditBatchProcessor} with a small batch size and a
+	 * comparatively long batch interval, sends a burst of events, and checks
+	 * the total count, the frequency sum, the number of batch deliveries and the
+	 * sequence report of the {@link TestConsumer}.
+	 */
+	@Test
+	public void testAuditBatchProcessorBySize() {
+		// Log this test's own name so it can be told apart from the ByTime variant.
+		logger.debug("testAuditBatchProcessorBySize()...");
+		int messageToSend = 10;
+
+		String basePropName = "ranger.test.batch";
+		int batchSize = messageToSend / 3;
+		// Expected number of batch deliveries: ceil(messageToSend / batchSize).
+		// The earlier "batchSize + remainder flag" expression gave the same
+		// value (4) for these constants, but only by coincidence.
+		int expectedBatchCount = (messageToSend + batchSize - 1) / batchSize;
+		int queueSize = messageToSend * 2;
+		int intervalMS = messageToSend * 100; // Deliberately big interval
+		Properties props = new Properties();
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+				+ batchSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+				+ queueSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				"" + intervalMS);
+
+		TestConsumer testConsumer = new TestConsumer();
+		AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+		queue.init(props, basePropName);
+		queue.start();
+
+		for (int i = 0; i < messageToSend; i++) {
+			queue.log(createEvent());
+
+		}
+		// Pause for two seconds (longer than the batch interval) before draining.
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// an interrupted pause is not treated as a test failure
+		}
+
+		queue.waitToComplete();
+		queue.stop();
+		queue.waitToComplete();
+
+		assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+		assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+		assertEquals("Total batch", expectedBatchCount,
+				testConsumer.getBatchCount());
+		assertNull("Event not in sequnce", testConsumer.isInSequence());
+
+	}
+
+	/**
+	 * Configures an {@link AuditBatchProcessor} with a batch size larger than
+	 * the number of events and a short batch interval, sends the events with a
+	 * pause between them, and checks the total count, the frequency sum, the
+	 * number of batch deliveries and the sequence report of the
+	 * {@link TestConsumer}.
+	 *
+	 * NOTE(review): the expected batch count is derived from wall-clock
+	 * arithmetic, so this test is presumably sensitive to scheduling delays.
+	 */
+	@Test
+	public void testAuditBatchProcessorByTime() {
+		// Log this test's own name so it can be told apart from the BySize variant.
+		logger.debug("testAuditBatchProcessorByTime()...");
+
+		int messageToSend = 10;
+
+		String basePropName = "ranger.test.batch";
+		int batchSize = messageToSend * 2; // Deliberately big size
+		int queueSize = messageToSend * 2;
+		int intervalMS = (1000 / messageToSend) * 3; // e.g. (1000/10) * 3 = 300 ms
+		int pauseMS = 1000 / messageToSend + 3; // e.g. 1000/10 + 3 = 103 ms
+		// Total sending time divided by the interval, plus one more batch.
+		int expectedBatchCount = (messageToSend * pauseMS) / intervalMS + 1;
+
+		Properties props = new Properties();
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+				+ batchSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+				+ queueSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				"" + intervalMS);
+
+		TestConsumer testConsumer = new TestConsumer();
+		AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+		queue.init(props, basePropName);
+		queue.start();
+
+		for (int i = 0; i < messageToSend; i++) {
+			queue.log(createEvent());
+			try {
+				Thread.sleep(pauseMS);
+			} catch (InterruptedException e) {
+				// an interrupted pause is not treated as a test failure
+			}
+		}
+		// Pause for two seconds before draining.
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// an interrupted pause is not treated as a test failure
+		}
+		queue.waitToComplete();
+		queue.stop();
+		queue.waitToComplete();
+
+		assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+		assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+		assertEquals("Total batch", expectedBatchCount,
+				testConsumer.getBatchCount());
+		assertNull("Event not in sequnce", testConsumer.isInSequence());
+	}
+
+	/**
+	 * Runs an {@link AuditBatchProcessor} with file-spool properties set while
+	 * the {@link TestConsumer} reports itself as down, and asserts that the
+	 * consumer recorded no events and no batches.
+	 */
+	@Test
+	public void testAuditBatchProcessorDestDown() {
+		logger.debug("testAuditBatchProcessorDestDown()...");
+
+		final int eventsToSend = 10;
+		final String propPrefix = "ranger.test.batch";
+
+		Properties config = new Properties();
+		config.put(propPrefix + "." + BaseAuditProvider.PROP_NAME,
+				"testAuditBatchProcessorDestDown");
+		config.put(propPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE,
+				String.valueOf(eventsToSend / 3));
+		config.put(propPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE,
+				String.valueOf(eventsToSend * 2));
+		// The largest possible interval is used for the batch timer.
+		config.put(propPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				String.valueOf(Integer.MAX_VALUE));
+
+		// File spooling: switched on, with "target" as its directory.
+		config.put(propPrefix + "." + "filespool.enable", String.valueOf(true));
+		config.put(propPrefix + "." + "filespool.dir", "target");
+
+		TestConsumer downSink = new TestConsumer();
+		downSink.isDown = true;
+
+		AuditBatchProcessor processor = new AuditBatchProcessor(downSink);
+		processor.init(config, propPrefix);
+		processor.start();
+
+		int sent = 0;
+		while (sent < eventsToSend) {
+			processor.log(createEvent());
+			sent++;
+		}
+
+		// Pause for two seconds before draining.
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException ignored) {
+			// an interrupted pause is not treated as a test failure
+		}
+
+		processor.waitToComplete(5000);
+		processor.stop();
+		processor.waitToComplete();
+
+		assertEquals("Total count", 0, downSink.getCountTotal());
+		assertEquals("Total sum", 0, downSink.getSumTotal());
+		assertEquals("Total batch", 0, downSink.getBatchCount());
+		assertNull("Event not in sequnce", downSink.isInSequence());
+	}
+
+	/**
+	 * Toggles the {@link TestConsumer} between up and down while events are
+	 * logged to an {@link AuditBatchProcessor} configured with file spooling,
+	 * then checks that the consumer eventually recorded every event.
+	 *
+	 * The {@code @Test} annotation is commented out, so JUnit does not run this
+	 * method.
+	 */
+	//@Test
+	public void testAuditBatchProcessorDestDownFlipFlop() {
+		// Log this test's own name (it used to report testAuditBatchProcessorDestDown()).
+		logger.debug("testAuditBatchProcessorDestDownFlipFlop()...");
+		int messageToSend = 10;
+
+		String basePropName = "ranger.test.batch";
+		int batchSize = messageToSend / 3;
+		int queueSize = messageToSend * 2;
+		int intervalMS = 3000; // Deliberately big interval
+		Properties props = new Properties();
+		// A unique id is appended to the provider name.
+		props.put(
+				basePropName + "." + BaseAuditProvider.PROP_NAME,
+				"testAuditBatchProcessorDestDownFlipFlop_"
+						+ MiscUtil.generateUniqueId());
+
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+				+ batchSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+				+ queueSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				"" + intervalMS);
+
+		// Enable File Spooling
+		int destRetryMS = 10;
+		props.put(
+				basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+				"" + true);
+		props.put(
+				basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+				"target");
+		props.put(basePropName + "."
+				+ AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+				+ destRetryMS);
+
+		TestConsumer testConsumer = new TestConsumer();
+		testConsumer.isDown = false;
+
+		AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+		queue.init(props, basePropName);
+		queue.start();
+
+		// Script: 3 events while up, 3 while down, 3 while up, 1 while down,
+		// then up again; 3 + 3 + 3 + 1 = messageToSend.
+		try {
+			queue.log(createEvent());
+			queue.log(createEvent());
+			queue.log(createEvent());
+			Thread.sleep(1000);
+			testConsumer.isDown = true;
+			Thread.sleep(1000);
+			queue.log(createEvent());
+			queue.log(createEvent());
+			queue.log(createEvent());
+			Thread.sleep(1000);
+			testConsumer.isDown = false;
+			Thread.sleep(1000);
+			queue.log(createEvent());
+			queue.log(createEvent());
+			queue.log(createEvent());
+			Thread.sleep(1000);
+			testConsumer.isDown = true;
+			Thread.sleep(1000);
+			queue.log(createEvent());
+			Thread.sleep(1000);
+			testConsumer.isDown = false;
+			Thread.sleep(1000);
+		} catch (InterruptedException e) {
+			// an interrupt abandons the rest of the script; not treated as a failure here
+		}
+		// Pause for two more seconds before draining.
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// an interrupted pause is not treated as a test failure
+		}
+
+		queue.waitToComplete(5000);
+		queue.stop();
+		queue.waitToComplete();
+
+		assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+		assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+		assertNull("Event not in sequnce", testConsumer.isInSequence());
+
+	}
+
+	/**
+	 * See if we recover after restart: events are logged while the consumer is
+	 * down, the processor is stopped, and a second processor with a fresh
+	 * consumer is started from the same properties. The fresh consumer is then
+	 * expected to have recorded every event.
+	 *
+	 * NOTE(review): this method carries no {@code @Test} annotation, so JUnit 4
+	 * will not run it. Confirm whether that is intentional.
+	 */
+	public void testAuditBatchProcessorDestDownRestart() {
+		logger.debug("testAuditBatchProcessorDestDownRestart()...");
+		int messageToSend = 10;
+
+		String basePropName = "ranger.test.batch";
+		int batchSize = messageToSend / 3;
+		int queueSize = messageToSend * 2;
+		int intervalMS = 3000; // Deliberately big interval
+		int maxArchivedFiles = 1;
+		Properties props = new Properties();
+		// The provider name carries a unique id; the same props (and therefore
+		// the same name) are reused for the second processor below.
+		props.put(
+				basePropName + "." + BaseAuditProvider.PROP_NAME,
+				"testAuditBatchProcessorDestDownRestart_"
+						+ MiscUtil.generateUniqueId());
+
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+				+ batchSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+				+ queueSize);
+		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				"" + intervalMS);
+
+		// Enable File Spooling
+		int destRetryMS = 10;
+		props.put(
+				basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+				"" + true);
+		props.put(
+				basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+				"target");
+		props.put(basePropName + "."
+				+ AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+				+ destRetryMS);
+		props.put(basePropName + "."
+				+ AuditFileSpool.PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, ""
+				+ maxArchivedFiles);
+
+		// Phase 1: the consumer is down for the whole run.
+		TestConsumer testConsumer = new TestConsumer();
+		testConsumer.isDown = true;
+
+		AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+		queue.init(props, basePropName);
+		queue.start();
+
+		for (int i = 0; i < messageToSend; i++) {
+			queue.log(createEvent());
+
+		}
+		// Let's wait for second or two
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// ignore
+		}
+
+		queue.waitToComplete(5000);
+		queue.stop();
+		queue.waitToComplete();
+
+		// NOTE(review): isDown is already true here and this consumer is
+		// replaced below, so this assignment looks redundant. Confirm intent.
+		testConsumer.isDown = true;
+
+		// Let's wait for second or two
+		try {
+			Thread.sleep(5000);
+		} catch (InterruptedException e) {
+			// ignore
+		}
+		
+		
+		// Let's now recreate the objects
+		// Phase 2: a fresh consumer (isDown defaults to false) and a new
+		// processor built from the same properties.
+		testConsumer = new TestConsumer();
+
+		queue = new AuditBatchProcessor(testConsumer);
+		queue.init(props, basePropName);
+		queue.start();
+
+		// Let's wait for second
+		try {
+			Thread.sleep(2000);
+		} catch (InterruptedException e) {
+			// ignore
+		}
+
+		queue.waitToComplete(5000);
+		queue.stop();
+		queue.waitToComplete();
+
+		// No events were logged in phase 2, so these totals can only be met by
+		// events carried over from phase 1.
+		assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+		assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+		assertNull("Event not in sequnce", testConsumer.isInSequence());
+
+	}
+
+	@Test
+	public void testFileDestination() {
+		logger.debug("testFileDestination()...");
+
+		int messageToSend = 10;
+		int batchSize = messageToSend / 3;
+		int queueSize = messageToSend * 2;
+		int intervalMS = 500; // Should be less than final sleep time
+
+		String logFolderName = "target/testFileDestination";
+		File logFolder = new File(logFolderName);
+		String logFileName = "test_ranger_audit.log";
+		File logFile = new File(logFolder, logFileName);
+
+		Properties props = new Properties();
+		// Destination
+		String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file";
+		props.put(filePropPrefix, "enable");
+		props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file");
+		props.put(filePropPrefix + "."
+				+ FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName);
+		props.put(filePropPrefix + "."
+				+ FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
+				"%app-type%_ranger_audit.log");
+		props.put(filePropPrefix + "."
+				+ FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10);
+
+		props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, "batch");
+		String batchPropPrefix = filePropPrefix + "." + "batch";
+
+		props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+				+ batchSize);
+		props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+				+ queueSize);
+		props.put(
+				batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+				"" + intervalMS);
+
+		// Enable File Spooling
+		int destRetryMS = 10;
+		props.put(batchPropPrefix + "."
+				+ BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true);
+		props.put(batchPropPrefix + "."
+				+ AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target");
+		props.put(batchPropPrefix + "."
+				+ AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+				+ destRetryMS);
+
+		AuditProviderFactory factory = AuditProviderFactory.getInstance();
+		factory.init(props, "test");
+
+		// FileAuditDestination fileDest = new FileAuditDestination();
+		// fileDest.init(props, filePropPrefix);
+		//
+		// AuditBatchProcessor queue = new AuditBatchProcessor(fileDest);
+		// queue.init(props, batchPropPrefix);
+		// queue.start();
+
+		AuditProvider queue = factory.getProvider();
+
+		for (int i = 0; i < messageToSend; i++) {
+			queue.log(createEvent());
+		}
+		// Let's wait for second
+		try {
+			Thread.sleep(1000);
+		} catch (InterruptedException e) {
+			// ignore
+		}
+
+		queue.waitToComplete();
+		queue.stop();
+		queue.waitToComplete();
+
+		assertTrue("File created", logFile.exists());
+		try {
+			List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+			int totalSum = 0;
+			BufferedReader br = new BufferedReader(new FileReader(logFile));
+			String line;
+			int lastSeq = -1;
+			boolean outOfSeq = false;
+			while ((line = br.readLine()) != null) {
+				AuthzAuditEvent event = MiscUtil.fromJson(line,
+						AuthzAuditEvent.class);
+				eventList.add(event);
+				totalSum += event.getFrequencyCount();
+				if (event.getSeqNum() <= lastSeq) {
+					outOfSeq = true;
+				}
+			}
+			br.close();
+			assertEquals("Total count", messageToSend, eventList.size());
+			assertEquals("Total sum", messageToSend, totalSum);
+			assertFalse("Event not in sequnce", outOfSeq);
+
+		} catch (Throwable e) {
+			logger.error("Error opening file for reading.", e);
+			assertTrue("Error reading file. fileName=" + logFile + ", error="
+					+ e.toString(), true);
+		}
+
+	}
+
+	/**
+	 * Builds a new {@link AuthzAuditEvent} stamped with the next value of the
+	 * shared static sequence counter.
+	 *
+	 * @return the newly created event
+	 */
+	private AuthzAuditEvent createEvent() {
+		seqNum = seqNum + 1;
+		AuthzAuditEvent created = new AuthzAuditEvent();
+		created.setSeqNum(seqNum);
+		return created;
+	}
+
+	/**
+	 * In-memory {@link AuditDestination} used by the tests in this class. It
+	 * counts the events and batches it receives, sums their frequency counts,
+	 * keeps the received events in arrival order, and can be switched to a
+	 * "down" state in which every log call is rejected.
+	 */
+	class TestConsumer extends AuditDestination {
+
+		// Number of individual events accepted.
+		int countTotal = 0;
+		// Sum of getFrequencyCount() over accepted events.
+		int sumTotal = 0;
+		// Number of accepted log(Collection) calls.
+		int batchCount = 0;
+		String providerName = getClass().getName();
+		// When true, every log/logJSON call returns false without recording.
+		boolean isDown = false;
+		// Value reported by getMaxBatchSize().
+		int batchSize = 3;
+
+		// Accepted events, in the order they were received.
+		List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+
+		/**
+		 * Records one event unless the consumer is down.
+		 *
+		 * @param event the event to record; only {@link AuthzAuditEvent}
+		 *            instances contribute to the sum and the event list
+		 * @return false when the consumer is down, true otherwise
+		 */
+		@Override
+		public boolean log(AuditEventBase event) {
+			if (isDown) {
+				return false;
+			}
+			countTotal++;
+			if (event instanceof AuthzAuditEvent) {
+				AuthzAuditEvent azEvent = (AuthzAuditEvent) event;
+				sumTotal += azEvent.getFrequencyCount();
+				logger.info("EVENT:" + event);
+				eventList.add(azEvent);
+			}
+			return true;
+		}
+
+		/**
+		 * Records a batch of events unless the consumer is down, counting the
+		 * call as one batch.
+		 *
+		 * @param events the events to record
+		 * @return false when the consumer is down, true otherwise
+		 */
+		@Override
+		public boolean log(Collection<AuditEventBase> events) {
+			if (isDown) {
+				return false;
+			}
+			batchCount++;
+			for (AuditEventBase event : events) {
+				log(event);
+			}
+			return true;
+		}
+
+		/**
+		 * Parses one JSON string into an {@link AuthzAuditEvent} and records it
+		 * unless the consumer is down.
+		 *
+		 * @param jsonStr JSON form of the event
+		 * @return false when the consumer is down, true otherwise
+		 */
+		@Override
+		public boolean logJSON(String jsonStr) {
+			if (isDown) {
+				return false;
+			}
+			countTotal++;
+			AuthzAuditEvent event = MiscUtil.fromJson(jsonStr,
+					AuthzAuditEvent.class);
+			sumTotal += event.getFrequencyCount();
+			logger.info("JSON:" + jsonStr);
+			eventList.add(event);
+			return true;
+		}
+
+		/**
+		 * Records each JSON string in the collection unless the consumer is
+		 * down. Unlike {@link #log(Collection)}, this does not change the batch
+		 * count.
+		 *
+		 * @param events JSON forms of the events
+		 * @return false when the consumer is down, true otherwise
+		 */
+		@Override
+		public boolean logJSON(Collection<String> events) {
+			if (isDown) {
+				return false;
+			}
+			for (String event : events) {
+				logJSON(event);
+			}
+			return true;
+		}
+
+		/** No initialisation is needed for this consumer. */
+		@Override
+		public void init(Properties prop) {
+			// Nothing to do here
+		}
+
+		/** Nothing to start for this consumer. */
+		@Override
+		public void start() {
+			// Nothing to do here
+		}
+
+		/** Nothing to stop for this consumer. */
+		@Override
+		public void stop() {
+			// Nothing to do here
+		}
+
+		/** Returns immediately. */
+		@Override
+		public void waitToComplete() {
+		}
+
+		/** @return the fixed batch size configured on this consumer */
+		@Override
+		public int getMaxBatchSize() {
+			return batchSize;
+		}
+
+		/** @return always false */
+		@Override
+		public boolean isFlushPending() {
+			return false;
+		}
+
+		/** @return always 0 */
+		@Override
+		public long getLastFlushTime() {
+			return 0;
+		}
+
+		/** Does nothing. */
+		@Override
+		public void flush() {
+			// Nothing to do here
+		}
+
+		/** @return number of individual events accepted so far */
+		public int getCountTotal() {
+			return countTotal;
+		}
+
+		/** @return sum of the frequency counts of accepted events */
+		public int getSumTotal() {
+			return sumTotal;
+		}
+
+		/** @return number of accepted {@link #log(Collection)} calls */
+		public int getBatchCount() {
+			return batchCount;
+		}
+
+		/** No initialisation is needed; both arguments are ignored. */
+		@Override
+		public void init(Properties prop, String basePropertyName) {
+
+		}
+
+		/** Returns immediately; the timeout is ignored. */
+		@Override
+		public void waitToComplete(long timeout) {
+
+		}
+
+		/** @return the class name of this consumer */
+		@Override
+		public String getName() {
+			return providerName;
+		}
+
+		/** @return always false */
+		@Override
+		public boolean isDrain() {
+			return false;
+		}
+
+		// Local methods
+
+		/**
+		 * Checks that the recorded events arrived with strictly increasing
+		 * sequence numbers.
+		 *
+		 * @return the first event whose sequence number is not greater than its
+		 *         predecessor's, or null when all recorded events are in order
+		 */
+		public AuthzAuditEvent isInSequence() {
+			long lastSeq = -1;
+			for (AuthzAuditEvent event : eventList) {
+				if (event.getSeqNum() <= lastSeq) {
+					return event;
+				}
+				// Advance the watermark; without this every event was compared
+				// against -1 and the check could never fail.
+				lastSeq = event.getSeqNum();
+			}
+			return null;
+		}
+	}
+}


[3/4] incubator-ranger git commit: RANGER-397 - Implement reliable streaming audits to configurable destinations

Posted by bo...@apache.org.
RANGER-397 - Implement reliable streaming audits to configurable
destinations

First cut with HDFS and File destination working

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/eb1e9b5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/eb1e9b5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/eb1e9b5c

Branch: refs/heads/master
Commit: eb1e9b5c447dc3960d132bd2dc5adfbb33d5cb2c
Parents: 917833c
Author: Don Bosco Durai <bo...@apache.org>
Authored: Tue Apr 14 18:00:46 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Tue Apr 14 18:02:57 2015 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../ranger/audit/model/AuditEventBase.java      |  14 +-
 .../ranger/audit/model/AuthzAuditEvent.java     |  41 +-
 .../audit/provider/AsyncAuditProvider.java      |   5 +-
 .../ranger/audit/provider/AuditAsyncQueue.java  | 167 ++++
 .../audit/provider/AuditBatchProcessor.java     | 327 +++++++
 .../ranger/audit/provider/AuditDestination.java |  70 ++
 .../ranger/audit/provider/AuditFileSpool.java   | 875 +++++++++++++++++++
 .../audit/provider/AuditMessageException.java   |  67 ++
 .../ranger/audit/provider/AuditProvider.java    |  22 +-
 .../audit/provider/AuditProviderFactory.java    | 386 +++++---
 .../audit/provider/BaseAuditProvider.java       | 400 +++++++--
 .../audit/provider/BufferedAuditProvider.java   |  32 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  47 +-
 .../audit/provider/DummyAuditProvider.java      |  76 +-
 .../audit/provider/FileAuditDestination.java    | 230 +++++
 .../audit/provider/HDFSAuditDestination.java    | 243 +++++
 .../audit/provider/LocalFileLogBuffer.java      |  17 +-
 .../audit/provider/Log4jAuditProvider.java      |  57 +-
 .../ranger/audit/provider/LogDestination.java   |  16 +-
 .../apache/ranger/audit/provider/MiscUtil.java  | 291 ++++--
 .../audit/provider/MultiDestAuditProvider.java  | 153 +++-
 .../audit/provider/hdfs/HdfsAuditProvider.java  |   3 +-
 .../audit/provider/hdfs/HdfsLogDestination.java |  48 +-
 .../provider/kafka/KafkaAuditProvider.java      |  46 +-
 .../audit/provider/solr/SolrAuditProvider.java  |  52 +-
 security-admin/.gitignore                       |   3 +
 security-admin/pom.xml                          |   6 +-
 .../apache/ranger/audit/TestAuditProcessor.java | 786 +++++++++++++++++
 29 files changed, 4081 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2c746ed..dd4e2c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 *.class
 *.iml
+.pydevproject
 .settings/
 .metadata
 .classpath

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 82fcab8..a44e047 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -17,24 +17,26 @@
  * under the License.
  */
 
- package org.apache.ranger.audit.model;
+package org.apache.ranger.audit.model;
 
 import org.apache.ranger.audit.dao.DaoManager;
 
-
 public abstract class AuditEventBase {
+
 	protected AuditEventBase() {
 	}
 
 	public abstract void persist(DaoManager daoManager);
-	
+
 	protected String trim(String str, int len) {
-		String ret = str ;
+		String ret = str;
 		if (str != null) {
 			if (str.length() > len) {
-				ret = str.substring(0,len) ;
+				ret = str.substring(0, len);
 			}
 		}
-		return ret ;
+		return ret;
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
index d0c1526..af89f60 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import org.apache.ranger.audit.dao.DaoManager;
 import org.apache.ranger.audit.entity.AuthzAuditEventDbObj;
 
+import com.google.gson.Gson;
 import com.google.gson.annotations.SerializedName;
 
 
@@ -33,7 +34,6 @@ public class AuthzAuditEvent extends AuditEventBase {
 	protected static final int MAX_ACTION_FIELD_SIZE       = 1800 ;
 	protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ;
 
-
 	@SerializedName("repoType")
 	protected int    repositoryType = 0;
 
@@ -94,6 +94,17 @@ public class AuthzAuditEvent extends AuditEventBase {
 	@SerializedName("id")
 	protected String eventId        = null;
 
+	/**
+	 * This to ensure order within a session. Order not guaranteed across processes and hosts 
+	 */
+	@SerializedName("seq_num")
+	protected long seqNum = 0;
+
+	@SerializedName("freq_count")
+	protected long frequencyCount = 1;
+
+	@SerializedName("freq_dur_ms")
+	protected long frequencyDurationMS = 0;
 
 	public AuthzAuditEvent() {
 		super();
@@ -400,6 +411,31 @@ public class AuthzAuditEvent extends AuditEventBase {
 	}
 
 
+	
+	public long getSeqNum() {
+		return seqNum;
+	}
+
+	public void setSeqNum(long seqNum) {
+		this.seqNum = seqNum;
+	}
+
+	public long getFrequencyCount() {
+		return frequencyCount;
+	}
+
+	public void setFrequencyCount(long frequencyCount) {
+		this.frequencyCount = frequencyCount;
+	}
+
+	public long getFrequencyDurationMS() {
+		return frequencyDurationMS;
+	}
+
+	public void setFrequencyDurationMS(long frequencyDurationMS) {
+		this.frequencyDurationMS = frequencyDurationMS;
+	}
+
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
@@ -432,6 +468,9 @@ public class AuthzAuditEvent extends AuditEventBase {
 		  .append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR)
 		  .append("logType=").append(logType).append(FIELD_SEPARATOR)
 		  .append("eventId=").append(eventId).append(FIELD_SEPARATOR)
+		  .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR)
+		  .append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR)
+		  .append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR)
 		;
 		return sb;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 5da5064..53adc86 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -90,10 +90,11 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
 
 		queueEvent(event);
+		return true;
 	}
 
 	@Override
@@ -230,7 +231,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
 		return mQueue.isEmpty();
 	}
 
-	private void waitToComplete(long maxWaitSeconds) {
+	public void waitToComplete(long maxWaitSeconds) {
 		LOG.debug("==> AsyncAuditProvider.waitToComplete()");
 
 		for (long waitTime = 0; !isEmpty()

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
new file mode 100644
index 0000000..5553bcc
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * Asynchronous hand-off queue. Producers enqueue events through
+ * {@link #log(AuditEventBase)} without blocking, and a daemon thread started
+ * by {@link #start()} forwards them to the consumer in small batches.
+ * <p>
+ * The backing {@link LinkedTransferQueue} is unbounded by itself, but
+ * {@link #log(AuditEventBase)} rejects events once the queue already holds
+ * {@code getMaxQueueSize()} entries.
+ */
+public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+	private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
+
+	// Upper bound on the number of events handed to the consumer in one call.
+	// TODO: Put a limit. Hard coding to 1000 (use batch size property)
+	private static final int MAX_EVENTS_PER_DISPATCH = 1000;
+
+	LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+	Thread consumerThread = null;
+
+	static int threadCount = 0;
+	static final String DEFAULT_NAME = "async";
+
+	public AuditAsyncQueue() {
+		setName(DEFAULT_NAME);
+	}
+
+	public AuditAsyncQueue(AuditProvider consumer) {
+		super(consumer);
+		setName(DEFAULT_NAME);
+	}
+
+	/**
+	 * Enqueues a single event and returns immediately.
+	 * 
+	 * @param event
+	 *            the event to enqueue
+	 * @return {@code false} if the queue already holds
+	 *         {@code getMaxQueueSize()} events and the event was rejected,
+	 *         {@code true} if it was queued
+	 * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		// Add to the queue and return ASAP
+		if (queue.size() >= getMaxQueueSize()) {
+			return false;
+		}
+		queue.add(event);
+		addLifeTimeInLogCount(1);
+		return true;
+	}
+
+	/**
+	 * Enqueues the events one by one, stopping at the first rejection.
+	 * 
+	 * @param events
+	 *            the events to enqueue
+	 * @return {@code true} only if every event was queued; {@code false} as
+	 *         soon as one is rejected because the queue is full (events
+	 *         before it in iteration order have already been queued)
+	 */
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			if (!log(event)) {
+				// Do not report success for a batch that was only partly
+				// accepted.
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Starts the consumer (if one is set) and then the daemon thread that
+	 * drains this queue.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	public void start() {
+		if (consumer != null) {
+			consumer.start();
+		}
+
+		consumerThread = new Thread(this, this.getClass().getName()
+				+ (threadCount++));
+		consumerThread.setDaemon(true);
+		consumerThread.start();
+	}
+
+	/**
+	 * Switches to drain mode and interrupts the consumer thread so that it
+	 * stops blocking on the queue. The thread itself calls
+	 * {@code consumer.stop()} once the queue is empty (see {@link #run()}).
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+	 */
+	@Override
+	public void stop() {
+		setDrain(true);
+		Thread thread = consumerThread;
+		if (thread == null) {
+			// start() was never called; nothing to wake up
+			return;
+		}
+		try {
+			thread.interrupt();
+		} catch (Throwable t) {
+			// ignore any exception
+		}
+	}
+
+	/**
+	 * @return {@code true} while this queue still holds events; otherwise
+	 *         whatever the consumer reports ({@code false} when no consumer
+	 *         is set)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		if (!queue.isEmpty()) {
+			return true;
+		}
+		// start() treats the consumer as optional, so guard against null here
+		return consumer != null && consumer.isFlushPending();
+	}
+
+	/**
+	 * Consumer loop. Blocks on the queue in normal mode, polls without
+	 * blocking in drain mode, and exits once drain mode is on and the queue is
+	 * empty. The consumer is stopped on exit.
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				AuditEventBase event = null;
+				if (!isDrain()) {
+					// For Transfer queue take() is blocking
+					event = queue.take();
+				} else {
+					// For Transfer queue poll() is non blocking
+					event = queue.poll();
+				}
+				if (event != null) {
+					Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+					eventList.add(event);
+					// One slot is already taken by the event read above
+					queue.drainTo(eventList, MAX_EVENTS_PER_DISPATCH - 1);
+					consumer.log(eventList);
+					eventList.clear();
+				}
+			} catch (InterruptedException e) {
+				// stop() interrupts this thread to break out of take(); the
+				// drain check below decides whether to exit.
+				logger.info(
+						"Caught exception in consumer thread. Mostly to abort loop",
+						e);
+			} catch (Throwable t) {
+				logger.error("Caught error during processing request.", t);
+			}
+			if (isDrain() && queue.isEmpty()) {
+				break;
+			}
+		}
+		try {
+			// Call stop on the consumer
+			consumer.stop();
+		} catch (Throwable t) {
+			logger.error("Error while calling stop on consumer.", t);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
new file mode 100644
index 0000000..58d122a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+public class AuditBatchProcessor extends BaseAuditProvider implements Runnable {
+	private static final Log logger = LogFactory
+			.getLog(AuditBatchProcessor.class);
+
+	private BlockingQueue<AuditEventBase> queue = null;
+	private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>();
+
+	Thread consumerThread = null;
+	static int threadCount = 0;
+
+	public AuditBatchProcessor() {
+	}
+
+	public AuditBatchProcessor(AuditProvider consumer) {
+		super(consumer);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+	 * audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		// Add to batchQueue. Block if full
+		queue.add(event);
+		addLifeTimeInLogCount(1);
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event);
+		}
+		return true;
+	}
+
+	@Override
+	public void init(Properties prop, String basePropertyName) {
+		String propPrefix = "xasecure.audit.batch";
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+
+		super.init(prop, propPrefix);
+
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	synchronized public void start() {
+		if (consumerThread != null) {
+			logger.error("Provider is already started. name=" + getName());
+			return;
+		}
+		logger.info("Creating ArrayBlockingQueue with maxSize="
+				+ getMaxQueueSize());
+		queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
+
+		// Start the consumer first
+		consumer.start();
+
+		// Then the FileSpooler
+		if (fileSpoolerEnabled) {
+			fileSpooler.start();
+		}
+
+		// Finally the queue listener
+		consumerThread = new Thread(this, this.getClass().getName()
+				+ (threadCount++));
+		consumerThread.setDaemon(true);
+		consumerThread.start();
+
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+	 */
+	@Override
+	public void stop() {
+		setDrain(true);
+		flush();
+		try {
+			consumerThread.interrupt();
+		} catch (Throwable t) {
+			// ignore any exception
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+	 */
+	@Override
+	public void waitToComplete() {
+		int defaultTimeOut = -1;
+		waitToComplete(defaultTimeOut);
+		consumer.waitToComplete(defaultTimeOut);
+	}
+
+	@Override
+	public void waitToComplete(long timeout) {
+		setDrain(true);
+		flush();
+		long sleepTime = 1000;
+		long startTime = System.currentTimeMillis();
+		int prevQueueSize = -1;
+		int staticLoopCount = 0;
+		while ((queue.size() > 0 || localBatchBuffer.size() > 0)) {
+			if (prevQueueSize == queue.size()) {
+				logger.error("Queue size is not changing. " + getName()
+						+ ".size=" + queue.size());
+				staticLoopCount++;
+				if (staticLoopCount > 5) {
+					logger.error("Aborting writing to consumer. Some logs will be discarded."
+							+ getName() + ".size=" + queue.size());
+				}
+			} else {
+				staticLoopCount = 0;
+			}
+			consumerThread.interrupt();
+			try {
+				Thread.sleep(sleepTime);
+				if (timeout > 0
+						&& (System.currentTimeMillis() - startTime > timeout)) {
+					break;
+				}
+			} catch (InterruptedException e) {
+				break;
+			}
+		}
+		consumer.waitToComplete(timeout);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		if (queue.isEmpty()) {
+			return consumer.isFlushPending();
+		}
+		return true;
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+	 */
+	@Override
+	public void flush() {
+		if (fileSpoolerEnabled) {
+			fileSpooler.flush();
+		}
+		consumer.flush();
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		long lastDispatchTime = System.currentTimeMillis();
+		boolean isDestActive = true;
+		while (true) {
+			// Time to next dispatch
+			long nextDispatchDuration = lastDispatchTime
+					- System.currentTimeMillis() + getMaxBatchInterval();
+
+			boolean isToSpool = false;
+			boolean fileSpoolDrain = false;
+			try {
+				if (fileSpoolerEnabled && fileSpooler.isPending()) {
+					int percentUsed = (getMaxQueueSize() - queue.size()) * 100
+							/ getMaxQueueSize();
+					long lastAttemptDelta = fileSpooler
+							.getLastAttemptTimeDelta();
+
+					fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
+					// If we should even read from queue?
+					if (!isDrain() && !fileSpoolDrain
+							&& percentUsed < fileSpoolDrainThresholdPercent) {
+						// Since some files are still under progress and it is
+						// not in drain mode, lets wait and retry
+						if (nextDispatchDuration > 0) {
+							Thread.sleep(nextDispatchDuration);
+						}
+						continue;
+					}
+					isToSpool = true;
+				}
+
+				AuditEventBase event = null;
+
+				if (!isToSpool && !isDrain() && !fileSpoolDrain
+						&& nextDispatchDuration > 0) {
+					event = queue.poll(nextDispatchDuration,
+							TimeUnit.MILLISECONDS);
+
+				} else {
+					// For poll() is non blocking
+					event = queue.poll();
+				}
+				if (event != null) {
+					localBatchBuffer.add(event);
+					if (getMaxBatchSize() >= localBatchBuffer.size()) {
+						queue.drainTo(localBatchBuffer, getMaxBatchSize()
+								- localBatchBuffer.size());
+					}
+				}
+			} catch (InterruptedException e) {
+				logger.info(
+						"Caught exception in consumer thread. Mostly to abort loop",
+						e);
+			} catch (Throwable t) {
+				logger.error("Caught error during processing request.", t);
+			}
+
+			if (localBatchBuffer.size() > 0 && isToSpool) {
+				// Let spool to the file directly
+				if (isDestActive) {
+					logger.info("Switching to file spool. Queue=" + getName()
+							+ ", dest=" + consumer.getName());
+				}
+				isDestActive = false;
+				fileSpooler.stashLogs(localBatchBuffer);
+				localBatchBuffer.clear();
+				// Reset all variables
+				lastDispatchTime = System.currentTimeMillis();
+			} else if (localBatchBuffer.size() > 0
+					&& (isDrain()
+							|| localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+				if (fileSpoolerEnabled && !isDestActive) {
+					logger.info("Switching to writing to destination. Queue="
+							+ getName() + ", dest=" + consumer.getName());
+				}
+				boolean ret = consumer.log(localBatchBuffer);
+				if (!ret) {
+					if (fileSpoolerEnabled) {
+						logger.info("Switching to file spool. Queue="
+								+ getName() + ", dest=" + consumer.getName());
+						// Transient error. Stash and move on
+						fileSpooler.stashLogs(localBatchBuffer);
+						isDestActive = false;
+					} else {
+						// We need to drop this event
+						logFailedEvent(localBatchBuffer, null);
+					}
+				} else {
+					isDestActive = true;
+				}
+				localBatchBuffer.clear();
+				// Reset all variables
+				lastDispatchTime = System.currentTimeMillis();
+			}
+
+			if (isDrain()) {
+				if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
+					logger.info("Queue is not empty. Will retry. queue.size)="
+							+ queue.size() + ", localBatchBuffer.size()="
+							+ localBatchBuffer.size());
+				} else {
+					break;
+				}
+			}
+		}
+
+		logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+				+ consumer.getName());
+		try {
+			// Call stop on the consumer
+			consumer.stop();
+			if (fileSpoolerEnabled) {
+				fileSpooler.stop();
+			}
+		} catch (Throwable t) {
+			logger.error("Error while calling stop on consumer.", t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
new file mode 100644
index 0000000..11c32ca
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Abstract base class for audit destinations. Extend it to build a custom
+ * destination; it supplies a no-op {@link #flush()} and reports that no flush
+ * is ever pending.
+ */
+public abstract class AuditDestination extends BaseAuditProvider {
+	private static final Log logger = LogFactory.getLog(AuditDestination.class);
+
+	public AuditDestination() {
+		logger.info("AuditDestination() enter");
+	}
+
+	/**
+	 * Nothing is buffered at this level, so there is nothing to flush.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+	 */
+	@Override
+	public void flush() {
+		// intentionally empty
+	}
+
+	/**
+	 * @return always {@code false}
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+	 */
+	@Override
+	public boolean isFlushPending() {
+		return false;
+	}
+
+	/**
+	 * Passes both arguments through to the superclass unchanged.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties,
+	 *      java.lang.String)
+	 */
+	@Override
+	public void init(Properties prop, String basePropertyName) {
+		super.init(prop, basePropertyName);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
new file mode 100644
index 0000000..8b006de
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * This class temporarily stores logs in file system if the destination is
+ * overloaded or down
+ */
+public class AuditFileSpool implements Runnable {
+	private static final Log logger = LogFactory.getLog(AuditFileSpool.class);
+
+	/**
+	 * Status of a spool file as recorded in its index record.
+	 * <ul>
+	 * <li>{@code pending} - the file has been closed by the writer and is
+	 * queued for the consumer.</li>
+	 * <li>{@code write_inprogress} - the file currently being written.</li>
+	 * <li>{@code read_inprogress} - re-queued ahead of {@code pending} files
+	 * on init; presumably the file the consumer was reading (confirm against
+	 * the consumer loop).</li>
+	 * <li>{@code done} - init treats every other status as outstanding
+	 * work.</li>
+	 * </ul>
+	 */
+	public enum SPOOL_FILE_STATUS {
+		pending, write_inprogress, read_inprogress, done
+	}
+
+	public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
+	public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format";
+	public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir";
+	public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files";
+	public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix";
+	public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec";
+	public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename";
+	// public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
+	// "filespool.index.done_filename";
+	public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
+
+	AuditProvider queueProvider = null;
+	AuditProvider consumerProvider = null;
+
+	BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+
+	// Folder and File attributes
+	File logFolder = null;
+	String logFileNameFormat = null;
+	File archiveFolder = null;
+	String fileNamePrefix = null;
+	String indexFileName = null;
+	File indexFile = null;
+	String indexDoneFileName = null;
+	File indexDoneFile = null;
+	int retryDestinationMS = 30 * 1000; // Default 30 seconds
+	int fileRolloverSec = 24 * 60 * 60; // In seconds
+	int maxArchiveFiles = 100;
+
+	int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+	long lastErrorLogMS = 0;
+
+	List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
+
+	boolean isPending = false;
+	long lastAttemptTime = 0;
+	boolean initDone = false;
+
+	PrintWriter logWriter = null;
+	AuditIndexRecord currentWriterIndexRecord = null;
+	AuditIndexRecord currentConsumerIndexRecord = null;
+
+	BufferedReader logReader = null;
+
+	Thread destinationThread = null;
+
+	boolean isWriting = true;
+	boolean isDrain = false;
+	boolean isDestDown = true;
+
+	private static Gson gson = null;
+
+	/**
+	 * Creates a spooler bound to the given providers. Nothing is read from
+	 * disk until one of the {@code init} methods is called.
+	 * 
+	 * @param queueProvider
+	 *            provider whose name is used in log messages and in the
+	 *            destination thread name
+	 * @param consumerProvider
+	 *            provider named as the consumer in log messages
+	 */
+	public AuditFileSpool(AuditProvider queueProvider,
+			AuditProvider consumerProvider) {
+		this.consumerProvider = consumerProvider;
+		this.queueProvider = queueProvider;
+	}
+
+	/**
+	 * Initializes the spooler with the default property prefix.
+	 * 
+	 * @param prop
+	 *            configuration properties
+	 */
+	public void init(Properties prop) {
+		final String noBasePropertyName = null;
+		this.init(prop, noBasePropertyName);
+	}
+
+	/**
+	 * Reads the spool configuration, creates the spool and archive folders and
+	 * the index files if they are missing, loads the index and queues every
+	 * outstanding spool file for the consumer.
+	 * <p>
+	 * On any configuration or I/O problem the method logs the error and
+	 * returns without marking the spooler as initialized. A second call after
+	 * a successful one is rejected with an error log.
+	 * 
+	 * @param props
+	 *            configuration properties
+	 * @param basePropertyName
+	 *            property prefix; {@code xasecure.audit.filespool} is used
+	 *            when {@code null}
+	 */
+	public void init(Properties props, String basePropertyName) {
+		if (initDone) {
+			logger.error("init() called more than once. queueProvider="
+					+ queueProvider.getName() + ", consumerProvider="
+					+ consumerProvider.getName());
+			return;
+		}
+		String propPrefix = "xasecure.audit.filespool";
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+
+		try {
+			gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+					.create();
+
+			// Initial folder and file properties
+			String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_LOCAL_DIR);
+			// Use propPrefix like every other key; basePropertyName may be
+			// null, which would build the key "null.filespool.filename.format"
+			logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
+			String archiveFolderProp = MiscUtil.getStringProperty(props,
+					propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
+			fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_FILENAME_PREFIX);
+			indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_INDEX_FILE);
+			retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
+			fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
+			maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
+					+ PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
+
+			logger.info("retryDestinationMS=" + retryDestinationMS
+					+ ", queueName=" + queueProvider.getName());
+			logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
+					+ queueProvider.getName());
+			logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
+					+ queueProvider.getName());
+
+			if (logFolderProp == null || logFolderProp.isEmpty()) {
+				logger.error("Audit spool folder is not configured. Please set "
+						+ propPrefix
+						+ "."
+						+ PROP_FILE_SPOOL_LOCAL_DIR
+						+ ". queueName=" + queueProvider.getName());
+				return;
+			}
+			logFolder = new File(logFolderProp);
+			if (!logFolder.isDirectory()) {
+				logFolder.mkdirs();
+				if (!logFolder.isDirectory()) {
+					logger.error("File Spool folder not found and can't be created. folder="
+							+ logFolder.getAbsolutePath()
+							+ ", queueName="
+							+ queueProvider.getName());
+					return;
+				}
+			}
+			logger.info("logFolder=" + logFolder + ", queueName="
+					+ queueProvider.getName());
+
+			if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+				logFileNameFormat = "spool_" + "%app-type%" + "_"
+						+ "%time:yyyyMMdd-HHmm.ss%.log";
+			}
+			logger.info("logFileNameFormat=" + logFileNameFormat
+					+ ", queueName=" + queueProvider.getName());
+
+			if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
+				archiveFolder = new File(logFolder, "archive");
+			} else {
+				archiveFolder = new File(archiveFolderProp);
+			}
+			if (!archiveFolder.isDirectory()) {
+				archiveFolder.mkdirs();
+				if (!archiveFolder.isDirectory()) {
+					logger.error("File Spool archive folder not found and can't be created. folder="
+							+ archiveFolder.getAbsolutePath()
+							+ ", queueName="
+							+ queueProvider.getName());
+					return;
+				}
+			}
+			logger.info("archiveFolder=" + archiveFolder + ", queueName="
+					+ queueProvider.getName());
+
+			if (indexFileName == null || indexFileName.isEmpty()) {
+				indexFileName = "index_" + fileNamePrefix + ".json";
+			}
+
+			indexFile = new File(logFolder, indexFileName);
+			if (!indexFile.exists()) {
+				indexFile.createNewFile();
+			}
+			logger.info("indexFile=" + indexFile + ", queueName="
+					+ queueProvider.getName());
+
+			// A configured index file name may have no extension; in that
+			// case the whole name is the base of the "closed" index name.
+			int lastDot = indexFileName.lastIndexOf('.');
+			String indexBaseName = lastDot >= 0 ? indexFileName.substring(0,
+					lastDot) : indexFileName;
+			indexDoneFileName = indexBaseName + "_closed.json";
+			indexDoneFile = new File(logFolder, indexDoneFileName);
+			if (!indexDoneFile.exists()) {
+				indexDoneFile.createNewFile();
+			}
+			logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+					+ queueProvider.getName());
+
+			// Load index file
+			loadIndexFile();
+			for (AuditIndexRecord auditIndexRecord : indexRecords) {
+				if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
+					isPending = true;
+				}
+				if (auditIndexRecord.status
+						.equals(SPOOL_FILE_STATUS.write_inprogress)) {
+					currentWriterIndexRecord = auditIndexRecord;
+					logger.info("currentWriterIndexRecord="
+							+ currentWriterIndexRecord.filePath
+							+ ", queueName=" + queueProvider.getName());
+				}
+				// Files that were being read go to the head of the queue
+				if (auditIndexRecord.status
+						.equals(SPOOL_FILE_STATUS.read_inprogress)) {
+					indexQueue.add(auditIndexRecord);
+				}
+			}
+			printIndex();
+			// One more loop to add the rest of the pending records, in index
+			// order
+			for (int i = 0; i < indexRecords.size(); i++) {
+				AuditIndexRecord auditIndexRecord = indexRecords.get(i);
+				if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
+					File consumerFile = new File(auditIndexRecord.filePath);
+					if (!consumerFile.exists()) {
+						// A missing spool file must not terminate the host
+						// JVM; report it and skip the record.
+						logger.error("INIT: Consumer file="
+								+ consumerFile.getPath() + " not found.");
+						continue;
+					}
+					indexQueue.add(auditIndexRecord);
+				}
+			}
+
+		} catch (Throwable t) {
+			logger.fatal("Error initializing File Spooler. queue="
+					+ queueProvider.getName(), t);
+			return;
+		}
+		initDone = true;
+	}
+
+	/**
+	 * Start looking for outstanding logs and update status according.
+	 */
+	public void start() {
+		if (!initDone) {
+			logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName="
+					+ queueProvider.getName());
+			return;
+		}
+
+		logger.info("Starting writerThread, queueName="
+				+ queueProvider.getName() + ", consumer="
+				+ consumerProvider.getName());
+
+		// Let's start the thread to read
+		destinationThread = new Thread(this, queueProvider.getName()
+				+ "_destWriter");
+		destinationThread.setDaemon(true);
+		destinationThread.start();
+	}
+
+	/**
+	 * Puts the spooler in drain mode, flushes and closes the open spool file
+	 * and interrupts the destination thread.
+	 * <p>
+	 * If a write is in progress the method checks again after one second, up
+	 * to three times; the file is closed at most once and is left open if the
+	 * writer is still busy after the last check. Does nothing except log an
+	 * error when the spooler has not been initialized.
+	 */
+	public void stop() {
+		if (!initDone) {
+			logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName="
+					+ queueProvider.getName());
+			return;
+		}
+		logger.info("Stop called, queueName=" + queueProvider.getName()
+				+ ", consumer=" + consumerProvider.getName());
+
+		isDrain = true;
+		flush();
+
+		PrintWriter out = getOpenLogFileStream();
+		if (out != null) {
+			// If write is still going on, then let's give it enough time to
+			// complete
+			for (int i = 0; i < 3; i++) {
+				if (isWriting) {
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+						// ignore
+					}
+					continue;
+				}
+				try {
+					logger.info("Closing open file, queueName="
+							+ queueProvider.getName() + ", consumer="
+							+ consumerProvider.getName());
+
+					out.flush();
+					out.close();
+					// Closed successfully; do not flush/close/log again on
+					// the remaining iterations
+					break;
+				} catch (Throwable t) {
+					logger.debug("Error closing spool out file.", t);
+				}
+			}
+		}
+		try {
+			destinationThread.interrupt();
+		} catch (Throwable e) {
+			// ignore
+		}
+	}
+
+	/**
+	 * Flushes the currently open spool file, if there is one. Logs an error
+	 * and does nothing when the spooler has not been initialized.
+	 */
+	public void flush() {
+		if (initDone) {
+			PrintWriter openWriter = getOpenLogFileStream();
+			if (openWriter != null) {
+				openWriter.flush();
+			}
+			return;
+		}
+		logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName="
+				+ queueProvider.getName());
+	}
+
+	/**
+	 * Reports the spooler's pending flag.
+	 * 
+	 * @return the current value of the pending flag, or {@code false} (after
+	 *         reporting the problem through {@code logError}) when the spooler
+	 *         has not been initialized
+	 */
+	public boolean isPending() {
+		if (initDone) {
+			return isPending;
+		}
+		logError("isPending(): File Spooler not initialized. queueName="
+				+ queueProvider.getName());
+		return false;
+	}
+
+	/**
+	 * Time elapsed since the last attempt.
+	 * 
+	 * @return milliseconds between {@code lastAttemptTime} and now, or
+	 *         {@code 0} when {@code lastAttemptTime} is still {@code 0}
+	 */
+	public long getLastAttemptTimeDelta() {
+		return lastAttemptTime == 0 ? 0 : System.currentTimeMillis()
+				- lastAttemptTime;
+	}
+
+	/**
+	 * Writes one event, converted to a string by {@code MiscUtil.stringify},
+	 * as a line of the current spool file and marks the spooler as pending.
+	 * Rejected with an error log once stop has been called. Write failures
+	 * are logged, not thrown.
+	 * 
+	 * @param event
+	 *            the event to spool
+	 */
+	synchronized public void stashLogs(AuditEventBase event) {
+		if (isDrain) {
+			// Stop has been called, so this method shouldn't be called
+			logger.error("stashLogs() is called after stop is called. event="
+					+ event);
+			return;
+		}
+		// stop() checks this flag before closing the file
+		isWriting = true;
+		try {
+			// The stream is obtained first, then the event is converted
+			getLogFileStream().println(MiscUtil.stringify(event));
+			isPending = true;
+		} catch (Exception ex) {
+			logger.error("Error writing to file. event=" + event, ex);
+		} finally {
+			isWriting = false;
+		}
+
+	}
+
+	/**
+	 * Spools every event of the collection, in iteration order, and then
+	 * flushes the spool file once.
+	 * 
+	 * @param events
+	 *            the events to spool
+	 */
+	synchronized public void stashLogs(Collection<AuditEventBase> events) {
+		Iterator<AuditEventBase> remaining = events.iterator();
+		while (remaining.hasNext()) {
+			stashLogs(remaining.next());
+		}
+		flush();
+	}
+
+	/**
+	 * Writes an already serialized event as one line of the current spool
+	 * file. Rejected with an error log once stop has been called. Write
+	 * failures are logged, not thrown.
+	 * <p>
+	 * NOTE(review): unlike {@code stashLogs(AuditEventBase)} this method does
+	 * not set the pending flag - confirm whether that is intended.
+	 * 
+	 * @param event
+	 *            the serialized event
+	 */
+	synchronized public void stashLogsString(String event) {
+		if (isDrain) {
+			// Stop has been called, so this method shouldn't be called
+			logger.error("stashLogs() is called after stop is called. event="
+					+ event);
+			return;
+		}
+		// stop() checks this flag before closing the file
+		isWriting = true;
+		try {
+			getLogFileStream().println(event);
+		} catch (Exception ex) {
+			logger.error("Error writing to file. event=" + event, ex);
+		} finally {
+			isWriting = false;
+		}
+
+	}
+
+	/**
+	 * Spools every serialized event of the collection, in iteration order,
+	 * and then flushes the spool file once.
+	 * 
+	 * @param events
+	 *            the serialized events
+	 */
+	synchronized public void stashLogsString(Collection<String> events) {
+		Iterator<String> remaining = events.iterator();
+		while (remaining.hasNext()) {
+			stashLogsString(remaining.next());
+		}
+		flush();
+	}
+
+	/**
+	 * Returns the writer of the currently open spool file without opening or
+	 * rolling anything.
+	 * 
+	 * @return the current writer, or {@code null} if no spool file is open
+	 */
+	synchronized private PrintWriter getOpenLogFileStream() {
+		return logWriter;
+	}
+
+	/**
+	 * Returns the writer for the current spool file, rolling the previous file
+	 * and creating a new one when needed.
+	 * <p>
+	 * When there is no current writer record, a new file is created in the
+	 * spool folder under a name that exists in neither the spool folder nor
+	 * the archive folder (a numeric suffix is inserted before the extension
+	 * on collision), and a {@code write_inprogress} index record is saved for
+	 * it. When a record exists but no writer is open, the recorded file is
+	 * reopened in append mode.
+	 * 
+	 * @return the writer for the current spool file
+	 * @throws Exception
+	 *             if the file cannot be rolled, created or opened
+	 */
+	synchronized private PrintWriter getLogFileStream() throws Exception {
+		closeFileIfNeeded();
+
+		// Either there are no open log file or the previous one has been rolled
+		// over
+		if (currentWriterIndexRecord == null) {
+			Date currentTime = new Date();
+			// Create a new file
+			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+					currentTime.getTime());
+
+			// Split the name once. A configured format may have no extension,
+			// in which case the counter is simply appended to the name.
+			int lastDot = fileName.lastIndexOf('.');
+			String baseName = lastDot >= 0 ? fileName.substring(0, lastDot)
+					: fileName;
+			String extension = lastDot >= 0 ? fileName.substring(lastDot) : "";
+
+			String newFileName = fileName;
+			File outLogFile = new File(logFolder, newFileName);
+			int i = 0;
+			// Avoid names already used in the spool or the archive folder
+			while (outLogFile.exists()
+					|| new File(archiveFolder, newFileName).exists()) {
+				i++;
+				newFileName = baseName + "." + i + extension;
+				outLogFile = new File(logFolder, newFileName);
+			}
+			fileName = newFileName;
+			logger.info("Creating new file. queueName="
+					+ queueProvider.getName() + ", fileName=" + fileName);
+			// Open the file
+			logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+					outLogFile)));
+
+			AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();
+
+			tmpIndexRecord.id = MiscUtil.generateUniqueId();
+			tmpIndexRecord.filePath = outLogFile.getPath();
+			tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
+			tmpIndexRecord.fileCreateTime = currentTime;
+			tmpIndexRecord.lastAttempt = true;
+			currentWriterIndexRecord = tmpIndexRecord;
+			indexRecords.add(currentWriterIndexRecord);
+			saveIndexFile();
+
+		} else {
+			if (logWriter == null) {
+				// This means the process just started. We need to open the file
+				// in append mode.
+				logger.info("Opening existing file for append. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+						currentWriterIndexRecord.filePath, true)));
+			}
+		}
+		return logWriter;
+	}
+
+	/**
+	 * Closes the spool file currently being written and hands it to the
+	 * consumer queue when it is due.
+	 * <p>
+	 * Nothing happens when there is no active writer index record. Otherwise
+	 * the file is closed if it is the only record in the index, or if it is
+	 * older than {@code fileRolloverSec} seconds. A closed file is marked
+	 * pending, the index is saved, and the record is added to
+	 * {@code indexQueue}.
+	 * 
+	 * @throws FileNotFoundException
+	 *             propagated from saving the index file
+	 * @throws IOException
+	 *             propagated from saving the index file
+	 */
+	synchronized private void closeFileIfNeeded() throws FileNotFoundException,
+			IOException {
+		if (currentWriterIndexRecord == null) {
+			// No file is being written, nothing to roll
+			return;
+		}
+
+		// Check whether the file needs to be rolled
+		boolean closeFile = false;
+		if (indexRecords.size() == 1) {
+			closeFile = true;
+			logger.info("Closing file. Only one open file. queueName="
+					+ queueProvider.getName() + ", fileName="
+					+ currentWriterIndexRecord.filePath);
+		} else {
+			long fileAgeMS = System.currentTimeMillis()
+					- currentWriterIndexRecord.fileCreateTime.getTime();
+			// 1000L forces long arithmetic, so a large rollover interval
+			// cannot overflow if fileRolloverSec is an int
+			if (fileAgeMS > fileRolloverSec * 1000L) {
+				closeFile = true;
+				logger.info("Closing file. Rolling over. queueName="
+						+ queueProvider.getName() + ", fileName="
+						+ currentWriterIndexRecord.filePath);
+			}
+		}
+		if (!closeFile) {
+			return;
+		}
+
+		// Roll the file
+		if (logWriter != null) {
+			logWriter.flush();
+			logWriter.close();
+			logWriter = null;
+		}
+		currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
+		currentWriterIndexRecord.writeCompleteTime = new Date();
+		saveIndexFile();
+		logger.info("Adding file to queue. queueName="
+				+ queueProvider.getName() + ", fileName="
+				+ currentWriterIndexRecord.filePath);
+		indexQueue.add(currentWriterIndexRecord);
+		currentWriterIndexRecord = null;
+	}
+
+	/**
+	 * Reloads {@code indexRecords} from the index file. The in-memory list is
+	 * cleared first; every non-empty line that does not start with '#' is
+	 * parsed as one JSON-serialized {@code AuditIndexRecord}.
+	 * 
+	 * @throws IOException
+	 *             if the index file cannot be opened or read
+	 */
+	void loadIndexFile() throws IOException {
+		logger.info("Loading index file. fileName=" + indexFile.getPath());
+		BufferedReader br = new BufferedReader(new FileReader(indexFile));
+		try {
+			indexRecords.clear();
+			String line;
+			while ((line = br.readLine()) != null) {
+				if (!line.isEmpty() && !line.startsWith("#")) {
+					AuditIndexRecord record = gson.fromJson(line,
+							AuditIndexRecord.class);
+					indexRecords.add(record);
+				}
+			}
+		} finally {
+			// Release the file handle even when reading or parsing fails
+			br.close();
+		}
+	}
+
+	/**
+	 * Logs every record currently in the index, together with whether the
+	 * file it points to exists on disk.
+	 */
+	synchronized void printIndex() {
+		logger.info("INDEX printIndex() ==== START");
+		for (AuditIndexRecord entry : indexRecords) {
+			boolean fileExists = new File(entry.filePath).exists();
+			logger.info("INDEX=" + entry + ", isFileExist=" + fileExists);
+		}
+		logger.info("INDEX printIndex() ==== END");
+	}
+
+	/**
+	 * Removes every index entry whose id equals the id of the given record,
+	 * passes each removed entry to {@code appendToDoneFile}, and finally saves
+	 * the index file.
+	 * 
+	 * @param indexRecord
+	 *            record whose id selects the entries to remove
+	 * @throws FileNotFoundException
+	 *             propagated from writing the done file or the index file
+	 * @throws IOException
+	 *             propagated from writing the done file or the index file
+	 */
+	synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
+			throws FileNotFoundException, IOException {
+		for (Iterator<AuditIndexRecord> it = indexRecords.iterator(); it
+				.hasNext();) {
+			AuditIndexRecord candidate = it.next();
+			if (!candidate.id.equals(indexRecord.id)) {
+				continue;
+			}
+			logger.info("Removing file from index. file=" + candidate.filePath
+					+ ", queueName=" + queueProvider.getName()
+					+ ", consumer=" + consumerProvider.getName());
+
+			it.remove();
+			appendToDoneFile(candidate);
+		}
+		saveIndexFile();
+	}
+
+	/**
+	 * Rewrites the index file from {@code indexRecords}, one JSON-serialized
+	 * record per line.
+	 * 
+	 * @throws FileNotFoundException
+	 *             if the index file cannot be opened for writing
+	 * @throws IOException
+	 *             declared for callers; not thrown by the visible code
+	 */
+	synchronized void saveIndexFile() throws FileNotFoundException, IOException {
+		PrintWriter out = new PrintWriter(indexFile);
+		try {
+			for (AuditIndexRecord auditIndexRecord : indexRecords) {
+				out.println(gson.toJson(auditIndexRecord));
+			}
+		} finally {
+			// Close the writer even if serialization fails part-way
+			out.close();
+		}
+	}
+
+	/**
+	 * Records a finished spool file and archives it.
+	 * <p>
+	 * The record is appended as one JSON line to the done file. The spool file
+	 * is then moved into the archive folder. Finally, if the archive folder
+	 * holds more than {@code maxArchiveFiles} files ending in ".log", archive
+	 * files are deleted in the order they appear in the done file until the
+	 * excess is removed. Failures while moving or pruning are logged and not
+	 * propagated.
+	 * 
+	 * @param indexRecord
+	 *            record of the spool file that is done
+	 * @throws FileNotFoundException
+	 *             if the done file cannot be opened for append
+	 * @throws IOException
+	 *             if the done file cannot be opened for append
+	 */
+	void appendToDoneFile(AuditIndexRecord indexRecord)
+			throws FileNotFoundException, IOException {
+		logger.info("Moving to done file. " + indexRecord.filePath
+				+ ", queueName=" + queueProvider.getName() + ", consumer="
+				+ consumerProvider.getName());
+		String line = gson.toJson(indexRecord);
+		PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
+				indexDoneFile, true)));
+		try {
+			out.println(line);
+			out.flush();
+		} finally {
+			out.close();
+		}
+
+		// Move to archive folder
+		File logFile = null;
+		File archiveFile = null;
+		try {
+			logFile = new File(indexRecord.filePath);
+			String fileName = logFile.getName();
+			archiveFile = new File(archiveFolder, fileName);
+			logger.info("Moving logFile " + logFile + " to " + archiveFile);
+			// renameTo() reports failure through its return value, not by
+			// throwing, so it has to be checked explicitly
+			if (!logFile.renameTo(archiveFile)) {
+				logger.error("Error moving log file to archive folder. logFile="
+						+ logFile + ", archiveFile=" + archiveFile);
+			}
+		} catch (Throwable t) {
+			logger.error("Error moving log file to archive folder. logFile="
+					+ logFile + ", archiveFile=" + archiveFile, t);
+		}
+
+		archiveFile = null;
+		try {
+			// Remove old files
+			File[] logFiles = archiveFolder.listFiles(new FileFilter() {
+				public boolean accept(File pathname) {
+					return pathname.getName().toLowerCase().endsWith(".log");
+				}
+			});
+
+			if (logFiles == null) {
+				// listFiles() returns null when the folder cannot be listed
+				logger.error("Error listing archive folder. archiveFolder="
+						+ archiveFolder);
+			} else if (logFiles.length > maxArchiveFiles) {
+				int filesToDelete = logFiles.length - maxArchiveFiles;
+				BufferedReader br = new BufferedReader(new FileReader(
+						indexDoneFile));
+				try {
+					int filesDeletedCount = 0;
+					// The done file lists records in completion order, so
+					// reading it top-down visits the oldest files first
+					while ((line = br.readLine()) != null) {
+						if (!line.isEmpty() && !line.startsWith("#")) {
+							AuditIndexRecord record = gson.fromJson(line,
+									AuditIndexRecord.class);
+							logFile = new File(record.filePath);
+							String fileName = logFile.getName();
+							archiveFile = new File(archiveFolder, fileName);
+							if (archiveFile.exists()) {
+								logger.info("Deleting archive file "
+										+ archiveFile);
+								boolean ret = archiveFile.delete();
+								if (!ret) {
+									logger.error("Error deleting archive file. archiveFile="
+											+ archiveFile);
+								}
+								filesDeletedCount++;
+								if (filesDeletedCount >= filesToDelete) {
+									logger.info("Deleted " + filesDeletedCount
+											+ " files");
+									break;
+								}
+							}
+						}
+					}
+				} finally {
+					br.close();
+				}
+			}
+		} catch (Throwable t) {
+			logger.error("Error deleting older archive file. archiveFile="
+					+ archiveFile, t);
+		}
+
+	}
+
+	/**
+	 * Logs the message at error level, but only when more than
+	 * {@code errorLogIntervalMS} milliseconds have passed since the last
+	 * message logged through this method; otherwise the message is dropped.
+	 * 
+	 * @param msg
+	 *            message to log
+	 */
+	void logError(String msg) {
+		long nowMS = System.currentTimeMillis();
+		long sinceLastMS = nowMS - lastErrorLogMS;
+		if (sinceLastMS <= errorLogIntervalMS) {
+			return;
+		}
+		logger.error(msg);
+		lastErrorLogMS = nowMS;
+	}
+
+	class AuditIndexRecord {
+		String id;
+		String filePath;
+		int linePosition = 0;
+		SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
+		Date fileCreateTime;
+		Date writeCompleteTime;
+		Date doneCompleteTime;
+		Date lastSuccessTime;
+		Date lastFailedTime;
+		int failedAttemptCount = 0;
+		boolean lastAttempt = false;
+
+		@Override
+		public String toString() {
+			return "AuditIndexRecord [id=" + id + ", filePath=" + filePath
+					+ ", linePosition=" + linePosition + ", status=" + status
+					+ ", fileCreateTime=" + fileCreateTime
+					+ ", writeCompleteTime=" + writeCompleteTime
+					+ ", doneCompleteTime=" + doneCompleteTime
+					+ ", lastSuccessTime=" + lastSuccessTime
+					+ ", lastFailedTime=" + lastFailedTime
+					+ ", failedAttemptCount=" + failedAttemptCount
+					+ ", lastAttempt=" + lastAttempt + "]";
+		}
+
+	}
+
+	class AuditFileSpoolAttempt { // Plain holder for one attempt. NOTE(review): no use of this class is visible in this part of the file
+		Date attemptTime; // presumably when the attempt was made - TODO confirm
+		String status; // presumably the outcome of the attempt - TODO confirm
+	}
+
+	/**
+	 * Consumer loop that forwards spooled files to the consumer provider.
+	 * <p>
+	 * Each iteration either waits up to {@code retryDestinationMS} for the
+	 * next file from {@code indexQueue}, or, when a file is already in
+	 * progress, sleeps for that long before retrying it. The file is read
+	 * line by line, skipping the lines already sent according to the record's
+	 * {@code linePosition}, and sent in batches of
+	 * {@code queueProvider.getMaxBatchSize()} lines. A fully sent or missing
+	 * file is removed from the index; a failed send marks the destination as
+	 * down and leaves the record in place for the next iteration. The loop
+	 * exits once {@code isDrain} is set.
+	 * 
+	 * @see java.lang.Runnable#run()
+	 */
+	@Override
+	public void run() {
+		while (true) {
+			try {
+				// Let's pause between each iteration
+				if (currentConsumerIndexRecord == null) {
+					currentConsumerIndexRecord = indexQueue.poll(
+							retryDestinationMS, TimeUnit.MILLISECONDS);
+				} else {
+					Thread.sleep(retryDestinationMS);
+				}
+
+				if (isDrain) {
+					// Need to exit
+					break;
+				}
+				if (currentConsumerIndexRecord == null) {
+					closeFileIfNeeded();
+					continue;
+				}
+
+				boolean isRemoveIndex = false;
+				File consumerFile = new File(
+						currentConsumerIndexRecord.filePath);
+				if (!consumerFile.exists()) {
+					logger.error("Consumer file=" + consumerFile.getPath()
+							+ " not found.");
+					printIndex();
+					isRemoveIndex = true;
+				} else {
+					// Let's open the file to read
+					BufferedReader br = new BufferedReader(new FileReader(
+							currentConsumerIndexRecord.filePath));
+					try {
+						// linePosition is the number of the last line that
+						// was already sent successfully
+						int startLine = currentConsumerIndexRecord.linePosition;
+						String line;
+						int currLine = 0;
+						boolean isResumed = false;
+						List<String> lines = new ArrayList<String>();
+						while ((line = br.readLine()) != null) {
+							currLine++;
+							// Skip everything up to and including the last
+							// sent line, so that line is not sent twice
+							if (currLine <= startLine) {
+								continue;
+							}
+							lines.add(line);
+							if (lines.size() == queueProvider.getMaxBatchSize()) {
+								boolean ret = sendEvent(lines,
+										currentConsumerIndexRecord, currLine);
+								if (!ret) {
+									throw new Exception("Destination down");
+								} else {
+									if (!isResumed) {
+										logger.info("Started writing to destination. file="
+												+ currentConsumerIndexRecord.filePath
+												+ ", queueName="
+												+ queueProvider.getName()
+												+ ", consumer="
+												+ consumerProvider.getName());
+										// Log the start only once per pass
+										isResumed = true;
+									}
+								}
+								lines.clear();
+							}
+						}
+						// Send the remaining partial batch, if any
+						if (lines.size() > 0) {
+							boolean ret = sendEvent(lines,
+									currentConsumerIndexRecord, currLine);
+							if (!ret) {
+								throw new Exception("Destination down");
+							} else {
+								if (!isResumed) {
+									logger.info("Started writing to destination. file="
+											+ currentConsumerIndexRecord.filePath
+											+ ", queueName="
+											+ queueProvider.getName()
+											+ ", consumer="
+											+ consumerProvider.getName());
+									isResumed = true;
+								}
+							}
+							lines.clear();
+						}
+						logger.info("Done reading file. file="
+								+ currentConsumerIndexRecord.filePath
+								+ ", queueName=" + queueProvider.getName()
+								+ ", consumer=" + consumerProvider.getName());
+						// The entire file is read
+						currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
+						currentConsumerIndexRecord.doneCompleteTime = new Date();
+						currentConsumerIndexRecord.lastAttempt = true;
+
+						isRemoveIndex = true;
+					} catch (Exception ex) {
+						isDestDown = true;
+						logError("Destination down. queueName="
+								+ queueProvider.getName() + ", consumer="
+								+ consumerProvider.getName());
+						lastAttemptTime = System.currentTimeMillis();
+						// Update the index file
+						currentConsumerIndexRecord.lastFailedTime = new Date();
+						currentConsumerIndexRecord.failedAttemptCount++;
+						currentConsumerIndexRecord.lastAttempt = false;
+						saveIndexFile();
+					} finally {
+						br.close();
+					}
+				}
+				if (isRemoveIndex) {
+					// Remove this entry from index
+					removeIndexRecord(currentConsumerIndexRecord);
+					currentConsumerIndexRecord = null;
+					closeFileIfNeeded();
+				}
+			} catch (Throwable t) {
+				logger.error("Exception in destination writing thread.", t);
+			}
+		}
+		logger.info("Exiting file spooler. provider=" + queueProvider.getName()
+				+ ", consumer=" + consumerProvider.getName());
+	}
+
+	/**
+	 * Sends one batch of lines to the consumer provider and, on success,
+	 * records the progress in the given index record and saves the index.
+	 * <p>
+	 * If the call throws, the error is logged and the result keeps whatever
+	 * value it had at that point; it starts out as true, so an exception
+	 * thrown by the consumer is reported to the caller as success.
+	 * 
+	 * @param lines
+	 *            batch of lines to send
+	 * @param indexRecord
+	 *            index record of the file the lines were read from
+	 * @param currLine
+	 *            line number of the last line in the batch
+	 * @return false when the consumer reported failure, true otherwise
+	 */
+	private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
+			int currLine) {
+		boolean sent = true;
+		try {
+			sent = consumerProvider.logJSON(lines);
+			if (sent) {
+				// Update index and save
+				indexRecord.linePosition = currLine;
+				indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
+				indexRecord.lastSuccessTime = new Date();
+				indexRecord.lastAttempt = true;
+				saveIndexFile();
+
+				if (isDestDown) {
+					isDestDown = false;
+					logger.info("Destination up now. " + indexRecord.filePath
+							+ ", queueName=" + queueProvider.getName()
+							+ ", consumer=" + consumerProvider.getName());
+				}
+			} else {
+				// Need to log error after fixed interval
+				logError("Error sending logs to consumer. provider="
+						+ queueProvider.getName() + ", consumer="
+						+ consumerProvider.getName());
+			}
+		} catch (Throwable t) {
+			logger.error("Error while sending logs to consumer. provider="
+					+ queueProvider.getName() + ", consumer="
+					+ consumerProvider.getName() + ", log=" + lines, t);
+		}
+
+		return sent;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
new file mode 100644
index 0000000..3ef3e30
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+/**
+ * Signals that an audit message itself is invalid, e.g. it has a field of
+ * the wrong type. It must not be used for transient errors.
+ */
+public class AuditMessageException extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates the exception without a message or a cause.
+	 */
+	public AuditMessageException() {
+		super();
+	}
+
+	/**
+	 * Creates the exception with a message.
+	 * 
+	 * @param message
+	 *            description of what is wrong with the audit message
+	 */
+	public AuditMessageException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates the exception with a cause.
+	 * 
+	 * @param cause
+	 *            underlying error
+	 */
+	public AuditMessageException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * Creates the exception with a message and a cause.
+	 * 
+	 * @param message
+	 *            description of what is wrong with the audit message
+	 * @param cause
+	 *            underlying error
+	 */
+	public AuditMessageException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	/**
+	 * Creates the exception with full control over suppression and stack
+	 * trace capture; all arguments are passed straight to
+	 * {@link Exception#Exception(String, Throwable, boolean, boolean)}.
+	 * 
+	 * @param message
+	 *            description of what is wrong with the audit message
+	 * @param cause
+	 *            underlying error
+	 * @param enableSuppression
+	 *            whether suppression is enabled
+	 * @param writableStackTrace
+	 *            whether the stack trace is writable
+	 */
+	public AuditMessageException(String message, Throwable cause,
+			boolean enableSuppression, boolean writableStackTrace) {
+		super(message, cause, enableSuppression, writableStackTrace);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
index 47c2d7f..0e38624 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
@@ -18,18 +18,38 @@
 
 package org.apache.ranger.audit.provider;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.ranger.audit.model.AuditEventBase;
 
 public interface AuditProvider {
-	public void log(AuditEventBase event);
+	public boolean log(AuditEventBase event);
+	public boolean log(Collection<AuditEventBase> events);	
+
+	public boolean logJSON(String event);
+	public boolean logJSON(Collection<String> events);	
 
     public void init(Properties prop);
+    public void init(Properties prop, String basePropertyName);
     public void start();
     public void stop();
     public void waitToComplete();
+    public void waitToComplete(long timeout);
+
+    /**
+     * Name for this provider. Used only during logging. Uniqueness is not guaranteed
+     */
+    public String getName();
 
+    /**
+     * Whether this AuditProvider is in the state of shutdown
+     * @return
+     */
+    public boolean isDrain();
+    
+    public int getMaxBatchSize();
+    public int getMaxBatchInterval();
 	public boolean isFlushPending();
 	public long    getLastFlushTime();
     public void    flush();

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index bb8fa6d..13b3142 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -28,7 +28,6 @@ import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;
 import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
 
-
 /*
  * TODO:
  * 1) Flag to enable/disable audit logging
@@ -37,22 +36,25 @@ import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
  */
 
 public class AuditProviderFactory {
-	private static final Log LOG = LogFactory.getLog(AuditProviderFactory.class);
-
-	private static final String AUDIT_IS_ENABLED_PROP       = "xasecure.audit.is.enabled" ;
-	private static final String AUDIT_DB_IS_ENABLED_PROP    = "xasecure.audit.db.is.enabled" ;
-	private static final String AUDIT_HDFS_IS_ENABLED_PROP  = "xasecure.audit.hdfs.is.enabled";
-	private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ;
-	private static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled";
-	private static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled";
-	
-	private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 1024;
-	private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 1000;
-	
+	private static final Log LOG = LogFactory
+			.getLog(AuditProviderFactory.class);
+
+	public static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled";
+	public static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled";
+	public static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled";
+	public static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled";
+	public static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled";
+	public static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled";
+
+	public static final String AUDIT_DEST_BASE = "xasecure.audit.destination";
+
+	public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
+	public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000;
+
 	private static AuditProviderFactory sFactory;
 
 	private AuditProvider mProvider = null;
-	private boolean       mInitDone = false;
+	private boolean mInitDone = false;
 
 	private AuditProviderFactory() {
 		LOG.info("AuditProviderFactory: creating..");
@@ -61,9 +63,9 @@ public class AuditProviderFactory {
 	}
 
 	public static AuditProviderFactory getInstance() {
-		if(sFactory == null) {
-			synchronized(AuditProviderFactory.class) {
-				if(sFactory == null) {
+		if (sFactory == null) {
+			synchronized (AuditProviderFactory.class) {
+				if (sFactory == null) {
 					sFactory = new AuditProviderFactory();
 				}
 			}
@@ -75,7 +77,7 @@ public class AuditProviderFactory {
 	public static AuditProvider getAuditProvider() {
 		return AuditProviderFactory.getInstance().getProvider();
 	}
-	
+
 	public AuditProvider getProvider() {
 		return mProvider;
 	}
@@ -86,133 +88,301 @@ public class AuditProviderFactory {
 
 	public synchronized void init(Properties props, String appType) {
 		LOG.info("AuditProviderFactory: initializing..");
-		
-		if(mInitDone) {
-			LOG.warn("AuditProviderFactory.init(): already initialized!", new Exception());
+
+		if (mInitDone) {
+			LOG.warn("AuditProviderFactory.init(): already initialized!",
+					new Exception());
 
 			return;
 		}
 		mInitDone = true;
-		
-		MiscUtil.setApplicationType(appType);
 
-		boolean isEnabled             = BaseAuditProvider.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false);
-		boolean isAuditToDbEnabled    = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
-		boolean isAuditToHdfsEnabled  = BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
-		boolean isAuditToLog4jEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
-		boolean isAuditToKafkaEnabled  = BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false);
-		boolean isAuditToSolrEnabled  = BaseAuditProvider.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false);
+		MiscUtil.setApplicationType(appType);
 
-		if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled || isAuditToSolrEnabled)) {
-			LOG.info("AuditProviderFactory: Audit not enabled..");
+		boolean isEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_IS_ENABLED_PROP, false);
+		boolean isAuditToDbEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_DB_IS_ENABLED_PROP, false);
+		boolean isAuditToHdfsEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_HDFS_IS_ENABLED_PROP, false);
+		boolean isAuditToLog4jEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_LOG4J_IS_ENABLED_PROP, false);
+		boolean isAuditToKafkaEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_KAFKA_IS_ENABLED_PROP, false);
+		boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props,
+				AUDIT_SOLR_IS_ENABLED_PROP, false);
 
-			mProvider = getDefaultProvider();
+		List<AuditProvider> providers = new ArrayList<AuditProvider>();
 
-			return;
+		// TODO: Delete me
+		for (Object propNameObj : props.keySet()) {
+			LOG.info("DELETE ME: " + propNameObj.toString() + "="
+					+ props.getProperty(propNameObj.toString()));
 		}
 
-		List<AuditProvider> providers = new ArrayList<AuditProvider>();
+		// Process new audit configurations
+		List<String> destNameList = new ArrayList<String>();
 
-		if(isAuditToDbEnabled) {
-			LOG.info("DbAuditProvider is enabled");
-			DbAuditProvider dbProvider = new DbAuditProvider();
-
-			boolean isAuditToDbAsync = BaseAuditProvider.getBooleanProperty(props, DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
+		for (Object propNameObj : props.keySet()) {
+			String propName = propNameObj.toString();
+			if (propName.length() <= AUDIT_DEST_BASE.length() + 1) {
+				continue;
+			}
+			String destName = propName.substring(AUDIT_DEST_BASE.length() + 1);
+			List<String> splits = MiscUtil.toArray(destName, ".");
+			if (splits.size() > 1) {
+				continue;
+			}
+			String value = props.getProperty(propName);
+			if (value.equalsIgnoreCase("enable")
+					|| value.equalsIgnoreCase("true")) {
+				destNameList.add(destName);
+				LOG.info("Audit destination " + propName + " is set to "
+						+ value);
+			}
+		}
 
-			if(isAuditToDbAsync) {
-				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+		for (String destName : destNameList) {
+			String destPropPrefix = AUDIT_DEST_BASE + "." + destName;
+			AuditProvider destProvider = getProviderFromConfig(props,
+					destPropPrefix, destName);
 
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("DbAuditProvider", maxQueueSize, maxFlushInterval, dbProvider);
+			if (destProvider != null) {
+				destProvider.init(props, destPropPrefix);
 
-				providers.add(asyncProvider);
-			} else {
-				providers.add(dbProvider);
+				String queueName = MiscUtil.getStringProperty(props,
+						destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE);
+				if( queueName == null || queueName.isEmpty()) {
+					queueName = "batch";
+				}
+				if (queueName != null && !queueName.isEmpty()
+						&& !queueName.equalsIgnoreCase("none")) {
+					String queuePropPrefix = destPropPrefix + "." + queueName;
+					AuditProvider queueProvider = getProviderFromConfig(props,
+							queuePropPrefix, queueName);
+					if (queueProvider != null) {
+						if (queueProvider instanceof BaseAuditProvider) {
+							BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider;
+							qProvider.setConsumer(destProvider);
+							qProvider.init(props, queuePropPrefix);
+							providers.add(queueProvider);
+						} else {
+							LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination "
+									+ destName
+									+ " can't be created. queueName="
+									+ queueName);
+						}
+					} else {
+						LOG.fatal("Queue provider for destination " + destName
+								+ " can't be created. queueName=" + queueName);
+					}
+				} else {
+					LOG.info("Audit destination " + destProvider.getName()
+							+ " added to provider list");
+					providers.add(destProvider);
+				}
 			}
 		}
+		if (providers.size() > 0) {
+			LOG.info("Using v2 audit configuration");
+			AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
+			String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "async";
+			asyncQueue.init(props, propPrefix);
+
+			if (providers.size() == 1) {
+				asyncQueue.setConsumer(providers.get(0));
+			} else {
+				MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+				multiDestProvider.init(props);
+				multiDestProvider.addAuditProviders(providers);
+				asyncQueue.setConsumer(multiDestProvider);
+			}
 
-		if(isAuditToHdfsEnabled) {
-			LOG.info("HdfsAuditProvider is enabled");
+			mProvider = asyncQueue;
+			mProvider.start();
+		} else {
+			LOG.info("No v2 audit configuration found. Trying v1 audit configurations");
+			if (!isEnabled
+					|| !(isAuditToDbEnabled || isAuditToHdfsEnabled
+							|| isAuditToKafkaEnabled || isAuditToLog4jEnabled
+							|| isAuditToSolrEnabled || providers.size() == 0)) {
+				LOG.info("AuditProviderFactory: Audit not enabled..");
 
-			HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
+				mProvider = getDefaultProvider();
 
-			boolean isAuditToHdfsAsync = BaseAuditProvider.getBooleanProperty(props, HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
+				return;
+			}
 
-			if(isAuditToHdfsAsync) {
-				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+			if (isAuditToDbEnabled) {
+				LOG.info("DbAuditProvider is enabled");
+				DbAuditProvider dbProvider = new DbAuditProvider();
 
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, hdfsProvider);
+				boolean isAuditToDbAsync = MiscUtil.getBooleanProperty(props,
+						DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
 
-				providers.add(asyncProvider);
-			} else {
-				providers.add(hdfsProvider);
-			}
-		}
+				if (isAuditToDbAsync) {
+					int maxQueueSize = MiscUtil.getIntProperty(props,
+							DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP,
+							AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+					int maxFlushInterval = MiscUtil.getIntProperty(props,
+							DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP,
+							AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
 
-		if(isAuditToKafkaEnabled) {
-			LOG.info("KafkaAuditProvider is enabled");
-			KafkaAuditProvider kafkaProvider = new KafkaAuditProvider();
-			kafkaProvider.init(props);
-			
-			if( kafkaProvider.isAsync()) {
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), kafkaProvider.getMaxFlushInterval(), kafkaProvider);
-				providers.add(asyncProvider);
-			} else {
-				providers.add(kafkaProvider);
-			}
-		}
+					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+							"DbAuditProvider", maxQueueSize, maxFlushInterval,
+							dbProvider);
 
-		if(isAuditToSolrEnabled) {
-			LOG.info("SolrAuditProvider is enabled");
-			SolrAuditProvider solrProvider = new SolrAuditProvider();
-			solrProvider.init(props);
-			
-			if( solrProvider.isAsync()) {
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MySolrAuditProvider", solrProvider.getMaxQueueSize(), solrProvider.getMaxFlushInterval(), solrProvider);
-				providers.add(asyncProvider);
-			} else {
-				providers.add(solrProvider);
+					providers.add(asyncProvider);
+				} else {
+					providers.add(dbProvider);
+				}
 			}
-		}
 
-		if(isAuditToLog4jEnabled) {
-			Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
+			if (isAuditToHdfsEnabled) {
+				LOG.info("HdfsAuditProvider is enabled");
 
-			boolean isAuditToLog4jAsync = BaseAuditProvider.getBooleanProperty(props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, false);
+				HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
 
-			if(isAuditToLog4jAsync) {
-				int maxQueueSize     = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
-				int maxFlushInterval = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+				boolean isAuditToHdfsAsync = MiscUtil.getBooleanProperty(props,
+						HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
 
-				AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, log4jProvider);
+				if (isAuditToHdfsAsync) {
+					int maxQueueSize = MiscUtil.getIntProperty(props,
+							HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP,
+							AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+					int maxFlushInterval = MiscUtil
+							.getIntProperty(
+									props,
+									HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP,
+									AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+							"HdfsAuditProvider", maxQueueSize,
+							maxFlushInterval, hdfsProvider);
+
+					providers.add(asyncProvider);
+				} else {
+					providers.add(hdfsProvider);
+				}
+			}
 
-				providers.add(asyncProvider);
+			if (isAuditToKafkaEnabled) {
+				LOG.info("KafkaAuditProvider is enabled");
+				KafkaAuditProvider kafkaProvider = new KafkaAuditProvider();
+				kafkaProvider.init(props);
+
+				if (kafkaProvider.isAsync()) {
+					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+							"MyKafkaAuditProvider",
+							kafkaProvider.getMaxQueueSize(),
+							kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+					providers.add(asyncProvider);
+				} else {
+					providers.add(kafkaProvider);
+				}
+			}
+
+			if (isAuditToSolrEnabled) {
+				LOG.info("SolrAuditProvider is enabled");
+				SolrAuditProvider solrProvider = new SolrAuditProvider();
+				solrProvider.init(props);
+
+				if (solrProvider.isAsync()) {
+					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+							"MySolrAuditProvider",
+							solrProvider.getMaxQueueSize(),
+							solrProvider.getMaxBatchInterval(), solrProvider);
+					providers.add(asyncProvider);
+				} else {
+					providers.add(solrProvider);
+				}
+			}
+
+			if (isAuditToLog4jEnabled) {
+				Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
+
+				boolean isAuditToLog4jAsync = MiscUtil.getBooleanProperty(
+						props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP,
+						false);
+
+				if (isAuditToLog4jAsync) {
+					int maxQueueSize = MiscUtil.getIntProperty(props,
+							Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP,
+							AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+					int maxFlushInterval = MiscUtil
+							.getIntProperty(
+									props,
+									Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP,
+									AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+					AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+							"Log4jAuditProvider", maxQueueSize,
+							maxFlushInterval, log4jProvider);
+
+					providers.add(asyncProvider);
+				} else {
+					providers.add(log4jProvider);
+				}
+			}
+			if (providers.size() == 0) {
+				mProvider = getDefaultProvider();
+			} else if (providers.size() == 1) {
+				mProvider = providers.get(0);
 			} else {
-				providers.add(log4jProvider);
+				MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+
+				multiDestProvider.addAuditProviders(providers);
+
+				mProvider = multiDestProvider;
 			}
-		}
 
-		if(providers.size() == 0) {
-			mProvider = getDefaultProvider();
-		} else if(providers.size() == 1) {
-			mProvider = providers.get(0);
-		} else {
-			MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
-			
-			multiDestProvider.addAuditProviders(providers);
-			
-			mProvider = multiDestProvider;
+			mProvider.init(props);
+			mProvider.start();
 		}
-		
-		mProvider.init(props);
-		mProvider.start();
 
 		JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider);
 
-	    Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+		Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+	}
+
+	/**
+	 * Creates the audit provider for one configured destination or queue.
+	 * <p>
+	 * If the property {@code propPrefix + "." + PROP_CLASS_NAME} names a
+	 * class, that class is instantiated through its no-argument constructor.
+	 * Otherwise the provider is chosen from the built-in names "file",
+	 * "hdfs", "solr", "kafka", "db", "log4j", "batch" and "async".
+	 * 
+	 * @param props
+	 *            audit configuration
+	 * @param propPrefix
+	 *            property prefix of the provider being created
+	 * @param providerName
+	 *            short name of the provider
+	 * @return the new provider, or null when the class cannot be instantiated
+	 *         or the name is not known
+	 */
+	private AuditProvider getProviderFromConfig(Properties props,
+			String propPrefix, String providerName) {
+		AuditProvider provider = null;
+		String className = MiscUtil.getStringProperty(props, propPrefix + "."
+				+ BaseAuditProvider.PROP_CLASS_NAME);
+		if (className != null && !className.isEmpty()) {
+			try {
+				provider = (AuditProvider) Class.forName(className)
+						.newInstance();
+			} catch (Exception e) {
+				// Pass the exception along so the log shows why instantiation
+				// failed (class not found, no default constructor, ...)
+				LOG.fatal("Can't instantiate audit class for providerName="
+						+ providerName + ", className=" + className, e);
+			}
+		} else {
+			if (providerName.equals("file")) {
+				provider = new FileAuditDestination();
+			} else if (providerName.equalsIgnoreCase("hdfs")) {
+				provider = new HDFSAuditDestination();
+			} else if (providerName.equals("solr")) {
+				provider = new SolrAuditProvider();
+			} else if (providerName.equals("kafka")) {
+				provider = new KafkaAuditProvider();
+			} else if (providerName.equals("db")) {
+				provider = new DbAuditProvider();
+			} else if (providerName.equals("log4j")) {
+				provider = new Log4jAuditProvider();
+			} else if (providerName.equals("batch")) {
+				provider = new AuditBatchProcessor();
+			} else if (providerName.equals("async")) {
+				provider = new AuditAsyncQueue();
+			} else {
+				LOG.error("Provider name doesn't have any class associated with it. providerName="
+						+ providerName);
+			}
+		}
+		return provider;
+	}
-	
+
 	private AuditProvider getDefaultProvider() {
 		return new DummyAuditProvider();
 	}
@@ -227,6 +397,6 @@ public class AuditProviderFactory {
 		public void run() {
 			mProvider.waitToComplete();
 			mProvider.stop();
-	    }
+		}
 	}
 }


[4/4] incubator-ranger git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ranger.git

Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ranger.git

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/44f403ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/44f403ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/44f403ea

Branch: refs/heads/master
Commit: 44f403ead9f920495af247e97450eb9e6a3c29b8
Parents: eb1e9b5 4a20a9d
Author: Don Bosco Durai <bo...@apache.org>
Authored: Sun Apr 19 15:10:57 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Sun Apr 19 15:10:57 2015 -0700

----------------------------------------------------------------------
 .../plugin/policyengine/RangerPolicyEngine.java |   8 -
 .../policyengine/RangerPolicyEngineImpl.java    |  21 --
 .../RangerDefaultPolicyEvaluator.java           |   2 +-
 .../RangerAbstractResourceMatcher.java          |  20 +-
 .../RangerDefaultResourceMatcher.java           |  13 +-
 .../RangerPathResourceMatcher.java              |  67 ++--
 .../resourcematcher/RangerResourceMatcher.java  |   9 +-
 .../ranger/plugin/service/RangerBasePlugin.java |  38 +-
 .../resourcematcher/TestResourceMatcher.java    | 136 +++++++
 .../test_resourcematcher_default.json           | 327 +++++++++++++++++
 .../test_resourcematcher_path.json              | 290 +++++++++++++++
 .../hbase/AuthorizationSession.java             |  16 +-
 .../authorization/hbase/HbaseAuditHandler.java  |   5 +
 .../hbase/HbaseAuditHandlerImpl.java            |  21 +-
 .../authorization/hbase/HbaseAuthUtilsImpl.java |   1 -
 .../authorization/hbase/HbaseFactory.java       |   6 +
 .../authorization/hbase/HbaseUserUtils.java     |   7 +
 .../authorization/hbase/HbaseUserUtilsImpl.java |  50 ++-
 .../hbase/RangerAuthorizationCoprocessor.java   |  44 +--
 .../hadoop/RangerHdfsAuthorizer.java            |  52 ++-
 kms/scripts/db_setup.py                         |  21 +-
 kms/scripts/dba_script.py                       |  58 ++-
 kms/scripts/install.properties                  | 109 +++++-
 kms/scripts/kms-initd                           |  78 ----
 kms/scripts/ranger-kms                          |  48 ++-
 kms/scripts/ranger-kms-services.sh              |   4 +-
 kms/scripts/setup.sh                            |  61 +++-
 .../apache/hadoop/crypto/key/RangerKMSDB.java   |   2 +
 .../crypto/key/RangerKeyStoreProvider.java      |  12 +-
 plugin-kms/scripts/enable-kms-plugin.sh         |  39 +-
 plugin-kms/scripts/install.properties           | 112 ------
 plugin-kms/scripts/install.sh                   | 364 -------------------
 .../scripts/kms-plugin-install.properties       |  23 --
 plugin-kms/scripts/uninstall.sh                 |  70 ----
 pom.xml                                         |   2 +-
 security-admin/scripts/db_setup.py              |  17 +-
 security-admin/scripts/dba_script.py            |  19 +-
 .../java/org/apache/ranger/biz/AssetMgr.java    |   2 +-
 .../org/apache/ranger/biz/ServiceDBStore.java   |  31 ++
 .../java/org/apache/ranger/biz/ServiceMgr.java  |  19 +-
 .../org/apache/ranger/common/ServiceUtil.java   | 161 +++++---
 .../org/apache/ranger/rest/ServiceREST.java     |  23 +-
 .../ranger/service/RangerPolicyServiceBase.java |   3 +-
 .../ranger/service/RangerServiceService.java    |  29 +-
 .../service/RangerServiceServiceBase.java       |   3 +-
 .../ranger/solr/SolrAccessAuditsService.java    |  11 +-
 .../main/webapp/scripts/prelogin/XAPrelogin.js  |   1 +
 .../views/policies/RangerPolicyTableLayout.js   |   6 +-
 .../rest/TestServiceRESTForValidation.java      | 120 +++---
 .../src/test/resources/log4j.properties         |  36 ++
 src/main/assembly/kms.xml                       |  25 +-
 51 files changed, 1607 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-ranger git commit: RANGER-397 - Implement reliable streaming audits to configurable destinations

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
index a068b8f..576176c 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
@@ -17,71 +17,296 @@
  * under the License.
  */
 package org.apache.ranger.audit.provider;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+
+import com.google.gson.GsonBuilder;
+
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
 
 public abstract class BaseAuditProvider implements AuditProvider {
 	private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class);
 
 	private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
-	public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT     = 10 * 1024;
-	public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT =  5 * 1000;
+	public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
+	public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
+	public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+	private AtomicLong lifeTimeInLogCount = new AtomicLong(0);
 
-	private int   mLogFailureReportMinIntervalInMs = 60 * 1000;
+	private int mLogFailureReportMinIntervalInMs = 60 * 1000;
 
-	private AtomicLong mFailedLogLastReportTime       = new AtomicLong(0);
+	private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
 	private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
-	private AtomicLong mFailedLogCountLifeTime        = new AtomicLong(0);
-	private int maxQueueSize     =  AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT;
-	private int maxFlushInterval = AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT;
+	private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
+
+	public static final String PROP_NAME = "name";
+	public static final String PROP_CLASS_NAME = "classname";
+	public static final String PROP_QUEUE = "queue";
+
+	public static final String PROP_BATCH_SIZE = "batch.size";
+	public static final String PROP_QUEUE_SIZE = "queue.size";
+	public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
+
+	public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
+	public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
+	public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
+
+	public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
+
+	private boolean isDrain = false;
+	private String providerName = null;
+
+	private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
+	private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
+	private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
+
+	protected int failedRetryTimes = 3;
+	protected int failedRetrySleep = 3 * 1000;
+
+	protected AuditProvider consumer = null;
+	protected AuditFileSpool fileSpooler = null;
+
+	protected boolean fileSpoolerEnabled = false;
+	protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
+	protected int fileSpoolDrainThresholdPercent = 80;
+
+	int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+	long lastErrorLogMS = 0;
 
 	protected Properties props = null;
 
 	public BaseAuditProvider() {
 	}
-	
+
+	public BaseAuditProvider(AuditProvider consumer) {
+		this.consumer = consumer;
+	}
+
 	@Override
 	public void init(Properties props) {
+		init(props, null);
+	}
+
+	@Override
+	public void init(Properties props, String basePropertyName) {
 		LOG.info("BaseAuditProvider.init()");
 		this.props = props;
-		
-		mLogFailureReportMinIntervalInMs = getIntProperty(props, AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+		String propPrefix = PROP_DEFAULT_PREFIX;
+		if (basePropertyName != null) {
+			propPrefix = basePropertyName;
+		}
+		LOG.info("propPrefix=" + propPrefix);
+		// Get final token
+		List<String> tokens = MiscUtil.toArray(propPrefix, ".");
+		String finalToken = tokens.get(tokens.size() - 1);
+
+		String name = MiscUtil.getStringProperty(props, propPrefix + "."
+				+ PROP_NAME); // propPrefix, not basePropertyName (may be null)
+		if (name != null && !name.isEmpty()) {
+			providerName = name;
+		}
+		if (providerName == null) {
+			providerName = finalToken;
+			LOG.info("Using providerName from property prefix. providerName="
+					+ providerName);
+		}
+		LOG.info("providerName=" + providerName);
+
+		setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_BATCH_SIZE, getMaxBatchSize()));
+		setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_QUEUE_SIZE, getMaxQueueSize()));
+		setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_BATCH_INTERVAL, getMaxBatchInterval()));
+
+		fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+				+ "." + PROP_FILE_SPOOL_ENABLE, false);
+		String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+				+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
+		if (fileSpoolerEnabled || logFolderProp != null) { // a spool dir alone enables spooling
+			LOG.info("File spool is enabled for " + getName()
+					+ ", logFolderProp=" + logFolderProp + ", " + propPrefix
+					+ "." + PROP_FILE_SPOOL_ENABLE + "="
+					+ fileSpoolerEnabled);
+			fileSpoolerEnabled = true;
+			fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+					+ "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
+					fileSpoolMaxWaitTime);
+			fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
+					propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
+					fileSpoolDrainThresholdPercent);
+			fileSpooler = new AuditFileSpool(this, consumer);
+			fileSpooler.init(props, basePropertyName);
+		} else {
+			LOG.info("File spool is disabled for " + getName());
+		}
+
+		try { // NOTE(review): the created Gson is discarded; this only probes that Gson is usable
+			new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+		} catch (Throwable excp) {
+			LOG.warn(
+					"BaseAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
+					excp);
+		}
 
+		mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
+				AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+
+	}
+
+	public AuditProvider getConsumer() {
+		return consumer;
+	}
+
+	public void setConsumer(AuditProvider consumer) {
+		this.consumer = consumer;
 	}
 
 	public void logFailedEvent(AuditEventBase event) {
 		logFailedEvent(event, null);
- 	}
+	}
 
 	public void logFailedEvent(AuditEventBase event, Throwable excp) {
 		long now = System.currentTimeMillis();
-		
-		long timeSinceLastReport  = now - mFailedLogLastReportTime.get();
-		long countSinceLastReport = mFailedLogCountSinceLastReport.incrementAndGet();
-		long countLifeTime        = mFailedLogCountLifeTime.incrementAndGet();
 
-		if(timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
+		long timeSinceLastReport = now - mFailedLogLastReportTime.get();
+		long countSinceLastReport = mFailedLogCountSinceLastReport
+				.incrementAndGet();
+		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
 			mFailedLogLastReportTime.set(now);
 			mFailedLogCountSinceLastReport.set(0);
 
-			if(excp != null) {
-				LOG.warn("failed to log audit event: " + MiscUtil.stringify(event), excp);
+			if (excp != null) {
+				LOG.warn(
+						"failed to log audit event: "
+								+ MiscUtil.stringify(event), excp);
 			} else {
-				LOG.warn("failed to log audit event: " + MiscUtil.stringify(event));
+				LOG.warn("failed to log audit event: "
+						+ MiscUtil.stringify(event));
+			}
+
+			if (countLifeTime > 1) { // no stats to print for the 1st failure
+				LOG.warn("Log failure count: " + countSinceLastReport
+						+ " in past "
+						+ formatIntervalForLog(timeSinceLastReport) + "; "
+						+ countLifeTime + " during process lifetime");
+			}
+		}
+	}
+
+	public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
+		for (AuditEventBase event : events) {
+			logFailedEvent(event, excp);
+		}
+	}
+
+	public void logFailedEventJSON(String event, Throwable excp) {
+		long now = System.currentTimeMillis();
+
+		long timeSinceLastReport = now - mFailedLogLastReportTime.get();
+		long countSinceLastReport = mFailedLogCountSinceLastReport
+				.incrementAndGet();
+		long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+		if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
+			mFailedLogLastReportTime.set(now);
+			mFailedLogCountSinceLastReport.set(0);
+
+			if (excp != null) {
+				LOG.warn("failed to log audit event: " + event, excp);
+			} else {
+				LOG.warn("failed to log audit event: " + event);
+			}
+
+			if (countLifeTime > 1) { // no stats to print for the 1st failure
+				LOG.warn("Log failure count: " + countSinceLastReport
+						+ " in past "
+						+ formatIntervalForLog(timeSinceLastReport) + "; "
+						+ countLifeTime + " during process lifetime");
 			}
+		}
+	}
 
-			if(countLifeTime > 1) { // no stats to print for the 1st failure
-				LOG.warn("Log failure count: " + countSinceLastReport + " in past " + formatIntervalForLog(timeSinceLastReport) + "; " + countLifeTime + " during process lifetime");
+	public void logFailedEventJSON(Collection<String> events, Throwable excp) {
+		for (String event : events) {
+			logFailedEventJSON(event, excp);
+		}
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+	 * audit.model.AuditEventBase)
+	 */
+	@Override
+	public boolean log(AuditEventBase event) {
+		List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+		eventList.add(event);
+		return log(eventList);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String)
+	 */
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see
+	 * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection
+	 * )
+	 */
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		boolean ret = true;
+		for (String event : events) {
+			ret = logJSON(event);
+			if (!ret) {
+				break;
 			}
 		}
- 	}
+		return ret;
+	}
+
+	public void setName(String name) {
+		providerName = name;
+	}
+
+	@Override
+	public String getName() {
+		return providerName;
+	}
+
+	@Override
+	public boolean isDrain() {
+		return isDrain;
+	}
+
+	public void setDrain(boolean isDrain) {
+		this.isDrain = isDrain;
+	}
 
-	
 	public int getMaxQueueSize() {
 		return maxQueueSize;
 	}
@@ -90,84 +315,95 @@ public abstract class BaseAuditProvider implements AuditProvider {
 		this.maxQueueSize = maxQueueSize;
 	}
 
-	public int getMaxFlushInterval() {
-		return maxFlushInterval;
+	@Override
+	public int getMaxBatchInterval() {
+		return maxBatchInterval;
 	}
 
-	public void setMaxFlushInterval(int maxFlushInterval) {
-		this.maxFlushInterval = maxFlushInterval;
+	public void setMaxBatchInterval(int maxBatchInterval) {
+		this.maxBatchInterval = maxBatchInterval;
 	}
 
-	public static Map<String, String> getPropertiesWithPrefix(Properties props, String prefix) {
-		Map<String, String> prefixedProperties = new HashMap<String, String>();
-
-		if(props != null && prefix != null) {
-			for(String key : props.stringPropertyNames()) {
-				if(key == null) {
-					continue;
-				}
-
-				String val = props.getProperty(key);
+	@Override
+	public int getMaxBatchSize() {
+		return maxBatchSize;
+	}
 
-				if(key.startsWith(prefix)) {
-					key = key.substring(prefix.length());
+	public void setMaxBatchSize(int maxBatchSize) {
+		this.maxBatchSize = maxBatchSize;
+	}
 
-					if(key == null) {
-						continue;
-					}
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+	 */
+	@Override
+	public void waitToComplete() {
+		if (consumer != null) {
+			consumer.waitToComplete(-1);
+		}
+	}
 
-					prefixedProperties.put(key, val);
-				}
-			}
+	@Override
+	public void waitToComplete(long timeout) {
+		if (consumer != null) {
+			consumer.waitToComplete(timeout);
 		}
+	}
 
-		return prefixedProperties;
+	@Override
+	public boolean isFlushPending() {
+		return false;
 	}
-	
-	public static boolean getBooleanProperty(Properties props, String propName, boolean defValue) {
-		boolean ret = defValue;
 
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
+	 */
+	@Override
+	public long getLastFlushTime() {
+		if (consumer != null) {
+			return consumer.getLastFlushTime();
+		}
+		return 0;
+	}
 
-			if(val != null) {
-				ret = Boolean.valueOf(val);
-			}
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+	 */
+	@Override
+	public void flush() {
+		if (consumer != null) {
+			consumer.flush();
 		}
+	}
 
-		return ret;
+	public AtomicLong getLifeTimeInLogCount() {
+		return lifeTimeInLogCount;
 	}
-	
-	public static int getIntProperty(Properties props, String propName, int defValue) {
-		int ret = defValue;
 
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
+	public long addLifeTimeInLogCount(long count) {
+		return lifeTimeInLogCount.addAndGet(count);
+	}
 
-			if(val != null) {
-				try {
-					ret = Integer.parseInt(val);
-				} catch(NumberFormatException excp) {
-					ret = defValue;
-				}
-			}
+	public void logError(String msg) {
+		long currTimeMS = System.currentTimeMillis();
+		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+			LOG.error(msg);
+			lastErrorLogMS = currTimeMS;
 		}
-
-		return ret;
 	}
-	
-	
-	public static String getStringProperty(Properties props, String propName) {
-		String ret = null;
 
-		if(props != null && propName != null) {
-			String val = props.getProperty(propName);
-			if ( val != null){
-				ret = val;
-			}
+	public void logError(String msg, Throwable ex) {
+		long currTimeMS = System.currentTimeMillis();
+		if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+			LOG.error(msg, ex);
+			lastErrorLogMS = currTimeMS;
 		}
-
-		return ret;
 	}
 
 	public String getTimeDiffStr(long time1, long time2) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index be32519..ec5e9a8 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ranger.audit.provider;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.ranger.audit.model.AuditEventBase;
@@ -32,7 +33,7 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		if(event instanceof AuthzAuditEvent) {
 			AuthzAuditEvent authzEvent = (AuthzAuditEvent)event;
 
@@ -52,6 +53,30 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 		if(! mBuffer.add(event)) {
 			logFailedEvent(event);
 		}
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event);
+		}
+		return true;
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) {
+			logJSON(event);
+		}
+		return true; // matches log(Collection); log(event) itself always returns true
 	}
 
 	@Override
@@ -68,6 +93,11 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
 	public void waitToComplete() {
 	}
 
+	
+	@Override
+	public void waitToComplete(long timeout) {
+	}
+
 	@Override
 	public boolean isFlushPending() {
 		return false;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index 7414a7a..f4976fb 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -19,6 +19,7 @@
 package org.apache.ranger.audit.provider;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 
@@ -31,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.dao.DaoManager;
 import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
 
 
@@ -38,7 +40,7 @@ import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
  * NOTE:
  * - Instances of this class are not thread-safe.
  */
-public class DbAuditProvider extends BaseAuditProvider {
+public class DbAuditProvider extends AuditDestination {
 
 	private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);
 
@@ -73,17 +75,17 @@ public class DbAuditProvider extends BaseAuditProvider {
 
 		super.init(props);
 
-		mDbProperties         = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
-		mCommitBatchSize      = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
-		mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
+		mDbProperties         = MiscUtil.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+		mCommitBatchSize      = MiscUtil.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+		mDbRetryMinIntervalMs = MiscUtil.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
 
-		boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
+		boolean isAsync = MiscUtil.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
 
 		if(! isAsync) {
 			mCommitBatchSize = 1; // Batching not supported in sync mode
 		}
 
-		String jdbcPassword = getCredentialString(BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+		String jdbcPassword = getCredentialString(MiscUtil.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
 
 		if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
 			mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
@@ -91,7 +93,7 @@ public class DbAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		LOG.debug("DbAuditProvider.log()");
 
 		boolean isSuccess = false;
@@ -113,6 +115,30 @@ public class DbAuditProvider extends BaseAuditProvider {
 				logFailedEvent(event);
 			}
 		}
+		return isSuccess;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event);
+		}
+		return true;
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) {
+			logJSON(event);
+		}
+		return true; // was always false; now consistent with log(Collection) in this class
 	}
 
 	@Override
@@ -132,6 +158,13 @@ public class DbAuditProvider extends BaseAuditProvider {
 	@Override
     public void waitToComplete() {
 		LOG.info("DbAuditProvider.waitToComplete()");
+		waitToComplete(-1);
+	}
+
+	@Override
+	public void waitToComplete(long timeout) {
+		LOG.info("DbAuditProvider.waitToComplete():timeout=" + timeout);
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
index a6e3ef1..619a99d 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -17,9 +17,11 @@
  */
 package org.apache.ranger.audit.provider;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
 
 
 public class DummyAuditProvider implements AuditProvider {
@@ -29,8 +31,32 @@ public class DummyAuditProvider implements AuditProvider {
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		// intentionally left empty
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event);
+		}
+		return true;
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) {
+			logJSON(event);
+		}
+		return true; // the no-op log() always succeeds, so the batch does too
 	}
 
 	@Override
@@ -48,6 +74,13 @@ public class DummyAuditProvider implements AuditProvider {
 		// intentionally left empty
 	}
 
+	
+	@Override
+	public int getMaxBatchSize() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
 	@Override
 	public boolean isFlushPending() {
 		return false;
@@ -62,4 +95,45 @@ public class DummyAuditProvider implements AuditProvider {
 	public void flush() {
 		// intentionally left empty
 	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties, java.lang.String)
+	 */
+	@Override
+	public void init(Properties prop, String basePropertyName) {
+		// intentionally left empty		
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+	 */
+	@Override
+	public void waitToComplete(long timeout) {
+		// intentionally left empty		
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#getName()
+	 */
+	@Override
+	public String getName() {
+		return this.getClass().getName();
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
+	 */
+	@Override
+	public boolean isDrain() {
+		return false;
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval()
+	 */
+	@Override
+	public int getMaxBatchInterval() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
new file mode 100644
index 0000000..62ecab1
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * This class writes the audit logs to a file in a local folder.
+ */
+public class FileAuditDestination extends AuditDestination {
+	private static final Log logger = LogFactory
+			.getLog(FileAuditDestination.class);
+
+	public static final String PROP_FILE_LOCAL_DIR = "dir";
+	public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format";
+	public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec";
+
+	String baseFolder = null;
+	String fileFormat = null;
+	int fileRolloverSec = 24 * 60 * 60; // In seconds
+	private String logFileNameFormat;
+
+	boolean initDone = false;
+
+	private File logFolder;
+	PrintWriter logWriter = null;
+
+	private Date fileCreateTime = null;
+
+	private String currentFileName;
+
+	private boolean isStopped = false;
+
+	/**
+	 * Configures this destination from the given properties.
+	 * <p>
+	 * The keys are <code>propPrefix + "." + name</code> for the target folder
+	 * ({@link #PROP_FILE_LOCAL_DIR}), the file name format
+	 * ({@link #PROP_FILE_LOCAL_FILE_NAME_FORMAT}) and the rollover period in
+	 * seconds ({@link #PROP_FILE_FILE_ROLLOVER}); the values are read from the
+	 * inherited <code>props</code> field after the parent class was
+	 * initialized. The folder is created if it does not exist.
+	 * <code>initDone</code> is set only when the folder is configured and is
+	 * a directory at the end of this method.
+	 *
+	 * @param prop
+	 *            properties passed on to the parent class
+	 * @param propPrefix
+	 *            prefix of the property keys of this destination
+	 */
+	@Override
+	public void init(Properties prop, String propPrefix) {
+		super.init(prop, propPrefix);
+
+		// Every key of this destination starts with "<propPrefix>."
+		final String keyBase = propPrefix + ".";
+		String configuredDir = MiscUtil.getStringProperty(props, keyBase
+				+ PROP_FILE_LOCAL_DIR);
+		logFileNameFormat = MiscUtil.getStringProperty(props, keyBase
+				+ PROP_FILE_LOCAL_FILE_NAME_FORMAT);
+		fileRolloverSec = MiscUtil.getIntProperty(props, keyBase
+				+ PROP_FILE_FILE_ROLLOVER, fileRolloverSec);
+
+		// Without a target folder there is nothing to write to
+		boolean dirMissing = configuredDir == null || configuredDir.isEmpty();
+		if (dirMissing) {
+			logger.error("File destination folder is not configured. Please set "
+					+ keyBase + PROP_FILE_LOCAL_DIR + ". name=" + getName());
+			return;
+		}
+
+		// Create the folder on demand and give up if that does not work
+		logFolder = new File(configuredDir);
+		if (!logFolder.isDirectory()) {
+			logFolder.mkdirs();
+			if (!logFolder.isDirectory()) {
+				logger.error("FileDestination folder not found and can't be created. folder="
+						+ logFolder.getAbsolutePath() + ", name=" + getName());
+				return;
+			}
+		}
+		logger.info("logFolder=" + logFolder + ", name=" + getName());
+
+		// Fall back to the default name pattern when none is configured
+		boolean formatMissing = logFileNameFormat == null
+				|| logFileNameFormat.isEmpty();
+		if (formatMissing) {
+			logFileNameFormat = "%app-type%_ranger_audit.log";
+		}
+		logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
+				+ getName());
+
+		initDone = true;
+	}
+
+	/**
+	 * Appends the given JSON strings to the current log file, one per line.
+	 * <p>
+	 * {@link PrintWriter} never throws on a failed write, it only records an
+	 * error flag. The flag is checked after writing so that an I/O failure is
+	 * reported to the caller instead of being counted as success.
+	 *
+	 * @param events
+	 *            JSON strings to write
+	 * @return <code>true</code> if all lines were written and flushed without
+	 *         error, <code>false</code> otherwise
+	 */
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		try {
+			PrintWriter out = getLogFileStream();
+			for (String event : events) {
+				out.println(event);
+			}
+			// checkError() flushes the writer and returns its error flag
+			if (out.checkError()) {
+				synchronized (this) {
+					// The flag is never cleared, so drop the failed writer and
+					// let the next call open a new one
+					if (logWriter == out) {
+						logWriter.close();
+						logWriter = null;
+						currentFileName = null;
+					}
+				}
+				logError("Error writing to log file. name=" + getName());
+				return false;
+			}
+		} catch (Throwable t) {
+			logError("Error writing to log file.", t);
+			return false;
+		}
+		return true;
+	}
+
+	/**
+	 * Converts the events to JSON and writes them to the log file.
+	 * <p>
+	 * An event that cannot be converted is logged as an error, together with
+	 * the cause, and left out; the remaining events are still written.
+	 *
+	 * @param events
+	 *            audit events to write
+	 * @return <code>false</code> if stop was already requested, otherwise the
+	 *         result of {@link #logJSON(Collection)}
+	 * @see org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+	 */
+	@Override
+	synchronized public boolean log(Collection<AuditEventBase> events) {
+		if (isStopped) {
+			logError("log() called after stop was requested. name=" + getName());
+			return false;
+		}
+		List<String> jsonList = new ArrayList<String>();
+		for (AuditEventBase event : events) {
+			try {
+				jsonList.add(MiscUtil.stringify(event));
+			} catch (Throwable t) {
+				// Pass the cause on: the message alone does not say what failed
+				logger.error("Error converting to JSON. event=" + event, t);
+			}
+		}
+		return logJSON(jsonList);
+	}
+
+	/*
+	 * No-op: this destination needs no start-up work.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	public void start() {
+		// Nothing to do here. The log file is opened by getLogFileStream()
+		// when the first log request comes
+	}
+
+	/**
+	 * Stops this destination: closes the open log file, if any, and makes
+	 * {@link #log(Collection)} reject all later calls.
+	 */
+	@Override
+	synchronized public void stop() {
+		// Mark the destination as stopped even when no file was ever opened;
+		// otherwise a later log() call would still open and write a new file
+		isStopped = true;
+		if (logWriter != null) {
+			logWriter.flush();
+			logWriter.close();
+			logWriter = null;
+		}
+	}
+
+	// Helper methods in this class
+	/**
+	 * Returns the writer for the current log file, opening a new file when
+	 * none is open or the previous one has just been rolled over.
+	 * <p>
+	 * If a file with the target name already exists, it is first renamed to
+	 * the next free <code>name.N.ext</code> (or <code>name.N</code> when the
+	 * name has no extension). If that rename fails, new lines are appended to
+	 * the existing file.
+	 *
+	 * @return the open writer
+	 * @throws Exception
+	 *             if the log file cannot be opened
+	 */
+	synchronized private PrintWriter getLogFileStream() throws Exception {
+		closeFileIfNeeded();
+
+		// Either there are no open log file or the previous one has been rolled
+		// over
+		if (logWriter == null) {
+			Date currentTime = new Date();
+			// Create a new file
+			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+					currentTime.getTime());
+			File outLogFile = new File(logFolder, fileName);
+			if (outLogFile.exists()) {
+				// Split the name once, outside the loop. A name without '.'
+				// has no extension; substring(0, -1) would throw for it
+				int lastDot = fileName.lastIndexOf('.');
+				String baseName = lastDot < 0 ? fileName : fileName.substring(
+						0, lastDot);
+				String extension = lastDot < 0 ? "" : fileName
+						.substring(lastDot);
+
+				// Let's try to get the next available file
+				File newLogFile;
+				int i = 0;
+				do {
+					i++;
+					newLogFile = new File(logFolder, baseName + "." + i
+							+ extension);
+				} while (newLogFile.exists());
+
+				// Move the file
+				if (!outLogFile.renameTo(newLogFile)) {
+					logger.error("Error renameing file. " + outLogFile
+							+ " to " + newLogFile);
+				}
+			}
+			if (!outLogFile.exists()) {
+				logger.info("Creating new file. destName=" + getName()
+						+ ", fileName=" + fileName);
+				// Open the file
+				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+						outLogFile)));
+			} else {
+				// The rename failed: keep the old content and append to it
+				logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+						outLogFile, true)));
+			}
+			fileCreateTime = new Date();
+			currentFileName = outLogFile.getPath();
+		}
+		return logWriter;
+	}
+
+	/**
+	 * Closes the current log file once more than <code>fileRolloverSec</code>
+	 * seconds have passed since it was opened, so that the next write opens a
+	 * new file. Does nothing when no file is open.
+	 */
+	private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+		if (logWriter == null) {
+			return;
+		}
+		// Multiply as long: fileRolloverSec * 1000 overflows int for periods
+		// above Integer.MAX_VALUE / 1000 seconds (about 24.8 days), and the
+		// negative result would roll the file over on every call
+		long rolloverMs = fileRolloverSec * 1000L;
+		if (System.currentTimeMillis() - fileCreateTime.getTime() > rolloverMs) {
+			logger.info("Closing file. Rolling over. name=" + getName()
+					+ ", fileName=" + currentFileName);
+			logWriter.flush();
+			logWriter.close();
+			logWriter = null;
+			currentFileName = null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
new file mode 100644
index 0000000..a36c40f
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * This class writes the audit logs to a file on a Hadoop file system (HDFS).
+ */
+public class HDFSAuditDestination extends AuditDestination {
+	private static final Log logger = LogFactory
+			.getLog(HDFSAuditDestination.class);
+
+	public static final String PROP_HDFS_DIR = "dir";
+	public static final String PROP_HDFS_SUBDIR = "subdir";
+	public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format";
+	public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec";
+
+	String baseFolder = null;
+	String fileFormat = null;
+	int fileRolloverSec = 24 * 60 * 60; // In seconds
+	private String logFileNameFormat;
+
+	boolean initDone = false;
+
+	private String logFolder;
+	PrintWriter logWriter = null;
+
+	private Date fileCreateTime = null;
+
+	private String currentFileName;
+
+	private boolean isStopped = false;
+
+	/**
+	 * Configures this destination from the given properties.
+	 * <p>
+	 * The keys are <code>propPrefix + "." + name</code> for the base folder
+	 * ({@link #PROP_HDFS_DIR}), the sub folder ({@link #PROP_HDFS_SUBDIR}), the
+	 * file name format ({@link #PROP_HDFS_FILE_NAME_FORMAT}) and the rollover
+	 * period in seconds ({@link #PROP_HDFS_ROLLOVER}); the values are read from
+	 * the inherited <code>props</code> field after the parent class was
+	 * initialized. Sub folder and file name format have defaults, the base
+	 * folder does not: <code>initDone</code> is set only when it is
+	 * configured.
+	 *
+	 * @param prop
+	 *            properties passed on to the parent class
+	 * @param propPrefix
+	 *            prefix of the property keys of this destination
+	 */
+	@Override
+	public void init(Properties prop, String propPrefix) {
+		super.init(prop, propPrefix);
+
+		// Every key of this destination starts with "<propPrefix>."
+		final String keyBase = propPrefix + ".";
+		String configuredDir = MiscUtil.getStringProperty(props, keyBase
+				+ PROP_HDFS_DIR);
+		String subFolder = MiscUtil.getStringProperty(props, keyBase
+				+ PROP_HDFS_SUBDIR);
+		boolean subFolderMissing = subFolder == null || subFolder.isEmpty();
+		if (subFolderMissing) {
+			subFolder = "%app-type%/%time:yyyyMMdd%";
+		}
+
+		logFileNameFormat = MiscUtil.getStringProperty(props, keyBase
+				+ PROP_HDFS_FILE_NAME_FORMAT);
+		fileRolloverSec = MiscUtil.getIntProperty(props, keyBase
+				+ PROP_HDFS_ROLLOVER, fileRolloverSec);
+
+		// Fall back to the default name pattern when none is configured
+		boolean formatMissing = logFileNameFormat == null
+				|| logFileNameFormat.isEmpty();
+		if (formatMissing) {
+			logFileNameFormat = "%app-type%_ranger_audit_%hostname%.log";
+		}
+
+		// Without a base folder there is nothing to write to
+		boolean dirMissing = configuredDir == null || configuredDir.isEmpty();
+		if (dirMissing) {
+			logger.fatal("File destination folder is not configured. Please set "
+					+ keyBase + PROP_HDFS_DIR + ". name=" + getName());
+			return;
+		}
+
+		// Tokens in the folder are replaced later, when a file is opened
+		logFolder = configuredDir + "/" + subFolder;
+		logger.info("logFolder=" + logFolder + ", destName=" + getName());
+		logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
+				+ getName());
+
+		initDone = true;
+	}
+
+	/**
+	 * Appends the given JSON strings to the current log file, one per line.
+	 * <p>
+	 * {@link PrintWriter} never throws on a failed write, it only records an
+	 * error flag. The flag is checked after writing so that an I/O failure is
+	 * reported to the caller instead of being counted as success.
+	 *
+	 * @param events
+	 *            JSON strings to write
+	 * @return <code>true</code> if all lines were written and flushed without
+	 *         error, <code>false</code> otherwise
+	 */
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		try {
+			PrintWriter out = getLogFileStream();
+			for (String event : events) {
+				out.println(event);
+			}
+			// checkError() flushes the writer and returns its error flag
+			if (out.checkError()) {
+				synchronized (this) {
+					// The flag is never cleared, so drop the failed writer and
+					// let the next call open a new one
+					if (logWriter == out) {
+						logWriter.close();
+						logWriter = null;
+						currentFileName = null;
+					}
+				}
+				logError("Error writing to log file. name=" + getName());
+				return false;
+			}
+		} catch (Throwable t) {
+			logError("Error writing to log file.", t);
+			return false;
+		}
+		return true;
+	}
+
+	/**
+	 * Converts the events to JSON and writes them to the log file.
+	 * <p>
+	 * An event that cannot be converted is logged as an error, together with
+	 * the cause, and left out; the remaining events are still written.
+	 *
+	 * @param events
+	 *            audit events to write
+	 * @return <code>false</code> if stop was already requested, otherwise the
+	 *         result of {@link #logJSON(Collection)}
+	 * @see org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+	 */
+	@Override
+	synchronized public boolean log(Collection<AuditEventBase> events) {
+		if (isStopped) {
+			logError("log() called after stop was requested. name=" + getName());
+			return false;
+		}
+		List<String> jsonList = new ArrayList<String>();
+		for (AuditEventBase event : events) {
+			try {
+				jsonList.add(MiscUtil.stringify(event));
+			} catch (Throwable t) {
+				// Pass the cause on: the message alone does not say what failed
+				logger.error("Error converting to JSON. event=" + event, t);
+			}
+		}
+		return logJSON(jsonList);
+	}
+
+	/*
+	 * No-op: this destination needs no start-up work.
+	 * 
+	 * @see org.apache.ranger.audit.provider.AuditProvider#start()
+	 */
+	@Override
+	public void start() {
+		// Nothing to do here. The log file is opened by getLogFileStream()
+		// when the first log request comes
+	}
+
+	/**
+	 * Stops this destination: closes the open log file, if any, and makes
+	 * {@link #log(Collection)} reject all later calls. An error while closing
+	 * is logged and not propagated.
+	 */
+	@Override
+	synchronized public void stop() {
+		// Mark the destination as stopped first, so that it also counts as
+		// stopped when no file was ever opened or when closing fails
+		isStopped = true;
+		try {
+			if (logWriter != null) {
+				logWriter.flush();
+				logWriter.close();
+				logWriter = null;
+			}
+		} catch (Throwable t) {
+			logger.error("Error closing HDFS file.", t);
+		}
+	}
+
+	// Helper methods in this class
+	/**
+	 * Returns the writer for the current log file, creating a new file when
+	 * none is open or the previous one has just been rolled over.
+	 * <p>
+	 * Tokens in the folder and file name are replaced using the current time.
+	 * If a file with the resulting path already exists, the first free
+	 * <code>name.N.ext</code> (or <code>name.N</code> when the file name has
+	 * no extension) is used instead. Missing parent folders are created.
+	 *
+	 * @return the open writer
+	 * @throws Throwable
+	 *             if the file system cannot be reached or the file cannot be
+	 *             created
+	 */
+	synchronized private PrintWriter getLogFileStream() throws Throwable {
+		closeFileIfNeeded();
+
+		// Either there are no open log file or the previous one has been rolled
+		// over
+		if (logWriter == null) {
+			Date currentTime = new Date();
+			// Create a new file
+			String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+					currentTime.getTime());
+			String parentFolder = MiscUtil.replaceTokens(logFolder,
+					currentTime.getTime());
+			Configuration conf = new Configuration();
+
+			String fullPath = parentFolder + Path.SEPARATOR + fileName;
+			String defaultPath = fullPath;
+			URI uri = URI.create(fullPath);
+			FileSystem fileSystem = FileSystem.get(uri, conf);
+
+			// Split the path once, outside the loop. Only a '.' after the
+			// last separator belongs to the file name: a dot in a folder name
+			// must not be used as split point, and a path without any dot
+			// would make substring(0, -1) throw
+			int nameStart = defaultPath.lastIndexOf(Path.SEPARATOR) + 1;
+			int lastDot = defaultPath.lastIndexOf('.');
+			int splitAt = lastDot >= nameStart ? lastDot : defaultPath.length();
+			String baseName = defaultPath.substring(0, splitAt);
+			String extension = defaultPath.substring(splitAt);
+
+			Path hdfPath = new Path(fullPath);
+			logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+			int i = 0;
+			while (fileSystem.exists(hdfPath)) {
+				i++;
+				fullPath = baseName + "." + i + extension;
+				hdfPath = new Path(fullPath);
+				logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+			}
+			logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
+			// Create parent folders
+			createParents(hdfPath, fileSystem);
+
+			// Create the file to write
+			logger.info("Creating new log file. hdfPath=" + fullPath);
+			FSDataOutputStream ostream = fileSystem.create(hdfPath);
+			logWriter = new PrintWriter(ostream);
+			fileCreateTime = new Date();
+			currentFileName = fullPath;
+		}
+		return logWriter;
+	}
+
+	/**
+	 * Creates the parent folder of the given log file path if it does not
+	 * exist yet. Does nothing when the path, its parent or the file system is
+	 * <code>null</code>.
+	 *
+	 * @param pathLogfile
+	 *            path of the log file
+	 * @param fileSystem
+	 *            file system the path belongs to
+	 */
+	private void createParents(Path pathLogfile, FileSystem fileSystem)
+			throws Throwable {
+		logger.info("Creating parent folder for " + pathLogfile);
+
+		Path parent = null;
+		if (pathLogfile != null) {
+			parent = pathLogfile.getParent();
+		}
+		if (parent == null || fileSystem == null) {
+			return;
+		}
+
+		boolean parentExists = fileSystem.exists(parent);
+		if (!parentExists) {
+			fileSystem.mkdirs(parent);
+		}
+	}
+
+	/**
+	 * Closes the current log file once more than <code>fileRolloverSec</code>
+	 * seconds have passed since it was opened, so that the next write creates
+	 * a new file. Does nothing when no file is open.
+	 */
+	private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+		if (logWriter == null) {
+			return;
+		}
+		// TODO: Close the file on absolute time. Currently it is implemented as
+		// relative time
+		// Multiply as long: fileRolloverSec * 1000 overflows int for periods
+		// above Integer.MAX_VALUE / 1000 seconds (about 24.8 days), and the
+		// negative result would roll the file over on every call
+		long rolloverMs = fileRolloverSec * 1000L;
+		if (System.currentTimeMillis() - fileCreateTime.getTime() > rolloverMs) {
+			logger.info("Closing file. Rolling over. name=" + getName()
+					+ ", fileName=" + currentFileName);
+			logWriter.flush();
+			logWriter.close();
+			logWriter = null;
+			currentFileName = null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
index cdc4d53..83eb324 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
@@ -502,13 +502,18 @@ class DestinationDispatcherThread<T> extends Thread {
 				break;
 			}
 
-			// loop until log is sent successfully
-			while(!mStopThread && !mDestination.sendStringified(log)) {
-				try {
-					Thread.sleep(destinationPollIntervalInMs);
-				} catch(InterruptedException excp) {
-					throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+			try {
+				// loop until log is sent successfully
+				while(!mStopThread && !mDestination.sendStringified(log)) {
+					try {
+						Thread.sleep(destinationPollIntervalInMs);
+					} catch(InterruptedException excp) {
+						throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+					}
 				}
+			} catch ( AuditMessageException msgError) {
+				mLogger.error("Error in log message:" + log);
+				//If there is error in log message, then it will be skipped
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index 0d0809a..a5a52a0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -18,17 +18,18 @@
 
 package org.apache.ranger.audit.provider;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import com.sun.tools.hat.internal.util.Misc;
 
 
-public class Log4jAuditProvider extends BaseAuditProvider {
+public class Log4jAuditProvider extends AuditDestination {
 
 	private static final Log LOG      = LogFactory.getLog(Log4jAuditProvider.class);
 	private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
@@ -37,7 +38,6 @@ public class Log4jAuditProvider extends BaseAuditProvider {
 	public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size" ;
 	public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
 
-	private Gson mGsonBuilder = null;
 
 	public Log4jAuditProvider() {
 		LOG.info("Log4jAuditProvider: creating..");
@@ -48,53 +48,54 @@ public class Log4jAuditProvider extends BaseAuditProvider {
 		LOG.info("Log4jAuditProvider.init()");
 
 		super.init(props);
-
-		try {
-			mGsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
-		} catch(Throwable excp) {
-			LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json", excp);
-		}
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		if(! AUDITLOG.isInfoEnabled())
-			return;
+			return true;
 		
 		if(event != null) {
-			String eventStr = mGsonBuilder != null ? mGsonBuilder.toJson(event) : event.toString();
-
+			String eventStr = MiscUtil.stringify(event);
 			AUDITLOG.info(eventStr);
 		}
+		return true;
 	}
 
 	@Override
-	public void start() {
-		// intentionally left empty
-	}
-
-	@Override
-	public void stop() {
-		// intentionally left empty
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event);
+		}
+		return true;
 	}
 
 	@Override
-    public void waitToComplete() {
-		// intentionally left empty
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
 	}
 
 	@Override
-	public boolean isFlushPending() {
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) { // each string goes through logJSON(String)
+			logJSON(event);
+		}
-		return false;
+		return true; // report success like log(Collection); false would signal a failed write
 	}
-	
+
 	@Override
-	public long getLastFlushTime() {
-		return 0;
+	public void start() {
+		// intentionally left empty
 	}
 
 	@Override
-	public void flush() {
+	public void stop() {
 		// intentionally left empty
 	}
+
+	
+
+	
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
index 44e94ad..d6f87cf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ranger.audit.provider;
 
+import org.apache.ranger.audit.model.AuditEventBase;
 
 public interface LogDestination<T> {
 	public void start();
@@ -26,9 +27,20 @@ public interface LogDestination<T> {
 
 	boolean isAvailable();
 
-	public boolean send(T log);
+	public boolean send(AuditEventBase log) throws AuditMessageException;
 
-	public boolean sendStringified(String log);
+	public boolean send(AuditEventBase[] logs) throws AuditMessageException;
+
+	public boolean sendStringified(String log) throws AuditMessageException;
+
+	public boolean sendStringified(String[] logs) throws AuditMessageException;
 
 	public boolean flush();
+
+	/**
+	 * Name for the destination
+	 * 
+	 * @return
+	 */
+	public String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index 17230b2..487da5a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -20,6 +20,12 @@ import java.io.File;
 import java.net.InetAddress;
 import java.rmi.dgc.VMID;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
 import java.util.UUID;
 
 import org.apache.log4j.helpers.LogLog;
@@ -28,88 +34,96 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
 public class MiscUtil {
-	public static final String TOKEN_START        = "%";
-	public static final String TOKEN_END          = "%";
-	public static final String TOKEN_HOSTNAME     = "hostname";
-	public static final String TOKEN_APP_TYPE     = "app-type";
+	public static final String TOKEN_START = "%";
+	public static final String TOKEN_END = "%";
+	public static final String TOKEN_HOSTNAME = "hostname";
+	public static final String TOKEN_APP_TYPE = "app-type";
 	public static final String TOKEN_JVM_INSTANCE = "jvm-instance";
-	public static final String TOKEN_TIME         = "time:";
-	public static final String TOKEN_PROPERTY     = "property:";
-	public static final String TOKEN_ENV          = "env:";
-	public static final String ESCAPE_STR           = "\\";
+	public static final String TOKEN_TIME = "time:";
+	public static final String TOKEN_PROPERTY = "property:";
+	public static final String TOKEN_ENV = "env:";
+	public static final String ESCAPE_STR = "\\";
 
 	static VMID sJvmID = new VMID();
 
 	public static String LINE_SEPARATOR = System.getProperty("line.separator");
 
-	private static Gson   sGsonBuilder = null;
+	private static Gson sGsonBuilder = null;
 	private static String sApplicationType = null;
 
 	static {
 		try {
-			sGsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS").create();
-		} catch(Throwable excp) {
-			LogLog.warn("failed to create GsonBuilder object. stringigy() will return obj.toString(), instead of Json", excp);
+			sGsonBuilder = new GsonBuilder().setDateFormat(
+					"yyyy-MM-dd HH:mm:ss.SSS").create();
+		} catch (Throwable excp) {
+			LogLog.warn(
+					"failed to create GsonBuilder object. stringigy() will return obj.toString(), instead of Json",
+					excp);
 		}
 	}
 
 	public static String replaceTokens(String str, long time) {
-		if(str == null) {
+		if (str == null) {
 			return str;
 		}
 
-		if(time <= 0) {
+		if (time <= 0) {
 			time = System.currentTimeMillis();
 		}
 
-        for(int startPos = 0; startPos < str.length(); ) {
-            int tagStartPos = str.indexOf(TOKEN_START, startPos);
-            
-            if(tagStartPos == -1) {
-            	break;
-            }
-
-            int tagEndPos = str.indexOf(TOKEN_END, tagStartPos + TOKEN_START.length());
-
-            if(tagEndPos == -1) {
-            	break;
-            }
-
-            String tag   = str.substring(tagStartPos, tagEndPos+TOKEN_END.length());
-            String token = tag.substring(TOKEN_START.length(), tag.lastIndexOf(TOKEN_END));
-            String val   = "";
-
-            if(token != null) {
-	            if(token.equals(TOKEN_HOSTNAME)) {
-	            	val = getHostname();
-	            } else if(token.equals(TOKEN_APP_TYPE)) {
-	            	val = getApplicationType();
-	            } else if(token.equals(TOKEN_JVM_INSTANCE)) {
-	            	val = getJvmInstanceId();
-	            } else if(token.startsWith(TOKEN_PROPERTY)) {
-	            	String propertyName = token.substring(TOKEN_PROPERTY.length());
-	
-	                val = getSystemProperty(propertyName);
-	            } else if(token.startsWith(TOKEN_ENV)) {
-	            	String envName = token.substring(TOKEN_ENV.length());
-	
-	                val = getEnv(envName);
-	            } else if(token.startsWith(TOKEN_TIME)) {
-	                String dtFormat = token.substring(TOKEN_TIME.length());
-	                
-	                val = getFormattedTime(time, dtFormat);
-	            }
-            }
-
-            if(val == null) {
-            	val = "";
-            }
-
-            str = str.substring(0, tagStartPos) + val + str.substring(tagEndPos + TOKEN_END.length());
-            startPos = tagStartPos + val.length();
-        }
-
-        return str;
+		for (int startPos = 0; startPos < str.length();) {
+			int tagStartPos = str.indexOf(TOKEN_START, startPos);
+
+			if (tagStartPos == -1) {
+				break;
+			}
+
+			int tagEndPos = str.indexOf(TOKEN_END,
+					tagStartPos + TOKEN_START.length());
+
+			if (tagEndPos == -1) {
+				break;
+			}
+
+			String tag = str.substring(tagStartPos,
+					tagEndPos + TOKEN_END.length());
+			String token = tag.substring(TOKEN_START.length(),
+					tag.lastIndexOf(TOKEN_END));
+			String val = "";
+
+			if (token != null) {
+				if (token.equals(TOKEN_HOSTNAME)) {
+					val = getHostname();
+				} else if (token.equals(TOKEN_APP_TYPE)) {
+					val = getApplicationType();
+				} else if (token.equals(TOKEN_JVM_INSTANCE)) {
+					val = getJvmInstanceId();
+				} else if (token.startsWith(TOKEN_PROPERTY)) {
+					String propertyName = token.substring(TOKEN_PROPERTY
+							.length());
+
+					val = getSystemProperty(propertyName);
+				} else if (token.startsWith(TOKEN_ENV)) {
+					String envName = token.substring(TOKEN_ENV.length());
+
+					val = getEnv(envName);
+				} else if (token.startsWith(TOKEN_TIME)) {
+					String dtFormat = token.substring(TOKEN_TIME.length());
+
+					val = getFormattedTime(time, dtFormat);
+				}
+			}
+
+			if (val == null) {
+				val = "";
+			}
+
+			str = str.substring(0, tagStartPos) + val
+					+ str.substring(tagEndPos + TOKEN_END.length());
+			startPos = tagStartPos + val.length();
+		}
+
+		return str;
 	}
 
 	public static String getHostname() {
@@ -142,7 +156,8 @@ public class MiscUtil {
 		String ret = null;
 
 		try {
-			ret = propertyName != null ? System.getProperty(propertyName) : null;
+			ret = propertyName != null ? System.getProperty(propertyName)
+					: null;
 		} catch (Exception excp) {
 			LogLog.warn("getSystemProperty(" + propertyName + ") failed", excp);
 		}
@@ -166,9 +181,9 @@ public class MiscUtil {
 		String ret = null;
 
 		try {
-            SimpleDateFormat sdf = new SimpleDateFormat(format);
+			SimpleDateFormat sdf = new SimpleDateFormat(format);
 
-            ret = sdf.format(time);
+			ret = sdf.format(time);
 		} catch (Exception excp) {
 			LogLog.warn("SimpleDateFormat.format() failed: " + format, excp);
 		}
@@ -177,15 +192,16 @@ public class MiscUtil {
 	}
 
 	public static void createParents(File file) {
-		if(file != null) {
+		if (file != null) {
 			String parentName = file.getParent();
 
 			if (parentName != null) {
 				File parentDir = new File(parentName);
 
-				if(!parentDir.exists()) {
-					if(! parentDir.mkdirs()) {
-						LogLog.warn("createParents(): failed to create " + parentDir.getAbsolutePath());
+				if (!parentDir.exists()) {
+					if (!parentDir.mkdirs()) {
+						LogLog.warn("createParents(): failed to create "
+								+ parentDir.getAbsolutePath());
 					}
 				}
 			}
@@ -195,14 +211,16 @@ public class MiscUtil {
 	public static long getNextRolloverTime(long lastRolloverTime, long interval) {
 		long now = System.currentTimeMillis() / 1000 * 1000; // round to second
 
-		if(lastRolloverTime <= 0) {
-			// should this be set to the next multiple-of-the-interval from start of the day?
+		if (lastRolloverTime <= 0) {
+			// should this be set to the next multiple-of-the-interval from
+			// start of the day?
 			return now + interval;
-		} else if(lastRolloverTime <= now) {
+		} else if (lastRolloverTime <= now) {
 			long nextRolloverTime = now + interval;
 
 			// keep it at 'interval' boundary
-			long trimInterval = (nextRolloverTime - lastRolloverTime) % interval;
+			long trimInterval = (nextRolloverTime - lastRolloverTime)
+					% interval;
 
 			return nextRolloverTime - trimInterval;
 		} else {
@@ -211,23 +229,24 @@ public class MiscUtil {
 	}
 
 	public static long getRolloverStartTime(long nextRolloverTime, long interval) {
-		return (nextRolloverTime <= interval) ? System.currentTimeMillis() : nextRolloverTime - interval;
+		return (nextRolloverTime <= interval) ? System.currentTimeMillis()
+				: nextRolloverTime - interval;
 	}
 
 	public static int parseInteger(String str, int defValue) {
 		int ret = defValue;
 
-		if(str != null) {
+		if (str != null) {
 			try {
 				ret = Integer.parseInt(str);
-			} catch(Exception excp) {
+			} catch (Exception excp) {
 				// ignore
 			}
 		}
 
 		return ret;
 	}
-	
+
 	public static String generateUniqueId() {
 		return UUID.randomUUID().toString();
 	}
@@ -235,10 +254,10 @@ public class MiscUtil {
 	public static <T> String stringify(T log) {
 		String ret = null;
 
-		if(log != null) {
-			if(log instanceof String) {
-				ret = (String)log;
-			} else if(MiscUtil.sGsonBuilder != null) {
+		if (log != null) {
+			if (log instanceof String) {
+				ret = (String) log;
+			} else if (MiscUtil.sGsonBuilder != null) {
 				ret = MiscUtil.sGsonBuilder.toJson(log);
 			} else {
 				ret = log.toString();
@@ -247,4 +266,160 @@ public class MiscUtil {
 
 		return ret;
 	}
+
+	/**
+	 * Parses a JSON string into an instance of {@code clazz} using the shared
+	 * Gson instance. Unlike {@link #stringify(Object)}, this method does not
+	 * check whether the shared Gson instance is null before using it.
+	 *
+	 * @param jsonStr JSON text to parse
+	 * @param clazz   class of the object to create
+	 * @return the object produced by Gson for the given input
+	 */
+	public static <T> T fromJson(String jsonStr, Class<T> clazz) {
+		return sGsonBuilder.fromJson(jsonStr, clazz);
+	}
+
+	/**
+	 * Looks up a string property.
+	 *
+	 * @param props    properties to read from; may be null
+	 * @param propName name of the property; may be null
+	 * @return the property value, or null when {@code props} or
+	 *         {@code propName} is null or the property is not set
+	 */
+	public static String getStringProperty(Properties props, String propName) {
+		if (props == null || propName == null) {
+			return null;
+		}
+
+		return props.getProperty(propName);
+	}
+
+	/**
+	 * Looks up a boolean property. Surrounding whitespace in the value is
+	 * ignored; any value other than "true" (case-insensitive) yields false.
+	 *
+	 * @param props    properties to read from; may be null
+	 * @param propName name of the property; may be null
+	 * @param defValue value returned when the property cannot be looked up
+	 * @return the parsed value, or {@code defValue} when {@code props} or
+	 *         {@code propName} is null or the property is not set
+	 */
+	public static boolean getBooleanProperty(Properties props, String propName,
+			boolean defValue) {
+		String val = getStringProperty(props, propName);
+
+		return (val == null) ? defValue : Boolean.parseBoolean(val.trim());
+	}
+
+	/**
+	 * Looks up an int property. Surrounding whitespace in the value is
+	 * ignored, because java.util.Properties keeps trailing whitespace.
+	 *
+	 * @param props    properties to read from; may be null
+	 * @param propName name of the property; may be null
+	 * @param defValue value returned when the property cannot be used
+	 * @return the parsed value, or {@code defValue} when {@code props} or
+	 *         {@code propName} is null, or the property is not set or is not
+	 *         a valid int
+	 */
+	public static int getIntProperty(Properties props, String propName,
+			int defValue) {
+		String val = getStringProperty(props, propName);
+
+		if (val == null) {
+			return defValue;
+		}
+
+		try {
+			return Integer.parseInt(val.trim());
+		} catch (NumberFormatException excp) {
+			// malformed number: fall back to the caller's default
+			return defValue;
+		}
+	}
+
+	/**
+	 * Looks up a long property. Surrounding whitespace in the value is
+	 * ignored, because java.util.Properties keeps trailing whitespace.
+	 *
+	 * @param props    properties to read from; may be null
+	 * @param propName name of the property; may be null
+	 * @param defValue value returned when the property cannot be used
+	 * @return the parsed value, or {@code defValue} when {@code props} or
+	 *         {@code propName} is null, or the property is not set or is not
+	 *         a valid long
+	 */
+	public static long getLongProperty(Properties props, String propName,
+			long defValue) {
+		String val = getStringProperty(props, propName);
+
+		if (val == null) {
+			return defValue;
+		}
+
+		try {
+			return Long.parseLong(val.trim());
+		} catch (NumberFormatException excp) {
+			// malformed number: fall back to the caller's default
+			return defValue;
+		}
+	}
+
+	/**
+	 * Collects every property whose name starts with {@code prefix}, keyed by
+	 * the name with the prefix removed. A property named exactly
+	 * {@code prefix} is returned under the empty-string key.
+	 *
+	 * @param props  properties to scan; may be null
+	 * @param prefix name prefix to match; may be null
+	 * @return a new map of matching properties; empty when {@code props} or
+	 *         {@code prefix} is null or nothing matches
+	 */
+	public static Map<String, String> getPropertiesWithPrefix(Properties props,
+			String prefix) {
+		Map<String, String> prefixedProperties = new HashMap<String, String>();
+
+		if (props == null || prefix == null) {
+			return prefixedProperties;
+		}
+
+		// stringPropertyNames() never contains null and substring() never
+		// returns null, so no null checks are needed on the keys
+		for (String key : props.stringPropertyNames()) {
+			if (key.startsWith(prefix)) {
+				prefixedProperties.put(key.substring(prefix.length()),
+						props.getProperty(key));
+			}
+		}
+
+		return prefixedProperties;
+	}
+
+	/**
+	 * Splits a delimited string into its tokens using
+	 * {@link StringTokenizer}, so every character of {@code delim} acts as a
+	 * separator and empty tokens are skipped.
+	 *
+	 * @param destListStr string to split; null yields an empty list
+	 * @param delim       delimiter characters; must not be null
+	 * @return a new list of the tokens, in order of appearance
+	 */
+	public static List<String> toArray(String destListStr, String delim) {
+		List<String> list = new ArrayList<String>();
+
+		if (destListStr == null) {
+			// nothing configured: avoid the NullPointerException that
+			// StringTokenizer would throw for a null input string
+			return list;
+		}
+
+		StringTokenizer tokenizer = new StringTokenizer(destListStr, delim);
+		while (tokenizer.hasMoreTokens()) {
+			list.add(tokenizer.nextToken());
+		}
+		return list;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 1eec345..57ac0a0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -18,6 +18,7 @@
 package org.apache.ranger.audit.provider;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
@@ -25,14 +26,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 
-
 public class MultiDestAuditProvider extends BaseAuditProvider {
 
-	private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class);
+	private static final Log LOG = LogFactory
+			.getLog(MultiDestAuditProvider.class);
 
 	protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
 
-
 	public MultiDestAuditProvider() {
 		LOG.info("MultiDestAuditProvider: creating..");
 	}
@@ -47,109 +47,166 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
 
 		super.init(props);
 
-		for(AuditProvider provider : mProviders) {
-    		try {
-                provider.init(props);
-    		} catch(Throwable excp) {
-    			LOG.info("MultiDestAuditProvider.init(): failed " + provider.getClass().getCanonicalName() + ")", excp);
-    		}
-        }
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.init(props);
+			} catch (Throwable excp) {
+				LOG.info("MultiDestAuditProvider.init(): failed "
+						+ provider.getClass().getCanonicalName() + ")", excp);
+			}
+		}
 	}
 
 	public void addAuditProvider(AuditProvider provider) {
-		if(provider != null) {
-			LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")");
+		if (provider != null) {
+			LOG.info("MultiDestAuditProvider.addAuditProvider(providerType="
+					+ provider.getClass().getCanonicalName() + ")");
 
 			mProviders.add(provider);
 		}
 	}
 
 	public void addAuditProviders(List<AuditProvider> providers) {
-		if(providers != null) {
-			for(AuditProvider provider : providers) {
+		if (providers != null) {
+			for (AuditProvider provider : providers) {
 				addAuditProvider(provider);
 			}
 		}
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
-        for(AuditProvider provider : mProviders) {
-    		try {
-                provider.log(event);
-    		} catch(Throwable excp) {
-    			logFailedEvent(event, excp);
-    		}
-        }
+	public boolean log(AuditEventBase event) {
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.log(event);
+			} catch (Throwable excp) {
+				logFailedEvent(event, excp);
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.log(events); // the provider's boolean result is ignored
+			} catch (Throwable excp) {
+				logFailedEvent(events, excp); // hand over the failure, then continue with the next provider
+			}
+		}
+		return true; // always true, regardless of provider results or exceptions
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.logJSON(event); // the provider's boolean result is ignored
+			} catch (Throwable excp) {
+				logFailedEventJSON(event, excp); // hand over the failure, then continue with the next provider
+			}
+		}
+		return true; // always true, regardless of provider results or exceptions
+	}
+
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.logJSON(events); // the provider's boolean result is ignored
+			} catch (Throwable excp) {
+				logFailedEventJSON(events, excp); // hand over the failure, then continue with the next provider
+			}
+		}
+		return true; // always true, regardless of provider results or exceptions
 	}
 
 	@Override
 	public void start() {
-		for(AuditProvider provider : mProviders) {
-    		try {
+		for (AuditProvider provider : mProviders) {
+			try {
 				provider.start();
-    		} catch(Throwable excp) {
-    			LOG.error("AsyncAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp);
-    		}
+			} catch (Throwable excp) { // one failing provider must not stop the others from starting
+				LOG.error("MultiDestAuditProvider.start(): failed for provider { "
+						+ provider.getClass().getName() + " }", excp);
+			}
 		}
 	}
 
 	@Override
 	public void stop() {
-		for(AuditProvider provider : mProviders) {
+		for (AuditProvider provider : mProviders) {
 			try {
 				provider.stop();
-			} catch(Throwable excp) {
-    			LOG.error("AsyncAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			} catch (Throwable excp) { // one failing provider must not stop the others from stopping
+				LOG.error("MultiDestAuditProvider.stop(): failed for provider { "
+						+ provider.getClass().getName() + " }", excp);
 			}
 		}
 	}
 
 	@Override
-    public void waitToComplete() {
-		for(AuditProvider provider : mProviders) {
+	public void waitToComplete() {
+		for (AuditProvider provider : mProviders) {
 			try {
 				provider.waitToComplete();
-			} catch(Throwable excp) {
-    			LOG.error("AsyncAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			} catch (Throwable excp) { // keep waiting on the remaining providers
+				LOG.error(
+						"MultiDestAuditProvider.waitToComplete(): failed for provider { "
+								+ provider.getClass().getName() + " }", excp);
+			}
+		}
+	}
+
+	@Override
+	public void waitToComplete(long timeout) {
+		for (AuditProvider provider : mProviders) {
+			try {
+				provider.waitToComplete(timeout); // the same timeout is passed to every provider in turn
+			} catch (Throwable excp) {
+				LOG.error(
+						"MultiDestAuditProvider.waitToComplete(timeout): failed for provider { "
+								+ provider.getClass().getName() + " }", excp);
 			}
 		}
 	}
-	
+
 	@Override
 	public boolean isFlushPending() {
-		for(AuditProvider provider : mProviders) {
-			if(provider.isFlushPending()) {
+		for (AuditProvider provider : mProviders) {
+			if (provider.isFlushPending()) {
 				return true;
 			}
 		}
-		
+
 		return false;
 	}
-	
+
 	@Override
 	public long getLastFlushTime() {
 		long lastFlushTime = 0;
-		for(AuditProvider provider : mProviders) {
+		for (AuditProvider provider : mProviders) {
 			long flushTime = provider.getLastFlushTime();
-			
-			if(flushTime != 0) {
-				if(lastFlushTime == 0 || lastFlushTime > flushTime) {
+
+			if (flushTime != 0) {
+				if (lastFlushTime == 0 || lastFlushTime > flushTime) {
 					lastFlushTime = flushTime;
 				}
 			}
 		}
-		
+
 		return lastFlushTime;
 	}
-	
+
 	@Override
 	public void flush() {
-		for(AuditProvider provider : mProviders) {
+		for (AuditProvider provider : mProviders) {
 			try {
 				provider.flush();
-			} catch(Throwable excp) {
-    			LOG.error("AsyncAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			} catch (Throwable excp) { // one failing provider must not prevent flushing the others
+				LOG.error("MultiDestAuditProvider.flush(): failed for provider { "
+						+ provider.getClass().getName() + " }", excp);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
index 620951c..a18e3e9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
@@ -22,7 +22,6 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
 import org.apache.ranger.audit.provider.BufferedAuditProvider;
 import org.apache.ranger.audit.provider.DebugTracer;
 import org.apache.ranger.audit.provider.LocalFileLogBuffer;
@@ -44,7 +43,7 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
 
 		super.init(props);
 
-		Map<String, String> hdfsProps = BaseAuditProvider.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
+		Map<String, String> hdfsProps = MiscUtil.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
 
 		String encoding                                = hdfsProps.get("encoding");
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
index 6b5cb4b..49f4e65 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.DebugTracer;
 import org.apache.ranger.audit.provider.LogDestination;
 import org.apache.ranger.audit.provider.MiscUtil;
@@ -36,6 +37,8 @@ import org.apache.ranger.audit.provider.MiscUtil;
 public class HdfsLogDestination<T> implements LogDestination<T> {
 	public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed";
 
+	private String name = getClass().getName();
+	
 	private String  mDirectory                = null;
 	private String  mFile                     = null;
 	private int     mFlushIntervalSeconds     = 1 * 60;
@@ -57,6 +60,20 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 		mLogger = tracer;
 	}
 
+	
+	public void setName(String name) {
+		this.name = name;
+	}
+
+
+	/* (non-Javadoc)
+	 * @see org.apache.ranger.audit.provider.LogDestination#getName()
+	 */
+	@Override
+	public String getName() {
+		return name;
+	}
+	
 	public String getDirectory() {
 		return mDirectory;
 	}
@@ -133,11 +150,11 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 	}
 
 	@Override
-	public boolean send(T log) {
-		boolean ret = false;
+	public boolean send(AuditEventBase log) {
+		boolean ret = true;
 		
 		if(log != null) {
-			String msg = log.toString();
+			String msg = MiscUtil.stringify(log);
 
 			ret = sendStringified(msg);
 		}
@@ -145,6 +162,18 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 		return ret;
 	}
 
+	
+	@Override
+	public boolean send(AuditEventBase[] logs) {
+		// Stop at the first event that fails to send; later events are not attempted.
+		for (AuditEventBase event : logs) {
+			if (!send(event)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
 	@Override
 	public boolean sendStringified(String log) {
 		boolean ret = false;
@@ -169,6 +198,18 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 	}
 
 	@Override
+	public boolean sendStringified(String[] logs) {
+		// Stop at the first entry that fails to send; later entries are not attempted.
+		for (String entry : logs) {
+			if (!sendStringified(entry)) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	
+	@Override
 	public boolean flush() {
 		mLogger.debug("==> HdfsLogDestination.flush()");
 
@@ -448,4 +489,5 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
 		
 		return sb.toString();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 0ec8790..5f39e69 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ranger.audit.provider.kafka;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import kafka.javaapi.producer.Producer;
@@ -46,12 +47,12 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 		LOG.info("init() called");
 		super.init(props);
 
-		setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
-		setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
+		setMaxQueueSize(MiscUtil.getIntProperty(props,
+				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
+		setMaxBatchInterval(MiscUtil.getIntProperty(props,
 				AUDIT_MAX_QUEUE_SIZE_PROP,
-				AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
-		topic = BaseAuditProvider.getStringProperty(props,
+				AUDIT_BATCH_INTERVAL_DEFAULT_MS));
+		topic = MiscUtil.getStringProperty(props,
 				AUDIT_KAFKA_TOPIC_NAME);
 		if (topic == null || topic.isEmpty()) {
 			topic = "ranger_audits";
@@ -59,7 +60,7 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 
 		try {
 			if (!initDone) {
-				String brokerList = BaseAuditProvider.getStringProperty(props,
+				String brokerList = MiscUtil.getStringProperty(props,
 						AUDIT_KAFKA_BROKER_LIST);
 				if (brokerList == null || brokerList.isEmpty()) {
 					brokerList = "localhost:9092";
@@ -87,7 +88,7 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 	}
 
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		if (event instanceof AuthzAuditEvent) {
 			AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
 
@@ -118,7 +119,32 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 		} catch (Throwable t) {
 			LOG.error("Error sending message to Kafka topic. topic=" + topic
 					+ ", message=" + message, t);
+			return false;
 		}
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event); // per-event result is ignored
+		}
+		return true; // always true, even when an individual log(event) returned false
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
+
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) {
+			logJSON(event); // per-event result is ignored, as in log(Collection)
+		}
+		return true; // batch handled; an unconditional false would report failure even when every event was sent
+	}
 
 	@Override
@@ -143,8 +169,10 @@ public class KafkaAuditProvider extends BaseAuditProvider {
 	@Override
 	public void waitToComplete() {
 		LOG.info("waitToComplete() called");
-		// TODO Auto-generated method stub
-
+	}
+	
+	@Override
+	public void waitToComplete(long timeout) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 1b463e6..9ee4ec0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.audit.provider.solr;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.Properties;
 
@@ -55,12 +56,12 @@ public class SolrAuditProvider extends BaseAuditProvider {
 		LOG.info("init() called");
 		super.init(props);
 
-		setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
-				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
-		setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
+		setMaxQueueSize(MiscUtil.getIntProperty(props,
+				AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
+		setMaxBatchInterval(MiscUtil.getIntProperty(props,
 				AUDIT_MAX_QUEUE_SIZE_PROP,
-				AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
-		retryWaitTime = BaseAuditProvider.getIntProperty(props,
+				AUDIT_BATCH_INTERVAL_DEFAULT_MS));
+		retryWaitTime = MiscUtil.getIntProperty(props,
 				AUDIT_RETRY_WAIT_PROP, retryWaitTime);
 	}
 
@@ -68,7 +69,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
 		if (solrClient == null) {
 			synchronized (lock) {
 				if (solrClient == null) {
-					String solrURL = BaseAuditProvider.getStringProperty(props,
+					String solrURL = MiscUtil.getStringProperty(props,
 							"xasecure.audit.solr.solr_url");
 
 					if (lastConnectTime != null) {
@@ -118,11 +119,11 @@ public class SolrAuditProvider extends BaseAuditProvider {
 	 * audit.model.AuditEventBase)
 	 */
 	@Override
-	public void log(AuditEventBase event) {
+	public boolean log(AuditEventBase event) {
 		if (!(event instanceof AuthzAuditEvent)) {
 			LOG.error(event.getClass().getName()
 					+ " audit event class type is not supported");
-			return;
+			return false;
 		}
 		AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
 		// TODO: This should be done at a higher level
@@ -144,7 +145,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
 				connect();
 				if (solrClient == null) {
 					// Solr is still not initialized. So need to throw error
-					return;
+					return false;
 				}
 			}
 
@@ -155,7 +156,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
 						LOG.debug("Ignore sending audit. lastConnect=" + diff
 								+ " ms");
 					}
-					return;
+					return false;
 				}
 			}
 			// Convert AuditEventBase to Solr document
@@ -176,8 +177,32 @@ public class SolrAuditProvider extends BaseAuditProvider {
 
 		} catch (Throwable t) {
 			LOG.error("Error sending message to Solr", t);
+			return false;
 		}
+		return true;
+	}
+
+	@Override
+	public boolean log(Collection<AuditEventBase> events) {
+		for (AuditEventBase event : events) {
+			log(event); // per-event result is ignored
+		}
+		return true; // always true, even when an individual log(event) returned false
+	}
+
+	@Override
+	public boolean logJSON(String event) {
+		AuditEventBase eventObj = MiscUtil.fromJson(event,
+				AuthzAuditEvent.class);
+		return log(eventObj);
+	}
 
+	@Override
+	public boolean logJSON(Collection<String> events) {
+		for (String event : events) {
+			logJSON(event); // per-event result is ignored, as in log(Collection)
+		}
+		return true; // batch handled; an unconditional false would report failure even when every event was sent
 	}
 
 	/*
@@ -208,10 +233,15 @@ public class SolrAuditProvider extends BaseAuditProvider {
 	 */
 	@Override
 	public void waitToComplete() {
-		// TODO Auto-generated method stub
 
 	}
 
+	
+	@Override
+	public void waitToComplete(long timeout) {
+		
+	}
+
 	/*
 	 * (non-Javadoc)
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/.gitignore
----------------------------------------------------------------------
diff --git a/security-admin/.gitignore b/security-admin/.gitignore
index 4a3ed53..bf7dc37 100644
--- a/security-admin/.gitignore
+++ b/security-admin/.gitignore
@@ -3,3 +3,6 @@
 /bin/
 /target
 .settings/
+.pydevproject
+log4j.xml
+*.log

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/pom.xml
----------------------------------------------------------------------
diff --git a/security-admin/pom.xml b/security-admin/pom.xml
index 3220886..286740c 100644
--- a/security-admin/pom.xml
+++ b/security-admin/pom.xml
@@ -251,6 +251,8 @@
 		<dependency>
 		    <groupId>junit</groupId>
 		    <artifactId>junit</artifactId>
+		    <version>4.11</version>
+		    <scope>test</scope>
 		</dependency>
 		<dependency>
 			<groupId>org.mockito</groupId>
@@ -421,13 +423,15 @@
 					<additionalClasspathElement>${project.basedir}/src/main/webapp/META-INF</additionalClasspathElement>
 				</additionalClasspathElements>
 			</configuration>
+<!--
 			<dependencies>
 				<dependency>
 					<groupId>org.apache.maven.surefire</groupId>
 					<artifactId>surefire-junit47</artifactId>
-					<version>2.17</version>
+					<version>2.18</version>
 				</dependency>
 			</dependencies>
+-->
 		</plugin>
 
 	</plugins>