You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:28 UTC
[11/12] incubator-ranger git commit: RANGER-397 - Implement reliable
streaming audits to configurable destinations - Incorporate Review Feedback
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index e102d8b..3e1940b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -25,26 +25,25 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.MiscUtil;
/**
* This is a non-blocking queue with no limit on capacity.
*/
-public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
+public class AuditSummaryQueue extends AuditQueue implements Runnable {
private static final Log logger = LogFactory
.getLog(AuditSummaryQueue.class);
public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms";
- LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+ LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>();
Thread consumerThread = null;
static int threadCount = 0;
@@ -52,15 +51,11 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
private static final int MAX_DRAIN = 100000;
- private int maxSummaryInterval = 5000;
+ private int maxSummaryIntervalMs = 5000;
HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>();
- public AuditSummaryQueue() {
- setName(DEFAULT_NAME);
- }
-
- public AuditSummaryQueue(AuditProvider consumer) {
+ public AuditSummaryQueue(AuditHandler consumer) {
super(consumer);
setName(DEFAULT_NAME);
}
@@ -68,9 +63,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
@Override
public void init(Properties props, String propPrefix) {
super.init(props, propPrefix);
- maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_SUMMARY_INTERVAL, maxSummaryInterval);
- logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name="
+ maxSummaryIntervalMs = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_SUMMARY_INTERVAL, maxSummaryIntervalMs);
+ logger.info("maxSummaryInterval=" + maxSummaryIntervalMs + ", name="
+ getName());
}
@@ -88,7 +83,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
return false;
}
queue.add(event);
- addLifeTimeInLogCount(1);
return true;
}
@@ -133,23 +127,10 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
if (consumerThread != null) {
consumerThread.interrupt();
}
- consumerThread = null;
} catch (Throwable t) {
// ignore any exception
}
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
- */
- @Override
- public boolean isFlushPending() {
- if (queue.isEmpty()) {
- return consumer.isFlushPending();
- }
- return true;
+ consumerThread = null;
}
/*
@@ -164,7 +145,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
while (true) {
// Time to next dispatch
long nextDispatchDuration = lastDispatchTime
- - System.currentTimeMillis() + maxSummaryInterval;
+ - System.currentTimeMillis() + maxSummaryIntervalMs;
Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
@@ -184,7 +165,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
} else {
// poll returned due to timeout, so resetting clock
nextDispatchDuration = lastDispatchTime
- - System.currentTimeMillis() + maxSummaryInterval;
+ - System.currentTimeMillis() + maxSummaryIntervalMs;
lastDispatchTime = System.currentTimeMillis();
}
} catch (InterruptedException e) {
@@ -213,6 +194,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
}
if (isDrain() || nextDispatchDuration <= 0) {
+ // Reset time just before sending the logs
+ lastDispatchTime = System.currentTimeMillis();
+
for (Map.Entry<String, AuditSummary> entry : summaryMap
.entrySet()) {
AuditSummary auditSummary = entry.getValue();
@@ -221,9 +205,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
- auditSummary.startTime.getTime();
timeDiff = timeDiff > 0 ? timeDiff : 1;
auditSummary.event.setEventDurationMS(timeDiff);
-
- // Reset time just before sending the logs
- lastDispatchTime = System.currentTimeMillis();
boolean ret = consumer.log(auditSummary.event);
if (!ret) {
// We need to drop this event
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
index c2dc955..87c6a8f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
@@ -23,7 +23,7 @@ import org.apache.log4j.xml.DOMConfigurator;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.audit.model.EnumRepositoryType;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.AuditProviderFactory;
import org.apache.commons.logging.LogFactory;
@@ -74,7 +74,7 @@ public class TestEvents {
AuditProviderFactory.getInstance().init(auditProperties, "hdfs");
- AuditProvider provider = AuditProviderFactory.getAuditProvider();
+ AuditHandler provider = AuditProviderFactory.getAuditProvider();
LOG.info("provider=" + provider.toString());
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
index 45477e2..021c49a 100644
--- a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
@@ -32,14 +32,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.destination.FileAuditDestination;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.AuditProviderFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.audit.provider.MultiDestAuditProvider;
import org.apache.ranger.audit.queue.AuditAsyncQueue;
import org.apache.ranger.audit.queue.AuditBatchQueue;
import org.apache.ranger.audit.queue.AuditFileSpool;
+import org.apache.ranger.audit.queue.AuditQueue;
import org.apache.ranger.audit.queue.AuditSummaryQueue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -93,9 +94,9 @@ public class TestAuditQueue {
AuditSummaryQueue queue = new AuditSummaryQueue(testConsumer);
Properties props = new Properties();
- props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+ props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "."
+ AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300);
- queue.init(props, BaseAuditProvider.PROP_DEFAULT_PREFIX);
+ queue.init(props, BaseAuditHandler.PROP_DEFAULT_PREFIX);
queue.start();
@@ -103,7 +104,7 @@ public class TestAuditQueue {
}
private void commonTestSummary(TestConsumer testConsumer,
- BaseAuditProvider queue) {
+ BaseAuditHandler queue) {
int messageToSend = 0;
int pauseMS = 330;
@@ -171,7 +172,6 @@ public class TestAuditQueue {
}
assertEquals(messageToSend, testConsumer.getSumTotal());
assertEquals(countToCheck, testConsumer.getCountTotal());
- assertNull("Event not in sequnce", testConsumer.isInSequence());
}
@Test
@@ -182,22 +182,23 @@ public class TestAuditQueue {
// Destination
String propPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".test";
props.put(propPrefix, "enable");
- props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "summary" + "."
+ props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "summary" + "."
+ "enabled", "true");
- props.put(propPrefix + "." + BaseAuditProvider.PROP_NAME, "test");
- props.put(propPrefix + "." + BaseAuditProvider.PROP_QUEUE, "none");
+ props.put(propPrefix + "." + BaseAuditHandler.PROP_NAME, "test");
+ props.put(propPrefix + "." + AuditQueue.PROP_QUEUE, "none");
- props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+ props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "."
+ AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300);
- props.put(propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME,
+ props.put(propPrefix + "." + BaseAuditHandler.PROP_CLASS_NAME,
TestConsumer.class.getName());
AuditProviderFactory factory = AuditProviderFactory.getInstance();
factory.init(props, "test");
- BaseAuditProvider queue = (BaseAuditProvider) factory.getProvider();
- BaseAuditProvider consumer = (BaseAuditProvider) queue.getConsumer();
- while (consumer.getConsumer() != null) {
- consumer = (BaseAuditProvider) consumer.getConsumer();
+ AuditQueue queue = (AuditQueue) factory.getProvider();
+ BaseAuditHandler consumer = (BaseAuditHandler) queue.getConsumer();
+ while (consumer != null && consumer instanceof AuditQueue) {
+ AuditQueue cQueue = (AuditQueue) consumer;
+ consumer = (BaseAuditHandler) cQueue.getConsumer();
}
assertTrue("Consumer should be TestConsumer. class="
+ consumer.getClass().getName(),
@@ -257,12 +258,12 @@ public class TestAuditQueue {
int queueSize = messageToSend * 2;
int intervalMS = messageToSend * 100; // Deliberately big interval
Properties props = new Properties();
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
- "" + intervalMS);
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+ + intervalMS);
TestConsumer testConsumer = new TestConsumer();
AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
@@ -308,12 +309,12 @@ public class TestAuditQueue {
int expectedBatchSize = (messageToSend * pauseMS) / intervalMS + 1;
Properties props = new Properties();
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
- "" + intervalMS);
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+ + intervalMS);
TestConsumer testConsumer = new TestConsumer();
AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
@@ -356,15 +357,15 @@ public class TestAuditQueue {
int queueSize = messageToSend * 2;
int intervalMS = Integer.MAX_VALUE; // Deliberately big interval
Properties props = new Properties();
- props.put(basePropName + "." + BaseAuditProvider.PROP_NAME,
+ props.put(basePropName + "." + BaseAuditHandler.PROP_NAME,
"testAuditBatchQueueDestDown");
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
- "" + intervalMS);
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+ + intervalMS);
// Enable File Spooling
props.put(basePropName + "." + "filespool.enable", "" + true);
@@ -410,21 +411,20 @@ public class TestAuditQueue {
int intervalMS = 3000; // Deliberately big interval
Properties props = new Properties();
props.put(
- basePropName + "." + BaseAuditProvider.PROP_NAME,
+ basePropName + "." + BaseAuditHandler.PROP_NAME,
"testAuditBatchQueueDestDownFlipFlop_"
+ MiscUtil.generateUniqueId());
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
- "" + intervalMS);
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+ + intervalMS);
// Enable File Spooling
int destRetryMS = 10;
- props.put(
- basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+ props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE,
"" + true);
props.put(
basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
@@ -499,21 +499,20 @@ public class TestAuditQueue {
int maxArchivedFiles = 1;
Properties props = new Properties();
props.put(
- basePropName + "." + BaseAuditProvider.PROP_NAME,
+ basePropName + "." + BaseAuditHandler.PROP_NAME,
"testAuditBatchQueueDestDownRestart_"
+ MiscUtil.generateUniqueId());
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
- "" + intervalMS);
+ props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, ""
+ + intervalMS);
// Enable File Spooling
int destRetryMS = 10;
- props.put(
- basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+ props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE,
"" + true);
props.put(
basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
@@ -598,7 +597,7 @@ public class TestAuditQueue {
// Destination
String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file";
props.put(filePropPrefix, "enable");
- props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file");
+ props.put(filePropPrefix + "." + AuditQueue.PROP_NAME, "file");
props.put(filePropPrefix + "."
+ FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName);
props.put(filePropPrefix + "."
@@ -607,21 +606,20 @@ public class TestAuditQueue {
props.put(filePropPrefix + "."
+ FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10);
- props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, "batch");
+ props.put(filePropPrefix + "." + AuditQueue.PROP_QUEUE, "batch");
String batchPropPrefix = filePropPrefix + "." + "batch";
- props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+ props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_SIZE, ""
+ batchSize);
- props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+ props.put(batchPropPrefix + "." + AuditQueue.PROP_QUEUE_SIZE, ""
+ queueSize);
- props.put(
- batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+ props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_INTERVAL,
"" + intervalMS);
// Enable File Spooling
int destRetryMS = 10;
props.put(batchPropPrefix + "."
- + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true);
+ + AuditQueue.PROP_FILE_SPOOL_ENABLE, "" + true);
props.put(batchPropPrefix + "."
+ AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target");
props.put(batchPropPrefix + "."
@@ -638,7 +636,7 @@ public class TestAuditQueue {
// queue.init(props, batchPropPrefix);
// queue.start();
- AuditProvider queue = factory.getProvider();
+ AuditHandler queue = factory.getProvider();
for (int i = 0; i < messageToSend; i++) {
queue.log(createEvent());
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
index d4d50f0..136874d 100644
--- a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
@@ -39,15 +39,13 @@ public class TestConsumer extends AuditDestination {
int batchCount = 0;
String providerName = getClass().getName();
boolean isDown = false;
- int batchSize = 3;
List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
/*
* (non-Javadoc)
*
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
+ * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
* .audit.model.AuditEventBase)
*/
@Override
@@ -144,32 +142,6 @@ public class TestConsumer extends AuditDestination {
public void waitToComplete() {
}
- @Override
- public int getMaxBatchSize() {
- return batchSize;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
- */
- @Override
- public boolean isFlushPending() {
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
- */
- @Override
- public long getLastFlushTime() {
- return 0;
- }
-
/*
* (non-Javadoc)
*
@@ -207,8 +179,7 @@ public class TestConsumer extends AuditDestination {
/*
* (non-Javadoc)
*
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
*/
@Override
public void waitToComplete(long timeout) {
@@ -225,23 +196,14 @@ public class TestConsumer extends AuditDestination {
return providerName;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
- */
- @Override
- public boolean isDrain() {
- return false;
- }
-
// Local methods
public AuthzAuditEvent isInSequence() {
- int lastSeq = -1;
+ long lastSeq = -1;
for (AuthzAuditEvent event : eventList) {
if (event.getSeqNum() <= lastSeq) {
return event;
}
+ lastSeq = event.getSeqNum();
}
return null;
}