You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:28 UTC

[11/12] incubator-ranger git commit: RANGER-397 - Implement reliable streaming audits to configurable destinations - Incorporate Review Feedback

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index e102d8b..3e1940b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -25,26 +25,25 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.MiscUtil;
 
 /**
  * This is a non-blocking queue with no limit on capacity.
  */
-public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
+public class AuditSummaryQueue extends AuditQueue implements Runnable {
 	private static final Log logger = LogFactory
 			.getLog(AuditSummaryQueue.class);
 
 	public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms";
 
-	LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+	LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>();
 	Thread consumerThread = null;
 
 	static int threadCount = 0;
@@ -52,15 +51,11 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 
 	private static final int MAX_DRAIN = 100000;
 
-	private int maxSummaryInterval = 5000;
+	private int maxSummaryIntervalMs = 5000;
 
 	HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>();
 
-	public AuditSummaryQueue() {
-		setName(DEFAULT_NAME);
-	}
-
-	public AuditSummaryQueue(AuditProvider consumer) {
+	public AuditSummaryQueue(AuditHandler consumer) {
 		super(consumer);
 		setName(DEFAULT_NAME);
 	}
@@ -68,9 +63,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 	@Override
 	public void init(Properties props, String propPrefix) {
 		super.init(props, propPrefix);
-		maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "."
-				+ PROP_SUMMARY_INTERVAL, maxSummaryInterval);
-		logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name="
+		maxSummaryIntervalMs = MiscUtil.getIntProperty(props, propPrefix + "."
+				+ PROP_SUMMARY_INTERVAL, maxSummaryIntervalMs);
+		logger.info("maxSummaryInterval=" + maxSummaryIntervalMs + ", name="
 				+ getName());
 	}
 
@@ -88,7 +83,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 			return false;
 		}
 		queue.add(event);
-		addLifeTimeInLogCount(1);
 		return true;
 	}
 
@@ -133,23 +127,10 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 			if (consumerThread != null) {
 				consumerThread.interrupt();
 			}
-			consumerThread = null;
 		} catch (Throwable t) {
 			// ignore any exception
 		}
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-	 */
-	@Override
-	public boolean isFlushPending() {
-		if (queue.isEmpty()) {
-			return consumer.isFlushPending();
-		}
-		return true;
+		consumerThread = null;
 	}
 
 	/*
@@ -164,7 +145,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 		while (true) {
 			// Time to next dispatch
 			long nextDispatchDuration = lastDispatchTime
-					- System.currentTimeMillis() + maxSummaryInterval;
+					- System.currentTimeMillis() + maxSummaryIntervalMs;
 
 			Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
 
@@ -184,7 +165,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 				} else {
 					// poll returned due to timeout, so reseting clock
 					nextDispatchDuration = lastDispatchTime
-							- System.currentTimeMillis() + maxSummaryInterval;
+							- System.currentTimeMillis() + maxSummaryIntervalMs;
 					lastDispatchTime = System.currentTimeMillis();
 				}
 			} catch (InterruptedException e) {
@@ -213,6 +194,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 			}
 
 			if (isDrain() || nextDispatchDuration <= 0) {
+				// Reset time just before sending the logs
+				lastDispatchTime = System.currentTimeMillis();
+
 				for (Map.Entry<String, AuditSummary> entry : summaryMap
 						.entrySet()) {
 					AuditSummary auditSummary = entry.getValue();
@@ -221,9 +205,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
 							- auditSummary.startTime.getTime();
 					timeDiff = timeDiff > 0 ? timeDiff : 1;
 					auditSummary.event.setEventDurationMS(timeDiff);
-
-					// Reset time just before sending the logs
-					lastDispatchTime = System.currentTimeMillis();
 					boolean ret = consumer.log(auditSummary.event);
 					if (!ret) {
 						// We need to drop this event

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
index c2dc955..87c6a8f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
@@ -23,7 +23,7 @@ import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.audit.model.EnumRepositoryType;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.AuditProviderFactory;
 import org.apache.commons.logging.LogFactory;
 
@@ -74,7 +74,7 @@ public class TestEvents {
 
             AuditProviderFactory.getInstance().init(auditProperties, "hdfs");
 
-            AuditProvider provider = AuditProviderFactory.getAuditProvider();
+            AuditHandler provider = AuditProviderFactory.getAuditProvider();
 
             LOG.info("provider=" + provider.toString());
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
index 45477e2..021c49a 100644
--- a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
@@ -32,14 +32,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.destination.FileAuditDestination;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
 import org.apache.ranger.audit.provider.AuditProviderFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.audit.provider.MultiDestAuditProvider;
 import org.apache.ranger.audit.queue.AuditAsyncQueue;
 import org.apache.ranger.audit.queue.AuditBatchQueue;
 import org.apache.ranger.audit.queue.AuditFileSpool;
+import org.apache.ranger.audit.queue.AuditQueue;
 import org.apache.ranger.audit.queue.AuditSummaryQueue;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -93,9 +94,9 @@ public class TestAuditQueue {
 		AuditSummaryQueue queue = new AuditSummaryQueue(testConsumer);
 
 		Properties props = new Properties();
-		props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+		props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "."
 				+ AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300);
-		queue.init(props, BaseAuditProvider.PROP_DEFAULT_PREFIX);
+		queue.init(props, BaseAuditHandler.PROP_DEFAULT_PREFIX);
 
 		queue.start();
 
@@ -103,7 +104,7 @@ public class TestAuditQueue {
 	}
 
 	private void commonTestSummary(TestConsumer testConsumer,
-			BaseAuditProvider queue) {
+			BaseAuditHandler queue) {
 		int messageToSend = 0;
 		int pauseMS = 330;
 
@@ -171,7 +172,6 @@ public class TestAuditQueue {
 		}
 		assertEquals(messageToSend, testConsumer.getSumTotal());
 		assertEquals(countToCheck, testConsumer.getCountTotal());
-		assertNull("Event not in sequnce", testConsumer.isInSequence());
 	}
 
 	@Test
@@ -182,22 +182,23 @@ public class TestAuditQueue {
 		// Destination
 		String propPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".test";
 		props.put(propPrefix, "enable");
-		props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "summary" + "."
+		props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "summary" + "."
 				+ "enabled", "true");
-		props.put(propPrefix + "." + BaseAuditProvider.PROP_NAME, "test");
-		props.put(propPrefix + "." + BaseAuditProvider.PROP_QUEUE, "none");
+		props.put(propPrefix + "." + BaseAuditHandler.PROP_NAME, "test");
+		props.put(propPrefix + "." + AuditQueue.PROP_QUEUE, "none");
 
-		props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+		props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "."
 				+ AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300);
-		props.put(propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME,
+		props.put(propPrefix + "." + BaseAuditHandler.PROP_CLASS_NAME,
 				TestConsumer.class.getName());
 
 		AuditProviderFactory factory = AuditProviderFactory.getInstance();
 		factory.init(props, "test");
-		BaseAuditProvider queue = (BaseAuditProvider) factory.getProvider();
-		BaseAuditProvider consumer = (BaseAuditProvider) queue.getConsumer();
-		while (consumer.getConsumer() != null) {
-			consumer = (BaseAuditProvider) consumer.getConsumer();
+		AuditQueue queue = (AuditQueue) factory.getProvider();
+		BaseAuditHandler consumer = (BaseAuditHandler) queue.getConsumer();
+		while (consumer != null && consumer instanceof AuditQueue) {
+			AuditQueue cQueue = (AuditQueue) consumer;
+			consumer = (BaseAuditHandler) cQueue.getConsumer();
 		}
 		assertTrue("Consumer should be TestConsumer. class="
 				+ consumer.getClass().getName(),
@@ -257,12 +258,12 @@ public class TestAuditQueue {
 		int queueSize = messageToSend * 2;
 		int intervalMS = messageToSend * 100; // Deliberately big interval
 		Properties props = new Properties();
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
-				"" + intervalMS);
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+				+ intervalMS);
 
 		TestConsumer testConsumer = new TestConsumer();
 		AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
@@ -308,12 +309,12 @@ public class TestAuditQueue {
 		int expectedBatchSize = (messageToSend * pauseMS) / intervalMS + 1;
 
 		Properties props = new Properties();
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
-				"" + intervalMS);
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+				+ intervalMS);
 
 		TestConsumer testConsumer = new TestConsumer();
 		AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
@@ -356,15 +357,15 @@ public class TestAuditQueue {
 		int queueSize = messageToSend * 2;
 		int intervalMS = Integer.MAX_VALUE; // Deliberately big interval
 		Properties props = new Properties();
-		props.put(basePropName + "." + BaseAuditProvider.PROP_NAME,
+		props.put(basePropName + "." + BaseAuditHandler.PROP_NAME,
 				"testAuditBatchQueueDestDown");
 
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
-				"" + intervalMS);
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+				+ intervalMS);
 
 		// Enable File Spooling
 		props.put(basePropName + "." + "filespool.enable", "" + true);
@@ -410,21 +411,20 @@ public class TestAuditQueue {
 		int intervalMS = 3000; // Deliberately big interval
 		Properties props = new Properties();
 		props.put(
-				basePropName + "." + BaseAuditProvider.PROP_NAME,
+				basePropName + "." + BaseAuditHandler.PROP_NAME,
 				"testAuditBatchQueueDestDownFlipFlop_"
 						+ MiscUtil.generateUniqueId());
 
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
-				"" + intervalMS);
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+				+ intervalMS);
 
 		// Enable File Spooling
 		int destRetryMS = 10;
-		props.put(
-				basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+		props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE,
 				"" + true);
 		props.put(
 				basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
@@ -499,21 +499,20 @@ public class TestAuditQueue {
 		int maxArchivedFiles = 1;
 		Properties props = new Properties();
 		props.put(
-				basePropName + "." + BaseAuditProvider.PROP_NAME,
+				basePropName + "." + BaseAuditHandler.PROP_NAME,
 				"testAuditBatchQueueDestDownRestart_"
 						+ MiscUtil.generateUniqueId());
 
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
-				"" + intervalMS);
+		props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+				+ intervalMS);
 
 		// Enable File Spooling
 		int destRetryMS = 10;
-		props.put(
-				basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+		props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE,
 				"" + true);
 		props.put(
 				basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
@@ -598,7 +597,7 @@ public class TestAuditQueue {
 		// Destination
 		String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file";
 		props.put(filePropPrefix, "enable");
-		props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file");
+		props.put(filePropPrefix + "." + AuditQueue.PROP_NAME, "file");
 		props.put(filePropPrefix + "."
 				+ FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName);
 		props.put(filePropPrefix + "."
@@ -607,21 +606,20 @@ public class TestAuditQueue {
 		props.put(filePropPrefix + "."
 				+ FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10);
 
-		props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, "batch");
+		props.put(filePropPrefix + "." + AuditQueue.PROP_QUEUE, "batch");
 		String batchPropPrefix = filePropPrefix + "." + "batch";
 
-		props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+		props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_SIZE, ""
 				+ batchSize);
-		props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+		props.put(batchPropPrefix + "." + AuditQueue.PROP_QUEUE_SIZE, ""
 				+ queueSize);
-		props.put(
-				batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+		props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_INTERVAL,
 				"" + intervalMS);
 
 		// Enable File Spooling
 		int destRetryMS = 10;
 		props.put(batchPropPrefix + "."
-				+ BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true);
+				+ AuditQueue.PROP_FILE_SPOOL_ENABLE, "" + true);
 		props.put(batchPropPrefix + "."
 				+ AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target");
 		props.put(batchPropPrefix + "."
@@ -638,7 +636,7 @@ public class TestAuditQueue {
 		// queue.init(props, batchPropPrefix);
 		// queue.start();
 
-		AuditProvider queue = factory.getProvider();
+		AuditHandler queue = factory.getProvider();
 
 		for (int i = 0; i < messageToSend; i++) {
 			queue.log(createEvent());

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
index d4d50f0..136874d 100644
--- a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
@@ -39,15 +39,13 @@ public class TestConsumer extends AuditDestination {
 	int batchCount = 0;
 	String providerName = getClass().getName();
 	boolean isDown = false;
-	int batchSize = 3;
 
 	List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
 
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
+	 * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
 	 * .audit.model.AuditEventBase)
 	 */
 	@Override
@@ -144,32 +142,6 @@ public class TestConsumer extends AuditDestination {
 	public void waitToComplete() {
 	}
 
-	@Override
-	public int getMaxBatchSize() {
-		return batchSize;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-	 */
-	@Override
-	public boolean isFlushPending() {
-		return false;
-	}
-
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-	 */
-	@Override
-	public long getLastFlushTime() {
-		return 0;
-	}
-
 	/*
 	 * (non-Javadoc)
 	 * 
@@ -207,8 +179,7 @@ public class TestConsumer extends AuditDestination {
 	/*
 	 * (non-Javadoc)
 	 * 
-	 * @see
-	 * org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+	 * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
 	 */
 	@Override
 	public void waitToComplete(long timeout) {
@@ -225,23 +196,14 @@ public class TestConsumer extends AuditDestination {
 		return providerName;
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
-	 */
-	@Override
-	public boolean isDrain() {
-		return false;
-	}
-
 	// Local methods
 	public AuthzAuditEvent isInSequence() {
-		int lastSeq = -1;
+		long lastSeq = -1;
 		for (AuthzAuditEvent event : eventList) {
 			if (event.getSeqNum() <= lastSeq) {
 				return event;
 			}
+			lastSeq = event.getSeqNum();
 		}
 		return null;
 	}