You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/20 00:11:55 UTC
[1/4] incubator-ranger git commit: RANGER-397 - Implement reliable
streaming audits to configurable destinations
Repository: incubator-ranger
Updated Branches:
refs/heads/master 4a20a9dd9 -> 44f403ead
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
----------------------------------------------------------------------
diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
new file mode 100644
index 0000000..a023b9a
--- /dev/null
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.AuditAsyncQueue;
+import org.apache.ranger.audit.provider.AuditBatchProcessor;
+import org.apache.ranger.audit.provider.AuditDestination;
+import org.apache.ranger.audit.provider.AuditFileSpool;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditProviderFactory;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.FileAuditDestination;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.provider.MultiDestAuditProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAuditProcessor {
+
+ private static final Log logger = LogFactory
+ .getLog(TestAuditProcessor.class);
+
+ /** No class-level fixtures to create; present for JUnit lifecycle symmetry. */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ /** No class-level fixtures to release; present for JUnit lifecycle symmetry. */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ // Monotonically increasing sequence number shared by all events created
+ // in this test class (see createEvent()).
+ static private int seqNum = 0;
+
+ /**
+  * Pushes a fixed number of events through an AuditAsyncQueue backed by a
+  * single TestConsumer and verifies delivery count, frequency sum and
+  * ordering.
+  */
+ @Test
+ public void testAuditAsyncQueue() {
+     logger.debug("testAuditAsyncQueue()...");
+     TestConsumer testConsumer = new TestConsumer();
+     AuditAsyncQueue queue = new AuditAsyncQueue(testConsumer);
+     Properties props = new Properties();
+     queue.init(props);
+
+     queue.start();
+
+     int messageToSend = 10;
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     queue.stop();
+     queue.waitToComplete();
+     // Give the consumer thread a moment to drain the queue
+     try {
+         Thread.sleep(1000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+     assertEquals(messageToSend, testConsumer.getCountTotal());
+     assertEquals(messageToSend, testConsumer.getSumTotal());
+     // isInSequence() returns the first out-of-order event, or null if ordered
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * Fans events out through a MultiDestAuditProvider to several consumers
+  * and verifies each consumer received every event.
+  */
+ @Test
+ public void testMultipleQueue() {
+     // FIX: debug message previously printed the wrong test name
+     logger.debug("testMultipleQueue()...");
+     int destCount = 3;
+     TestConsumer[] testConsumer = new TestConsumer[destCount];
+
+     MultiDestAuditProvider multiQueue = new MultiDestAuditProvider();
+     for (int i = 0; i < destCount; i++) {
+         testConsumer[i] = new TestConsumer();
+         multiQueue.addAuditProvider(testConsumer[i]);
+     }
+
+     AuditAsyncQueue queue = new AuditAsyncQueue(multiQueue);
+     Properties props = new Properties();
+     queue.init(props);
+     queue.start();
+
+     int messageToSend = 10;
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     queue.stop();
+     queue.waitToComplete();
+     // Give the consumer thread a moment to drain the queue
+     try {
+         Thread.sleep(1000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+     for (int i = 0; i < destCount; i++) {
+         assertEquals("consumer" + i, messageToSend,
+                 testConsumer[i].getCountTotal());
+         assertEquals("consumer" + i, messageToSend,
+                 testConsumer[i].getSumTotal());
+         // Ordering check, consistent with the other tests in this class
+         assertNull("consumer" + i + " events out of sequence",
+                 testConsumer[i].isInSequence());
+     }
+ }
+
+ /**
+  * Verifies that a small batch size (with a deliberately huge interval)
+  * forces batching by size: 10 messages at batch size 3 should arrive in
+  * ceil(10/3) = 4 batches.
+  */
+ @Test
+ public void testAuditBatchProcessorBySize() {
+     // FIX: debug message previously printed a truncated test name
+     logger.debug("testAuditBatchProcessorBySize()...");
+     int messageToSend = 10;
+
+     String basePropName = "ranger.test.batch";
+     int batchSize = messageToSend / 3;
+     // ceil(messageToSend / batchSize): one extra partial batch if needed
+     int expectedBatchSize = batchSize
+             + (batchSize * 3 < messageToSend ? 1 : 0);
+     int queueSize = messageToSend * 2;
+     int intervalMS = messageToSend * 100; // Deliberately big interval
+     Properties props = new Properties();
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     TestConsumer testConsumer = new TestConsumer();
+     AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     // Give the batch thread a moment to flush
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete();
+     queue.stop();
+     queue.waitToComplete();
+
+     assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+     assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+     assertEquals("Total batch", expectedBatchSize,
+             testConsumer.getBatchCount());
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * Verifies that a short batch interval (with a deliberately huge batch
+  * size) forces batching by time rather than by size.
+  */
+ @Test
+ public void testAuditBatchProcessorByTime() {
+     // FIX: debug message previously printed a truncated test name
+     logger.debug("testAuditBatchProcessorByTime()...");
+
+     int messageToSend = 10;
+
+     String basePropName = "ranger.test.batch";
+     int batchSize = messageToSend * 2; // Deliberately big size
+     int queueSize = messageToSend * 2;
+     int intervalMS = (1000 / messageToSend) * 3; // e.g. (1000/10) * 3 = 300ms
+     // FIX: comment previously said "1000/10 - 5 = 95ms" which did not
+     // match the code; it is 1000/10 + 3 = 103ms
+     int pauseMS = 1000 / messageToSend + 3;
+     // Expected number of interval-driven flushes over the send window
+     int expectedBatchSize = (messageToSend * pauseMS) / intervalMS + 1;
+
+     Properties props = new Properties();
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     TestConsumer testConsumer = new TestConsumer();
+     AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+         try {
+             Thread.sleep(pauseMS);
+         } catch (InterruptedException e) {
+             // ignore
+         }
+     }
+     // Give the batch thread a moment to flush
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+     queue.waitToComplete();
+     queue.stop();
+     queue.waitToComplete();
+
+     assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+     assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+     assertEquals("Total batch", expectedBatchSize,
+             testConsumer.getBatchCount());
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * With the destination down and a batch interval that never fires,
+  * nothing should be delivered; events should go to the file spool.
+  */
+ @Test
+ public void testAuditBatchProcessorDestDown() {
+     logger.debug("testAuditBatchProcessorDestDown()...");
+     int messageToSend = 10;
+
+     String basePropName = "ranger.test.batch";
+     int batchSize = messageToSend / 3;
+     int queueSize = messageToSend * 2;
+     int intervalMS = Integer.MAX_VALUE; // Deliberately big interval
+     Properties props = new Properties();
+     props.put(basePropName + "." + BaseAuditProvider.PROP_NAME,
+             "testAuditBatchProcessorDestDown");
+
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     // Enable file spooling. FIX: use the shared property-name constants
+     // instead of the string literals "filespool.enable"/"filespool.dir",
+     // for consistency with the other tests in this class.
+     props.put(basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+             "" + true);
+     props.put(basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+             "target");
+
+     TestConsumer testConsumer = new TestConsumer();
+     testConsumer.isDown = true;
+
+     AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     // Give the spool a moment to pick up the events
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete(5000);
+     queue.stop();
+     queue.waitToComplete();
+
+     // Destination was down throughout, so nothing should be delivered
+     assertEquals("Total count", 0, testConsumer.getCountTotal());
+     assertEquals("Total sum", 0, testConsumer.getSumTotal());
+     assertEquals("Total batch", 0, testConsumer.getBatchCount());
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * Destination flip-flops between up and down while events are logged;
+  * the spool should eventually deliver every event.
+  *
+  * NOTE(review): @Test is commented out — presumably disabled for being
+  * timing-sensitive/flaky; confirm before re-enabling.
+  */
+ //@Test
+ public void testAuditBatchProcessorDestDownFlipFlop() {
+     // FIX: debug message previously printed the wrong test name
+     logger.debug("testAuditBatchProcessorDestDownFlipFlop()...");
+     int messageToSend = 10;
+
+     String basePropName = "ranger.test.batch";
+     int batchSize = messageToSend / 3;
+     // FIX: removed unused local expectedBatchSize (batch count is not
+     // asserted in this test)
+     int queueSize = messageToSend * 2;
+     int intervalMS = 3000; // Deliberately big interval
+     Properties props = new Properties();
+     props.put(
+             basePropName + "." + BaseAuditProvider.PROP_NAME,
+             "testAuditBatchProcessorDestDownFlipFlop_"
+                     + MiscUtil.generateUniqueId());
+
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     // Enable file spooling with a short destination retry
+     int destRetryMS = 10;
+     props.put(
+             basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+             "" + true);
+     props.put(
+             basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+             "target");
+     props.put(basePropName + "."
+             + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+             + destRetryMS);
+
+     TestConsumer testConsumer = new TestConsumer();
+     testConsumer.isDown = false;
+
+     AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     // Log 10 events total while toggling the destination up/down
+     try {
+         queue.log(createEvent());
+         queue.log(createEvent());
+         queue.log(createEvent());
+         Thread.sleep(1000);
+         testConsumer.isDown = true;
+         Thread.sleep(1000);
+         queue.log(createEvent());
+         queue.log(createEvent());
+         queue.log(createEvent());
+         Thread.sleep(1000);
+         testConsumer.isDown = false;
+         Thread.sleep(1000);
+         queue.log(createEvent());
+         queue.log(createEvent());
+         queue.log(createEvent());
+         Thread.sleep(1000);
+         testConsumer.isDown = true;
+         Thread.sleep(1000);
+         queue.log(createEvent());
+         Thread.sleep(1000);
+         testConsumer.isDown = false;
+         Thread.sleep(1000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+     // Give the spool a moment to replay
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete(5000);
+     queue.stop();
+     queue.waitToComplete();
+
+     assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+     assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * See if we recover after restart: events logged while the destination
+  * is down should be spooled to disk and replayed by a freshly created
+  * processor/consumer pair.
+  *
+  * NOTE(review): no @Test annotation — presumably disabled on purpose
+  * (long sleeps); confirm before enabling.
+  */
+ public void testAuditBatchProcessorDestDownRestart() {
+     logger.debug("testAuditBatchProcessorDestDownRestart()...");
+     int messageToSend = 10;
+
+     String basePropName = "ranger.test.batch";
+     int batchSize = messageToSend / 3;
+     int queueSize = messageToSend * 2;
+     int intervalMS = 3000; // Deliberately big interval
+     int maxArchivedFiles = 1;
+     Properties props = new Properties();
+     props.put(
+             basePropName + "." + BaseAuditProvider.PROP_NAME,
+             "testAuditBatchProcessorDestDownRestart_"
+                     + MiscUtil.generateUniqueId());
+
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     // Enable file spooling with a short destination retry
+     int destRetryMS = 10;
+     props.put(
+             basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+             "" + true);
+     props.put(
+             basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+             "target");
+     props.put(basePropName + "."
+             + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+             + destRetryMS);
+     props.put(basePropName + "."
+             + AuditFileSpool.PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, ""
+             + maxArchivedFiles);
+
+     TestConsumer testConsumer = new TestConsumer();
+     testConsumer.isDown = true;
+
+     AuditBatchProcessor queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     // Give the spool a moment to pick up the events
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete(5000);
+     queue.stop();
+     queue.waitToComplete();
+
+     // FIX: removed redundant "testConsumer.isDown = true" here — the
+     // consumer was already down and has just been stopped.
+
+     // Let in-flight spool activity settle before restarting
+     try {
+         Thread.sleep(5000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     // Recreate the objects; the new consumer is up (isDown defaults to
+     // false), so the spool should replay the archived events into it
+     testConsumer = new TestConsumer();
+
+     queue = new AuditBatchProcessor(testConsumer);
+     queue.init(props, basePropName);
+     queue.start();
+
+     // Give the spool a moment to replay
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete(5000);
+     queue.stop();
+     queue.waitToComplete();
+
+     assertEquals("Total count", messageToSend, testConsumer.getCountTotal());
+     assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+     assertNull("Events out of sequence", testConsumer.isInSequence());
+ }
+
+ /**
+  * End-to-end test: events flow through the factory-built pipeline into a
+  * file destination, then the log file is read back and verified line by
+  * line (count, frequency sum, sequence order).
+  */
+ @Test
+ public void testFileDestination() {
+     logger.debug("testFileDestination()...");
+
+     int messageToSend = 10;
+     int batchSize = messageToSend / 3;
+     int queueSize = messageToSend * 2;
+     int intervalMS = 500; // Should be less than final sleep time
+
+     String logFolderName = "target/testFileDestination";
+     File logFolder = new File(logFolderName);
+     String logFileName = "test_ranger_audit.log";
+     File logFile = new File(logFolder, logFileName);
+
+     Properties props = new Properties();
+     // Destination
+     String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file";
+     props.put(filePropPrefix, "enable");
+     props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file");
+     props.put(filePropPrefix + "."
+             + FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName);
+     props.put(filePropPrefix + "."
+             + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
+             "%app-type%_ranger_audit.log");
+     props.put(filePropPrefix + "."
+             + FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10);
+
+     props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, "batch");
+     String batchPropPrefix = filePropPrefix + "." + "batch";
+
+     props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE, ""
+             + batchSize);
+     props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE, ""
+             + queueSize);
+     props.put(
+             batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL,
+             "" + intervalMS);
+
+     // Enable file spooling
+     int destRetryMS = 10;
+     props.put(batchPropPrefix + "."
+             + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true);
+     props.put(batchPropPrefix + "."
+             + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target");
+     props.put(batchPropPrefix + "."
+             + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+             + destRetryMS);
+
+     AuditProviderFactory factory = AuditProviderFactory.getInstance();
+     factory.init(props, "test");
+
+     AuditProvider queue = factory.getProvider();
+
+     for (int i = 0; i < messageToSend; i++) {
+         queue.log(createEvent());
+     }
+     // Give the pipeline a moment to flush
+     try {
+         Thread.sleep(1000);
+     } catch (InterruptedException e) {
+         // ignore
+     }
+
+     queue.waitToComplete();
+     queue.stop();
+     queue.waitToComplete();
+
+     assertTrue("File created", logFile.exists());
+     try {
+         List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+         int totalSum = 0;
+         String line;
+         long lastSeq = -1;
+         boolean outOfSeq = false;
+         BufferedReader br = new BufferedReader(new FileReader(logFile));
+         try {
+             while ((line = br.readLine()) != null) {
+                 AuthzAuditEvent event = MiscUtil.fromJson(line,
+                         AuthzAuditEvent.class);
+                 eventList.add(event);
+                 totalSum += event.getFrequencyCount();
+                 if (event.getSeqNum() <= lastSeq) {
+                     outOfSeq = true;
+                 }
+                 // FIX: lastSeq was never advanced, so the out-of-sequence
+                 // check could never trigger
+                 lastSeq = event.getSeqNum();
+             }
+         } finally {
+             // FIX: close the reader even if parsing throws
+             br.close();
+         }
+         assertEquals("Total count", messageToSend, eventList.size());
+         assertEquals("Total sum", messageToSend, totalSum);
+         assertFalse("Events out of sequence", outOfSeq);
+     } catch (Throwable e) {
+         logger.error("Error opening file for reading.", e);
+         // FIX: the original called assertTrue(..., true), which always
+         // passed and silently swallowed read errors; fail instead
+         fail("Error reading file. fileName=" + logFile + ", error="
+                 + e.toString());
+     }
+ }
+
+ /** Builds a minimal audit event carrying the next shared sequence number. */
+ private AuthzAuditEvent createEvent() {
+     seqNum++;
+     AuthzAuditEvent auditEvent = new AuthzAuditEvent();
+     auditEvent.setSeqNum(seqNum);
+     return auditEvent;
+ }
+
+ class TestConsumer extends AuditDestination {
+
+ int countTotal = 0;
+ int sumTotal = 0;
+ int batchCount = 0;
+ String providerName = getClass().getName();
+ boolean isDown = false;
+ int batchSize = 3;
+
+ List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
+ * .audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ if (isDown) {
+ return false;
+ }
+ countTotal++;
+ if (event instanceof AuthzAuditEvent) {
+ AuthzAuditEvent azEvent = (AuthzAuditEvent) event;
+ sumTotal += azEvent.getFrequencyCount();
+ logger.info("EVENT:" + event);
+ eventList.add(azEvent);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ if (isDown) {
+ return false;
+ }
+ batchCount++;
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String jsonStr) {
+ if (isDown) {
+ return false;
+ }
+ countTotal++;
+ AuthzAuditEvent event = MiscUtil.fromJson(jsonStr,
+ AuthzAuditEvent.class);
+ sumTotal += event.getFrequencyCount();
+ logger.info("JSON:" + jsonStr);
+ eventList.add(event);
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ if (isDown) {
+ return false;
+ }
+ for (String event : events) {
+ logJSON(event);
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
+ * )
+ */
+ @Override
+ public void init(Properties prop) {
+ // Nothing to do here
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ // Nothing to do here
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ // Nothing to do here
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+ */
+ @Override
+ public void waitToComplete() {
+ }
+
+ @Override
+ public int getMaxBatchSize() {
+ return batchSize;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
+ */
+ @Override
+ public long getLastFlushTime() {
+ return 0;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+ */
+ @Override
+ public void flush() {
+ // Nothing to do here
+ }
+
+ public int getCountTotal() {
+ return countTotal;
+ }
+
+ public int getSumTotal() {
+ return sumTotal;
+ }
+
+ public int getBatchCount() {
+ return batchCount;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
+ * , java.lang.String)
+ */
+ @Override
+ public void init(Properties prop, String basePropertyName) {
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+ */
+ @Override
+ public void waitToComplete(long timeout) {
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#getName()
+ */
+ @Override
+ public String getName() {
+ return providerName;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
+ */
+ @Override
+ public boolean isDrain() {
+ return false;
+ }
+
+ // Local methods
+ public AuthzAuditEvent isInSequence() {
+ int lastSeq = -1;
+ for (AuthzAuditEvent event : eventList) {
+ if (event.getSeqNum() <= lastSeq) {
+ return event;
+ }
+ }
+ return null;
+ }
+ }
+}
[3/4] incubator-ranger git commit: RANGER-397 - Implement reliable
streaming audits to configurable destinations
Posted by bo...@apache.org.
RANGER-397 - Implement reliable streaming audits to configurable
destinations
First cut with HDFS and File destination working
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/eb1e9b5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/eb1e9b5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/eb1e9b5c
Branch: refs/heads/master
Commit: eb1e9b5c447dc3960d132bd2dc5adfbb33d5cb2c
Parents: 917833c
Author: Don Bosco Durai <bo...@apache.org>
Authored: Tue Apr 14 18:00:46 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Tue Apr 14 18:02:57 2015 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../ranger/audit/model/AuditEventBase.java | 14 +-
.../ranger/audit/model/AuthzAuditEvent.java | 41 +-
.../audit/provider/AsyncAuditProvider.java | 5 +-
.../ranger/audit/provider/AuditAsyncQueue.java | 167 ++++
.../audit/provider/AuditBatchProcessor.java | 327 +++++++
.../ranger/audit/provider/AuditDestination.java | 70 ++
.../ranger/audit/provider/AuditFileSpool.java | 875 +++++++++++++++++++
.../audit/provider/AuditMessageException.java | 67 ++
.../ranger/audit/provider/AuditProvider.java | 22 +-
.../audit/provider/AuditProviderFactory.java | 386 +++++---
.../audit/provider/BaseAuditProvider.java | 400 +++++++--
.../audit/provider/BufferedAuditProvider.java | 32 +-
.../ranger/audit/provider/DbAuditProvider.java | 47 +-
.../audit/provider/DummyAuditProvider.java | 76 +-
.../audit/provider/FileAuditDestination.java | 230 +++++
.../audit/provider/HDFSAuditDestination.java | 243 +++++
.../audit/provider/LocalFileLogBuffer.java | 17 +-
.../audit/provider/Log4jAuditProvider.java | 57 +-
.../ranger/audit/provider/LogDestination.java | 16 +-
.../apache/ranger/audit/provider/MiscUtil.java | 291 ++++--
.../audit/provider/MultiDestAuditProvider.java | 153 +++-
.../audit/provider/hdfs/HdfsAuditProvider.java | 3 +-
.../audit/provider/hdfs/HdfsLogDestination.java | 48 +-
.../provider/kafka/KafkaAuditProvider.java | 46 +-
.../audit/provider/solr/SolrAuditProvider.java | 52 +-
security-admin/.gitignore | 3 +
security-admin/pom.xml | 6 +-
.../apache/ranger/audit/TestAuditProcessor.java | 786 +++++++++++++++++
29 files changed, 4081 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 2c746ed..dd4e2c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
*.class
*.iml
+.pydevproject
.settings/
.metadata
.classpath
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 82fcab8..a44e047 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -17,24 +17,26 @@
* under the License.
*/
- package org.apache.ranger.audit.model;
+package org.apache.ranger.audit.model;
import org.apache.ranger.audit.dao.DaoManager;
-
public abstract class AuditEventBase {
+
protected AuditEventBase() {
}
public abstract void persist(DaoManager daoManager);
-
+
protected String trim(String str, int len) {
- String ret = str ;
+ String ret = str;
if (str != null) {
if (str.length() > len) {
- ret = str.substring(0,len) ;
+ ret = str.substring(0, len);
}
}
- return ret ;
+ return ret;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
index d0c1526..af89f60 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
@@ -24,6 +24,7 @@ import java.util.Date;
import org.apache.ranger.audit.dao.DaoManager;
import org.apache.ranger.audit.entity.AuthzAuditEventDbObj;
+import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
@@ -33,7 +34,6 @@ public class AuthzAuditEvent extends AuditEventBase {
protected static final int MAX_ACTION_FIELD_SIZE = 1800 ;
protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ;
-
@SerializedName("repoType")
protected int repositoryType = 0;
@@ -94,6 +94,17 @@ public class AuthzAuditEvent extends AuditEventBase {
@SerializedName("id")
protected String eventId = null;
+ /**
+ * This to ensure order within a session. Order not guaranteed across processes and hosts
+ */
+ @SerializedName("seq_num")
+ protected long seqNum = 0;
+
+ @SerializedName("freq_count")
+ protected long frequencyCount = 1;
+
+ @SerializedName("freq_dur_ms")
+ protected long frequencyDurationMS = 0;
public AuthzAuditEvent() {
super();
@@ -400,6 +411,31 @@ public class AuthzAuditEvent extends AuditEventBase {
}
+
+ public long getSeqNum() {
+ return seqNum;
+ }
+
+ public void setSeqNum(long seqNum) {
+ this.seqNum = seqNum;
+ }
+
+ public long getFrequencyCount() {
+ return frequencyCount;
+ }
+
+ public void setFrequencyCount(long frequencyCount) {
+ this.frequencyCount = frequencyCount;
+ }
+
+ public long getFrequencyDurationMS() {
+ return frequencyDurationMS;
+ }
+
+ public void setFrequencyDurationMS(long frequencyDurationMS) {
+ this.frequencyDurationMS = frequencyDurationMS;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -432,6 +468,9 @@ public class AuthzAuditEvent extends AuditEventBase {
.append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR)
.append("logType=").append(logType).append(FIELD_SEPARATOR)
.append("eventId=").append(eventId).append(FIELD_SEPARATOR)
+ .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR)
+ .append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR)
+ .append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR)
;
return sb;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 5da5064..53adc86 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -90,10 +90,11 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
queueEvent(event);
+ return true;
}
@Override
@@ -230,7 +231,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
return mQueue.isEmpty();
}
- private void waitToComplete(long maxWaitSeconds) {
+ public void waitToComplete(long maxWaitSeconds) {
LOG.debug("==> AsyncAuditProvider.waitToComplete()");
for (long waitTime = 0; !isEmpty()
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
new file mode 100644
index 0000000..5553bcc
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * This is a non-blocking queue with no limit on capacity.
+ */
+public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+ private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
+
+ LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+ Thread consumerThread = null;
+
+ static int threadCount = 0;
+ static final String DEFAULT_NAME = "async";
+
+ public AuditAsyncQueue() {
+ setName(DEFAULT_NAME);
+ }
+
+ public AuditAsyncQueue(AuditProvider consumer) {
+ super(consumer);
+ setName(DEFAULT_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ // Add to the queue and return ASAP
+ if (queue.size() >= getMaxQueueSize()) {
+ return false;
+ }
+ queue.add(event);
+ addLifeTimeInLogCount(1);
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ if(consumer != null) {
+ consumer.start();
+ }
+
+ consumerThread = new Thread(this, this.getClass().getName()
+ + (threadCount++));
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ setDrain(true);
+ try {
+ consumerThread.interrupt();
+ } catch (Throwable t) {
+ // ignore any exception
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ if (queue.isEmpty()) {
+ return consumer.isFlushPending();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ AuditEventBase event = null;
+ if (!isDrain()) {
+ // For Transfer queue take() is blocking
+ event = queue.take();
+ } else {
+ // For Transfer queue poll() is non blocking
+ event = queue.poll();
+ }
+ if (event != null) {
+ Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+ eventList.add(event);
+ // TODO: Put a limit. Hard coding to 1000 (use batch size
+ // property)
+ queue.drainTo(eventList, 1000 - 1);
+ consumer.log(eventList);
+ eventList.clear();
+ }
+ } catch (InterruptedException e) {
+ logger.info(
+ "Caught exception in consumer thread. Mostly to abort loop",
+ e);
+ } catch (Throwable t) {
+ logger.error("Caught error during processing request.", t);
+ }
+ if (isDrain() && queue.isEmpty()) {
+ break;
+ }
+ }
+ try {
+ // Call stop on the consumer
+ consumer.stop();
+ } catch (Throwable t) {
+ logger.error("Error while calling stop on consumer.", t);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
new file mode 100644
index 0000000..58d122a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+public class AuditBatchProcessor extends BaseAuditProvider implements Runnable {
+ private static final Log logger = LogFactory
+ .getLog(AuditBatchProcessor.class);
+
+ private BlockingQueue<AuditEventBase> queue = null;
+ private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>();
+
+ Thread consumerThread = null;
+ static int threadCount = 0;
+
+ public AuditBatchProcessor() {
+ }
+
+ public AuditBatchProcessor(AuditProvider consumer) {
+ super(consumer);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ // Add to batchQueue. NOTE(review): queue.add() does not block when
+ // full — it throws IllegalStateException; put() is the blocking call.
+ queue.add(event);
+ addLifeTimeInLogCount(1);
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public void init(Properties prop, String basePropertyName) {
+ String propPrefix = "xasecure.audit.batch";
+ if (basePropertyName != null) {
+ propPrefix = basePropertyName;
+ }
+
+ super.init(prop, propPrefix);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ synchronized public void start() {
+ if (consumerThread != null) {
+ logger.error("Provider is already started. name=" + getName());
+ return;
+ }
+ logger.info("Creating ArrayBlockingQueue with maxSize="
+ + getMaxQueueSize());
+ queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
+
+ // Start the consumer first
+ consumer.start();
+
+ // Then the FileSpooler
+ if (fileSpoolerEnabled) {
+ fileSpooler.start();
+ }
+
+ // Finally the queue listener
+ consumerThread = new Thread(this, this.getClass().getName()
+ + (threadCount++));
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ setDrain(true);
+ flush();
+ try {
+ consumerThread.interrupt();
+ } catch (Throwable t) {
+ // ignore any exception
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+ */
+ @Override
+ public void waitToComplete() {
+ int defaultTimeOut = -1;
+ waitToComplete(defaultTimeOut);
+ consumer.waitToComplete(defaultTimeOut);
+ }
+
+ @Override
+ public void waitToComplete(long timeout) {
+ setDrain(true);
+ flush();
+ long sleepTime = 1000;
+ long startTime = System.currentTimeMillis();
+ int prevQueueSize = -1;
+ int staticLoopCount = 0;
+ while ((queue.size() > 0 || localBatchBuffer.size() > 0)) {
+ if (prevQueueSize == queue.size()) {
+ logger.error("Queue size is not changing. " + getName()
+ + ".size=" + queue.size());
+ staticLoopCount++;
+ if (staticLoopCount > 5) {
+ logger.error("Aborting writing to consumer. Some logs will be discarded."
+ + getName() + ".size=" + queue.size());
+ }
+ } else {
+ staticLoopCount = 0;
+ }
+ consumerThread.interrupt();
+ try {
+ Thread.sleep(sleepTime);
+ if (timeout > 0
+ && (System.currentTimeMillis() - startTime > timeout)) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ consumer.waitToComplete(timeout);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ if (queue.isEmpty()) {
+ return consumer.isFlushPending();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+ */
+ @Override
+ public void flush() {
+ if (fileSpoolerEnabled) {
+ fileSpooler.flush();
+ }
+ consumer.flush();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ long lastDispatchTime = System.currentTimeMillis();
+ boolean isDestActive = true;
+ while (true) {
+ // Time to next dispatch
+ long nextDispatchDuration = lastDispatchTime
+ - System.currentTimeMillis() + getMaxBatchInterval();
+
+ boolean isToSpool = false;
+ boolean fileSpoolDrain = false;
+ try {
+ if (fileSpoolerEnabled && fileSpooler.isPending()) {
+ int percentUsed = (getMaxQueueSize() - queue.size()) * 100
+ / getMaxQueueSize();
+ long lastAttemptDelta = fileSpooler
+ .getLastAttemptTimeDelta();
+
+ fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
+ // If we should even read from queue?
+ if (!isDrain() && !fileSpoolDrain
+ && percentUsed < fileSpoolDrainThresholdPercent) {
+ // Since some files are still under progress and it is
+ // not in drain mode, lets wait and retry
+ if (nextDispatchDuration > 0) {
+ Thread.sleep(nextDispatchDuration);
+ }
+ continue;
+ }
+ isToSpool = true;
+ }
+
+ AuditEventBase event = null;
+
+ if (!isToSpool && !isDrain() && !fileSpoolDrain
+ && nextDispatchDuration > 0) {
+ event = queue.poll(nextDispatchDuration,
+ TimeUnit.MILLISECONDS);
+
+ } else {
+ // poll() is non-blocking
+ event = queue.poll();
+ }
+ if (event != null) {
+ localBatchBuffer.add(event);
+ if (getMaxBatchSize() >= localBatchBuffer.size()) {
+ queue.drainTo(localBatchBuffer, getMaxBatchSize()
+ - localBatchBuffer.size());
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.info(
+ "Caught exception in consumer thread. Mostly to abort loop",
+ e);
+ } catch (Throwable t) {
+ logger.error("Caught error during processing request.", t);
+ }
+
+ if (localBatchBuffer.size() > 0 && isToSpool) {
+ // Let spool to the file directly
+ if (isDestActive) {
+ logger.info("Switching to file spool. Queue=" + getName()
+ + ", dest=" + consumer.getName());
+ }
+ isDestActive = false;
+ fileSpooler.stashLogs(localBatchBuffer);
+ localBatchBuffer.clear();
+ // Reset all variables
+ lastDispatchTime = System.currentTimeMillis();
+ } else if (localBatchBuffer.size() > 0
+ && (isDrain()
+ || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+ if (fileSpoolerEnabled && !isDestActive) {
+ logger.info("Switching to writing to destination. Queue="
+ + getName() + ", dest=" + consumer.getName());
+ }
+ boolean ret = consumer.log(localBatchBuffer);
+ if (!ret) {
+ if (fileSpoolerEnabled) {
+ logger.info("Switching to file spool. Queue="
+ + getName() + ", dest=" + consumer.getName());
+ // Transient error. Stash and move on
+ fileSpooler.stashLogs(localBatchBuffer);
+ isDestActive = false;
+ } else {
+ // We need to drop this event
+ logFailedEvent(localBatchBuffer, null);
+ }
+ } else {
+ isDestActive = true;
+ }
+ localBatchBuffer.clear();
+ // Reset all variables
+ lastDispatchTime = System.currentTimeMillis();
+ }
+
+ if (isDrain()) {
+ if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
+ logger.info("Queue is not empty. Will retry. queue.size()="
+ + queue.size() + ", localBatchBuffer.size()="
+ + localBatchBuffer.size());
+ } else {
+ break;
+ }
+ }
+ }
+
+ logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+ + consumer.getName());
+ try {
+ // Call stop on the consumer
+ consumer.stop();
+ if (fileSpoolerEnabled) {
+ fileSpooler.stop();
+ }
+ } catch (Throwable t) {
+ logger.error("Error while calling stop on consumer.", t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
new file mode 100644
index 0000000..11c32ca
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class needs to be extended by anyone who wants to build custom
+ * destination
+ */
+public abstract class AuditDestination extends BaseAuditProvider {
+ private static final Log logger = LogFactory.getLog(AuditDestination.class);
+
+ public AuditDestination() {
+ logger.info("AuditDestination() enter");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties,
+ * java.lang.String)
+ */
+ @Override
+ public void init(Properties prop, String basePropertyName) {
+ super.init(prop, basePropertyName);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+ */
+ @Override
+ public void flush() {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
new file mode 100644
index 0000000..8b006de
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * This class temporarily stores logs in file system if the destination is
+ * overloaded or down
+ */
+public class AuditFileSpool implements Runnable {
+ private static final Log logger = LogFactory.getLog(AuditFileSpool.class);
+
+ public enum SPOOL_FILE_STATUS {
+ pending, write_inprogress, read_inprogress, done
+ }
+
+ public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
+ public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format";
+ public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir";
+ public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files";
+ public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix";
+ public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec";
+ public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename";
+ // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
+ // "filespool.index.done_filename";
+ public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
+
+ AuditProvider queueProvider = null;
+ AuditProvider consumerProvider = null;
+
+ BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+
+ // Folder and File attributes
+ File logFolder = null;
+ String logFileNameFormat = null;
+ File archiveFolder = null;
+ String fileNamePrefix = null;
+ String indexFileName = null;
+ File indexFile = null;
+ String indexDoneFileName = null;
+ File indexDoneFile = null;
+ int retryDestinationMS = 30 * 1000; // Default 30 seconds
+ int fileRolloverSec = 24 * 60 * 60; // In seconds
+ int maxArchiveFiles = 100;
+
+ int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+ long lastErrorLogMS = 0;
+
+ List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
+
+ boolean isPending = false;
+ long lastAttemptTime = 0;
+ boolean initDone = false;
+
+ PrintWriter logWriter = null;
+ AuditIndexRecord currentWriterIndexRecord = null;
+ AuditIndexRecord currentConsumerIndexRecord = null;
+
+ BufferedReader logReader = null;
+
+ Thread destinationThread = null;
+
+ boolean isWriting = true;
+ boolean isDrain = false;
+ boolean isDestDown = true;
+
+ private static Gson gson = null;
+
+ public AuditFileSpool(AuditProvider queueProvider,
+ AuditProvider consumerProvider) {
+ this.queueProvider = queueProvider;
+ this.consumerProvider = consumerProvider;
+ }
+
+ public void init(Properties prop) {
+ init(prop, null);
+ }
+
+ public void init(Properties props, String basePropertyName) {
+ if (initDone) {
+ logger.error("init() called more than once. queueProvider="
+ + queueProvider.getName() + ", consumerProvider="
+ + consumerProvider.getName());
+ return;
+ }
+ String propPrefix = "xasecure.audit.filespool";
+ if (basePropertyName != null) {
+ propPrefix = basePropertyName;
+ }
+
+ try {
+ gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+ .create();
+
+ // Initial folder and file properties
+ String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_LOCAL_DIR);
+ logFileNameFormat = MiscUtil.getStringProperty(props,
+ basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
+ String archiveFolderProp = MiscUtil.getStringProperty(props,
+ propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
+ fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_FILENAME_PREFIX);
+ indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_INDEX_FILE);
+ retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
+ fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
+ maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
+
+ logger.info("retryDestinationMS=" + retryDestinationMS
+ + ", queueName=" + queueProvider.getName());
+ logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
+ + queueProvider.getName());
+ logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
+ + queueProvider.getName());
+
+ if (logFolderProp == null || logFolderProp.isEmpty()) {
+ logger.error("Audit spool folder is not configured. Please set "
+ + propPrefix
+ + "."
+ + PROP_FILE_SPOOL_LOCAL_DIR
+ + ". queueName=" + queueProvider.getName());
+ return;
+ }
+ logFolder = new File(logFolderProp);
+ if (!logFolder.isDirectory()) {
+ logFolder.mkdirs();
+ if (!logFolder.isDirectory()) {
+ logger.error("File Spool folder not found and can't be created. folder="
+ + logFolder.getAbsolutePath()
+ + ", queueName="
+ + queueProvider.getName());
+ return;
+ }
+ }
+ logger.info("logFolder=" + logFolder + ", queueName="
+ + queueProvider.getName());
+
+ if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+ logFileNameFormat = "spool_" + "%app-type%" + "_"
+ + "%time:yyyyMMdd-HHmm.ss%.log";
+ }
+ logger.info("logFileNameFormat=" + logFileNameFormat
+ + ", queueName=" + queueProvider.getName());
+
+ if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
+ archiveFolder = new File(logFolder, "archive");
+ } else {
+ archiveFolder = new File(archiveFolderProp);
+ }
+ if (!archiveFolder.isDirectory()) {
+ archiveFolder.mkdirs();
+ if (!archiveFolder.isDirectory()) {
+ logger.error("File Spool archive folder not found and can't be created. folder="
+ + archiveFolder.getAbsolutePath()
+ + ", queueName="
+ + queueProvider.getName());
+ return;
+ }
+ }
+ logger.info("archiveFolder=" + archiveFolder + ", queueName="
+ + queueProvider.getName());
+
+ if (indexFileName == null || indexFileName.isEmpty()) {
+ indexFileName = "index_" + fileNamePrefix + ".json";
+ }
+
+ indexFile = new File(logFolder, indexFileName);
+ if (!indexFile.exists()) {
+ indexFile.createNewFile();
+ }
+ logger.info("indexFile=" + indexFile + ", queueName="
+ + queueProvider.getName());
+
+ int lastDot = indexFileName.lastIndexOf('.');
+ indexDoneFileName = indexFileName.substring(0, lastDot)
+ + "_closed.json";
+ indexDoneFile = new File(logFolder, indexDoneFileName);
+ if (!indexDoneFile.exists()) {
+ indexDoneFile.createNewFile();
+ }
+ logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+ + queueProvider.getName());
+
+ // Load index file
+ loadIndexFile();
+ for (AuditIndexRecord auditIndexRecord : indexRecords) {
+ if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
+ isPending = true;
+ }
+ if (auditIndexRecord.status
+ .equals(SPOOL_FILE_STATUS.write_inprogress)) {
+ currentWriterIndexRecord = auditIndexRecord;
+ logger.info("currentWriterIndexRecord="
+ + currentWriterIndexRecord.filePath
+ + ", queueName=" + queueProvider.getName());
+ }
+ if (auditIndexRecord.status
+ .equals(SPOOL_FILE_STATUS.read_inprogress)) {
+ indexQueue.add(auditIndexRecord);
+ }
+ }
+ printIndex();
+ // One more loop to add the rest of the pending records in reverse
+ // order
+ for (int i = 0; i < indexRecords.size(); i++) {
+ AuditIndexRecord auditIndexRecord = indexRecords.get(i);
+ if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
+ File consumerFile = new File(auditIndexRecord.filePath);
+ if (!consumerFile.exists()) {
+ logger.error("INIT: Consumer file="
+ + consumerFile.getPath() + " not found.");
+ System.exit(1);
+ }
+ indexQueue.add(auditIndexRecord);
+ }
+ }
+
+ } catch (Throwable t) {
+ logger.fatal("Error initializing File Spooler. queue="
+ + queueProvider.getName(), t);
+ return;
+ }
+ initDone = true;
+ }
+
+ /**
+ * Start looking for outstanding logs and update status accordingly.
+ */
+ public void start() {
+ if (!initDone) {
+ logger.error("Cannot start Audit File Spooler. Initialization not done yet. queueName="
+ + queueProvider.getName());
+ return;
+ }
+
+ logger.info("Starting writerThread, queueName="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+
+ // Let's start the thread to read
+ destinationThread = new Thread(this, queueProvider.getName()
+ + "_destWriter");
+ destinationThread.setDaemon(true);
+ destinationThread.start();
+ }
+
+ public void stop() {
+ if (!initDone) {
+ logger.error("Cannot stop Audit File Spooler. Initialization not done. queueName="
+ + queueProvider.getName());
+ return;
+ }
+ logger.info("Stop called, queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+
+ isDrain = true;
+ flush();
+
+ PrintWriter out = getOpenLogFileStream();
+ if (out != null) {
+ // If write is still going on, then let's give it enough time to
+ // complete
+ for (int i = 0; i < 3; i++) {
+ if (isWriting) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ continue;
+ }
+ try {
+ logger.info("Closing open file, queueName="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+
+ out.flush();
+ out.close();
+ } catch (Throwable t) {
+ logger.debug("Error closing spool out file.", t);
+ }
+ }
+ }
+ try {
+ destinationThread.interrupt();
+ } catch (Throwable e) {
+ // ignore
+ }
+ }
+
+ public void flush() {
+ if (!initDone) {
+ logger.error("Cannot flush Audit File Spooler. Initialization not done. queueName="
+ + queueProvider.getName());
+ return;
+ }
+ PrintWriter out = getOpenLogFileStream();
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * Returns true if any files are still not processed, or if the
+ * destination is not reachable.
+ *
+ * @return
+ */
+ public boolean isPending() {
+ if (!initDone) {
+ logError("isPending(): File Spooler not initialized. queueName="
+ + queueProvider.getName());
+ return false;
+ }
+
+ return isPending;
+ }
+
+ /**
+ * Milliseconds from last attempt time
+ *
+ * @return
+ */
+ public long getLastAttemptTimeDelta() {
+ if (lastAttemptTime == 0) {
+ return 0;
+ }
+ return System.currentTimeMillis() - lastAttemptTime;
+ }
+
+ synchronized public void stashLogs(AuditEventBase event) {
+ if (isDrain) {
+ // Stop has been called, so this method shouldn't be called
+ logger.error("stashLogs() is called after stop is called. event="
+ + event);
+ return;
+ }
+ try {
+ isWriting = true;
+ PrintWriter logOut = getLogFileStream();
+ // Convert event to json
+ String jsonStr = MiscUtil.stringify(event);
+ logOut.println(jsonStr);
+ isPending = true;
+ } catch (Exception ex) {
+ logger.error("Error writing to file. event=" + event, ex);
+ } finally {
+ isWriting = false;
+ }
+
+ }
+
+ synchronized public void stashLogs(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ stashLogs(event);
+ }
+ flush();
+ }
+
+ synchronized public void stashLogsString(String event) {
+ if (isDrain) {
+ // Stop has been called, so this method shouldn't be called
+ logger.error("stashLogs() is called after stop is called. event="
+ + event);
+ return;
+ }
+ try {
+ isWriting = true;
+ PrintWriter logOut = getLogFileStream();
+ logOut.println(event);
+ } catch (Exception ex) {
+ logger.error("Error writing to file. event=" + event, ex);
+ } finally {
+ isWriting = false;
+ }
+
+ }
+
+ synchronized public void stashLogsString(Collection<String> events) {
+ for (String event : events) {
+ stashLogsString(event);
+ }
+ flush();
+ }
+
+ /**
+ * This returns the current file. If there is no currently open output
+ * file, then it will return null.
+ *
+ * @return
+ * @throws Exception
+ */
+ synchronized private PrintWriter getOpenLogFileStream() {
+ return logWriter;
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ synchronized private PrintWriter getLogFileStream() throws Exception {
+ closeFileIfNeeded();
+
+ // Either there are no open log file or the previous one has been rolled
+ // over
+ if (currentWriterIndexRecord == null) {
+ Date currentTime = new Date();
+ // Create a new file
+ String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+ currentTime.getTime());
+ String newFileName = fileName;
+ File outLogFile = null;
+ int i = 0;
+ while (true) {
+ outLogFile = new File(logFolder, newFileName);
+ File archiveLogFile = new File(archiveFolder, newFileName);
+ if (!outLogFile.exists() && !archiveLogFile.exists()) {
+ break;
+ }
+ i++;
+ int lastDot = fileName.lastIndexOf('.');
+ String baseName = fileName.substring(0, lastDot);
+ String extension = fileName.substring(lastDot);
+ newFileName = baseName + "." + i + extension;
+ }
+ fileName = newFileName;
+ logger.info("Creating new file. queueName="
+ + queueProvider.getName() + ", fileName=" + fileName);
+ // Open the file
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ outLogFile)));
+
+ AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();
+
+ tmpIndexRecord.id = MiscUtil.generateUniqueId();
+ tmpIndexRecord.filePath = outLogFile.getPath();
+ tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
+ tmpIndexRecord.fileCreateTime = currentTime;
+ tmpIndexRecord.lastAttempt = true;
+ currentWriterIndexRecord = tmpIndexRecord;
+ indexRecords.add(currentWriterIndexRecord);
+ saveIndexFile();
+
+ } else {
+ if (logWriter == null) {
+ // This means the process just started. We need to open the file
+ // in append mode.
+ logger.info("Opening existing file for append. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ currentWriterIndexRecord.filePath, true)));
+ }
+ }
+ return logWriter;
+ }
+
+ synchronized private void closeFileIfNeeded() throws FileNotFoundException,
+ IOException {
+ // If there is a file open for writing, or there are no pending files,
+ // then close the active file
+ if (currentWriterIndexRecord != null) {
+ // Check whether the file needs to rolled
+ boolean closeFile = false;
+ if (indexRecords.size() == 1) {
+ closeFile = true;
+ logger.info("Closing file. Only one open file. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ } else if (System.currentTimeMillis()
+ - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) {
+ closeFile = true;
+ logger.info("Closing file. Rolling over. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ }
+ if (closeFile) {
+ // Roll the file
+ if (logWriter != null) {
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ }
+ currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
+ currentWriterIndexRecord.writeCompleteTime = new Date();
+ saveIndexFile();
+ logger.info("Adding file to queue. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ indexQueue.add(currentWriterIndexRecord);
+ currentWriterIndexRecord = null;
+ }
+ }
+ }
+
+ /**
+ * Load the index file
+ *
+ * @throws IOException
+ */
+ /**
+ * Loads all records from the index file into indexRecords, replacing
+ * the current in-memory contents. Blank lines and lines starting with
+ * '#' are skipped; every other line is expected to be one
+ * JSON-serialized AuditIndexRecord.
+ *
+ * @throws IOException if the index file cannot be read
+ */
+ void loadIndexFile() throws IOException {
+     logger.info("Loading index file. fileName=" + indexFile.getPath());
+     BufferedReader br = new BufferedReader(new FileReader(indexFile));
+     try {
+         indexRecords.clear();
+         String line;
+         while ((line = br.readLine()) != null) {
+             if (!line.isEmpty() && !line.startsWith("#")) {
+                 AuditIndexRecord record = gson.fromJson(line,
+                         AuditIndexRecord.class);
+                 indexRecords.add(record);
+             }
+         }
+     } finally {
+         // Close in finally so the file handle is not leaked if
+         // JSON parsing throws mid-read
+         br.close();
+     }
+ }
+
+ /**
+ * Logs every record currently held in the in-memory index, along with
+ * whether its spool file still exists on disk. Debugging aid only.
+ */
+ synchronized void printIndex() {
+     logger.info("INDEX printIndex() ==== START");
+     for (AuditIndexRecord record : indexRecords) {
+         boolean fileExists = new File(record.filePath).exists();
+         logger.info("INDEX=" + record + ", isFileExist=" + fileExists);
+     }
+     logger.info("INDEX printIndex() ==== END");
+ }
+
+ /**
+ * Removes the given record (matched by id) from the in-memory index,
+ * appends it to the "done" file, and rewrites the index file on disk.
+ *
+ * @param indexRecord the record to remove
+ */
+ synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
+         throws FileNotFoundException, IOException {
+     // Explicit iterator is required so we can remove while iterating
+     Iterator<AuditIndexRecord> it = indexRecords.iterator();
+     while (it.hasNext()) {
+         AuditIndexRecord candidate = it.next();
+         if (!candidate.id.equals(indexRecord.id)) {
+             continue;
+         }
+         logger.info("Removing file from index. file=" + candidate.filePath
+                 + ", queueName=" + queueProvider.getName()
+                 + ", consumer=" + consumerProvider.getName());
+         it.remove();
+         appendToDoneFile(candidate);
+     }
+     saveIndexFile();
+ }
+
+ /**
+ * Rewrites the index file from scratch with one JSON line per record
+ * currently held in indexRecords.
+ *
+ * @throws FileNotFoundException if the index file cannot be opened
+ * @throws IOException on write failure
+ */
+ synchronized void saveIndexFile() throws FileNotFoundException, IOException {
+     PrintWriter out = new PrintWriter(indexFile);
+     try {
+         for (AuditIndexRecord auditIndexRecord : indexRecords) {
+             out.println(gson.toJson(auditIndexRecord));
+         }
+     } finally {
+         // Close in finally so the handle is released even if
+         // serialization throws
+         out.close();
+     }
+     // printIndex();
+ }
+
+ void appendToDoneFile(AuditIndexRecord indexRecord)
+ throws FileNotFoundException, IOException {
+ logger.info("Moving to done file. " + indexRecord.filePath
+ + ", queueName=" + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+ String line = gson.toJson(indexRecord);
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
+ indexDoneFile, true)));
+ out.println(line);
+ out.flush();
+ out.close();
+
+ // Move to archive folder
+ File logFile = null;
+ File archiveFile = null;
+ try {
+ logFile = new File(indexRecord.filePath);
+ String fileName = logFile.getName();
+ archiveFile = new File(archiveFolder, fileName);
+ logger.info("Moving logFile " + logFile + " to " + archiveFile);
+ logFile.renameTo(archiveFile);
+ } catch (Throwable t) {
+ logger.error("Error moving log file to archive folder. logFile="
+ + logFile + ", archiveFile=" + archiveFile, t);
+ }
+
+ archiveFile = null;
+ try {
+ // Remove old files
+ File[] logFiles = archiveFolder.listFiles(new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.getName().toLowerCase().endsWith(".log");
+ }
+ });
+
+ if (logFiles.length > maxArchiveFiles) {
+ int filesToDelete = logFiles.length - maxArchiveFiles;
+ BufferedReader br = new BufferedReader(new FileReader(
+ indexDoneFile));
+ try {
+ int filesDeletedCount = 0;
+ while ((line = br.readLine()) != null) {
+ if (!line.isEmpty() && !line.startsWith("#")) {
+ AuditIndexRecord record = gson.fromJson(line,
+ AuditIndexRecord.class);
+ logFile = new File(record.filePath);
+ String fileName = logFile.getName();
+ archiveFile = new File(archiveFolder, fileName);
+ if (archiveFile.exists()) {
+ logger.info("Deleting archive file "
+ + archiveFile);
+ boolean ret = archiveFile.delete();
+ if (!ret) {
+ logger.error("Error deleting archive file. archiveFile="
+ + archiveFile);
+ }
+ filesDeletedCount++;
+ if (filesDeletedCount >= filesToDelete) {
+ logger.info("Deleted " + filesDeletedCount
+ + " files");
+ break;
+ }
+ }
+ }
+ }
+ } finally {
+ br.close();
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error deleting older archive file. archiveFile="
+ + archiveFile, t);
+ }
+
+ }
+
+ /**
+ * Rate-limited error logging: emits the message only when at least
+ * errorLogIntervalMS has elapsed since the previous emission.
+ *
+ * @param msg error message to log
+ */
+ void logError(String msg) {
+     long now = System.currentTimeMillis();
+     boolean intervalElapsed = (now - lastErrorLogMS) > errorLogIntervalMS;
+     if (intervalElapsed) {
+         logger.error(msg);
+         lastErrorLogMS = now;
+     }
+ }
+
+ /**
+ * One line of the spool index file: tracks a single spool file's
+ * location, read/write progress and delivery status. Serialized to and
+ * from JSON via gson.
+ */
+ class AuditIndexRecord {
+ // Unique id for this record (generated via MiscUtil.generateUniqueId())
+ String id;
+ // Absolute path of the spool file this record tracks
+ String filePath;
+ // Line number of the last line successfully sent to the destination;
+ // the consumer resumes from here after a failure
+ int linePosition = 0;
+ SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
+ Date fileCreateTime;
+ // When writing to this spool file finished (file was rolled over)
+ Date writeCompleteTime;
+ // When the whole file was successfully delivered to the destination
+ Date doneCompleteTime;
+ Date lastSuccessTime;
+ Date lastFailedTime;
+ int failedAttemptCount = 0;
+ // true if the most recent send attempt succeeded, false if it failed
+ boolean lastAttempt = false;
+
+ @Override
+ public String toString() {
+ return "AuditIndexRecord [id=" + id + ", filePath=" + filePath
+ + ", linePosition=" + linePosition + ", status=" + status
+ + ", fileCreateTime=" + fileCreateTime
+ + ", writeCompleteTime=" + writeCompleteTime
+ + ", doneCompleteTime=" + doneCompleteTime
+ + ", lastSuccessTime=" + lastSuccessTime
+ + ", lastFailedTime=" + lastFailedTime
+ + ", failedAttemptCount=" + failedAttemptCount
+ + ", lastAttempt=" + lastAttempt + "]";
+ }
+
+ }
+
+ /**
+ * Holds the time and status of a single spool delivery attempt.
+ * NOTE(review): not referenced anywhere in this file — confirm it is
+ * used elsewhere before relying on it.
+ */
+ class AuditFileSpoolAttempt {
+ Date attemptTime;
+ String status;
+ }
+
+ /*
+ * Consumer thread: drains completed spool files to the destination,
+ * retrying with a back-off of retryDestinationMS while the destination
+ * is down, and resuming each file from its persisted linePosition.
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+     while (true) {
+         try {
+             // Pause between iterations: wait for the next queued file,
+             // or back off before retrying the current one
+             if (currentConsumerIndexRecord == null) {
+                 currentConsumerIndexRecord = indexQueue.poll(
+                         retryDestinationMS, TimeUnit.MILLISECONDS);
+             } else {
+                 Thread.sleep(retryDestinationMS);
+             }
+
+             if (isDrain) {
+                 // Need to exit
+                 break;
+             }
+             if (currentConsumerIndexRecord == null) {
+                 closeFileIfNeeded();
+                 continue;
+             }
+
+             boolean isRemoveIndex = false;
+             File consumerFile = new File(
+                     currentConsumerIndexRecord.filePath);
+             if (!consumerFile.exists()) {
+                 logger.error("Consumer file=" + consumerFile.getPath()
+                         + " not found.");
+                 printIndex();
+                 isRemoveIndex = true;
+             } else {
+                 // Read the spool file and send it in batches
+                 BufferedReader br = new BufferedReader(new FileReader(
+                         currentConsumerIndexRecord.filePath));
+                 try {
+                     int startLine = currentConsumerIndexRecord.linePosition;
+                     // NOTE(review): the skip condition below is '<', so a
+                     // resume re-sends line number startLine (the last
+                     // line already delivered) — confirm whether that
+                     // duplicate is acceptable downstream
+                     String line;
+                     int currLine = 0;
+                     boolean isResumed = false;
+                     List<String> lines = new ArrayList<String>();
+                     while ((line = br.readLine()) != null) {
+                         currLine++;
+                         if (currLine < startLine) {
+                             continue;
+                         }
+                         lines.add(line);
+                         if (lines.size() == queueProvider.getMaxBatchSize()) {
+                             boolean ret = sendEvent(lines,
+                                     currentConsumerIndexRecord, currLine);
+                             if (!ret) {
+                                 throw new Exception("Destination down");
+                             }
+                             if (!isResumed) {
+                                 logger.info("Started writing to destination. file="
+                                         + currentConsumerIndexRecord.filePath
+                                         + ", queueName="
+                                         + queueProvider.getName()
+                                         + ", consumer="
+                                         + consumerProvider.getName());
+                                 // Set the flag so this message is logged
+                                 // only once per file (it previously
+                                 // logged for every batch)
+                                 isResumed = true;
+                             }
+                             lines.clear();
+                         }
+                     }
+                     if (lines.size() > 0) {
+                         // Flush the final partial batch
+                         boolean ret = sendEvent(lines,
+                                 currentConsumerIndexRecord, currLine);
+                         if (!ret) {
+                             throw new Exception("Destination down");
+                         }
+                         if (!isResumed) {
+                             logger.info("Started writing to destination. file="
+                                     + currentConsumerIndexRecord.filePath
+                                     + ", queueName="
+                                     + queueProvider.getName()
+                                     + ", consumer="
+                                     + consumerProvider.getName());
+                             isResumed = true;
+                         }
+                         lines.clear();
+                     }
+                     logger.info("Done reading file. file="
+                             + currentConsumerIndexRecord.filePath
+                             + ", queueName=" + queueProvider.getName()
+                             + ", consumer=" + consumerProvider.getName());
+                     // The entire file has been delivered; mark it done
+                     currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
+                     currentConsumerIndexRecord.doneCompleteTime = new Date();
+                     currentConsumerIndexRecord.lastAttempt = true;
+
+                     isRemoveIndex = true;
+                 } catch (Exception ex) {
+                     // Destination failed mid-file: record the failure and
+                     // keep currentConsumerIndexRecord so the next
+                     // iteration retries from linePosition
+                     isDestDown = true;
+                     logError("Destination down. queueName="
+                             + queueProvider.getName() + ", consumer="
+                             + consumerProvider.getName());
+                     lastAttemptTime = System.currentTimeMillis();
+                     // Update the index file
+                     currentConsumerIndexRecord.lastFailedTime = new Date();
+                     currentConsumerIndexRecord.failedAttemptCount++;
+                     currentConsumerIndexRecord.lastAttempt = false;
+                     saveIndexFile();
+                 } finally {
+                     br.close();
+                 }
+             }
+             if (isRemoveIndex) {
+                 // Fully delivered (or file missing): drop from the index
+                 removeIndexRecord(currentConsumerIndexRecord);
+                 currentConsumerIndexRecord = null;
+                 closeFileIfNeeded();
+             }
+         } catch (Throwable t) {
+             logger.error("Exception in destination writing thread.", t);
+         }
+     }
+     logger.info("Exiting file spooler. provider=" + queueProvider.getName()
+             + ", consumer=" + consumerProvider.getName());
+ }
+
+ private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
+ int currLine) {
+ boolean ret = true;
+ try {
+ ret = consumerProvider.logJSON(lines);
+ if (!ret) {
+ // Need to log error after fixed interval
+ logError("Error sending logs to consumer. provider="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+ } else {
+ // Update index and save
+ indexRecord.linePosition = currLine;
+ indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
+ indexRecord.lastSuccessTime = new Date();
+ indexRecord.lastAttempt = true;
+ saveIndexFile();
+
+ if (isDestDown) {
+ isDestDown = false;
+ logger.info("Destination up now. " + indexRecord.filePath
+ + ", queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error while sending logs to consumer. provider="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName() + ", log=" + lines, t);
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
new file mode 100644
index 0000000..3ef3e30
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+/**
+ * This exception should be thrown only when there is an error in the message
+ * itself. E.g. invalid field type, etc. Don't throw this exception if there is
+ * a transient error
+ */
+public class AuditMessageException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public AuditMessageException() {
+ }
+
+ /**
+ * @param message
+ */
+ public AuditMessageException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause
+ */
+ public AuditMessageException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message
+ * @param cause
+ */
+ public AuditMessageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message
+ * @param cause
+ * @param enableSuppression
+ * @param writableStackTrace
+ */
+ public AuditMessageException(String message, Throwable cause,
+ boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
index 47c2d7f..0e38624 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
@@ -18,18 +18,38 @@
package org.apache.ranger.audit.provider;
+import java.util.Collection;
import java.util.Properties;
import org.apache.ranger.audit.model.AuditEventBase;
public interface AuditProvider {
- public void log(AuditEventBase event);
+ public boolean log(AuditEventBase event);
+ public boolean log(Collection<AuditEventBase> events);
+
+ public boolean logJSON(String event);
+ public boolean logJSON(Collection<String> events);
public void init(Properties prop);
+ public void init(Properties prop, String basePropertyName);
public void start();
public void stop();
public void waitToComplete();
+ public void waitToComplete(long timeout);
+
+ /**
+ * Name for this provider. Used only during logging. Uniqueness is not guaranteed
+ */
+ public String getName();
+ /**
+ * If this AuditProvider in the state of shutdown
+ * @return
+ */
+ public boolean isDrain();
+
+ public int getMaxBatchSize();
+ public int getMaxBatchInterval();
public boolean isFlushPending();
public long getLastFlushTime();
public void flush();
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index bb8fa6d..13b3142 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -28,7 +28,6 @@ import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;
import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
-
/*
* TODO:
* 1) Flag to enable/disable audit logging
@@ -37,22 +36,25 @@ import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
*/
public class AuditProviderFactory {
- private static final Log LOG = LogFactory.getLog(AuditProviderFactory.class);
-
- private static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled" ;
- private static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled" ;
- private static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled";
- private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ;
- private static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled";
- private static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled";
-
- private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
- private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000;
-
+ private static final Log LOG = LogFactory
+ .getLog(AuditProviderFactory.class);
+
+ public static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled";
+ public static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled";
+ public static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled";
+ public static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled";
+ public static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled";
+ public static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled";
+
+ public static final String AUDIT_DEST_BASE = "xasecure.audit.destination";
+
+ public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
+ public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000;
+
private static AuditProviderFactory sFactory;
private AuditProvider mProvider = null;
- private boolean mInitDone = false;
+ private boolean mInitDone = false;
private AuditProviderFactory() {
LOG.info("AuditProviderFactory: creating..");
@@ -61,9 +63,9 @@ public class AuditProviderFactory {
}
public static AuditProviderFactory getInstance() {
- if(sFactory == null) {
- synchronized(AuditProviderFactory.class) {
- if(sFactory == null) {
+ if (sFactory == null) {
+ synchronized (AuditProviderFactory.class) {
+ if (sFactory == null) {
sFactory = new AuditProviderFactory();
}
}
@@ -75,7 +77,7 @@ public class AuditProviderFactory {
public static AuditProvider getAuditProvider() {
return AuditProviderFactory.getInstance().getProvider();
}
-
+
public AuditProvider getProvider() {
return mProvider;
}
@@ -86,133 +88,301 @@ public class AuditProviderFactory {
public synchronized void init(Properties props, String appType) {
LOG.info("AuditProviderFactory: initializing..");
-
- if(mInitDone) {
- LOG.warn("AuditProviderFactory.init(): already initialized!", new Exception());
+
+ if (mInitDone) {
+ LOG.warn("AuditProviderFactory.init(): already initialized!",
+ new Exception());
return;
}
mInitDone = true;
-
- MiscUtil.setApplicationType(appType);
- boolean isEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false);
- boolean isAuditToDbEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
- boolean isAuditToHdfsEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
- boolean isAuditToLog4jEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
- boolean isAuditToKafkaEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false);
- boolean isAuditToSolrEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false);
+ MiscUtil.setApplicationType(appType);
- if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled || isAuditToSolrEnabled)) {
- LOG.info("AuditProviderFactory: Audit not enabled..");
+ boolean isEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_IS_ENABLED_PROP, false);
+ boolean isAuditToDbEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_DB_IS_ENABLED_PROP, false);
+ boolean isAuditToHdfsEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_HDFS_IS_ENABLED_PROP, false);
+ boolean isAuditToLog4jEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_LOG4J_IS_ENABLED_PROP, false);
+ boolean isAuditToKafkaEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_KAFKA_IS_ENABLED_PROP, false);
+ boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props,
+ AUDIT_SOLR_IS_ENABLED_PROP, false);
- mProvider = getDefaultProvider();
+ List<AuditProvider> providers = new ArrayList<AuditProvider>();
- return;
+ // TODO: Delete me
+ for (Object propNameObj : props.keySet()) {
+ LOG.info("DELETE ME: " + propNameObj.toString() + "="
+ + props.getProperty(propNameObj.toString()));
}
- List<AuditProvider> providers = new ArrayList<AuditProvider>();
+ // Process new audit configurations
+ List<String> destNameList = new ArrayList<String>();
- if(isAuditToDbEnabled) {
- LOG.info("DbAuditProvider is enabled");
- DbAuditProvider dbProvider = new DbAuditProvider();
-
- boolean isAuditToDbAsync = BaseAuditProvider.getBooleanProperty(props, DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
+ for (Object propNameObj : props.keySet()) {
+ String propName = propNameObj.toString();
+ if (propName.length() <= AUDIT_DEST_BASE.length() + 1) {
+ continue;
+ }
+ String destName = propName.substring(AUDIT_DEST_BASE.length() + 1);
+ List<String> splits = MiscUtil.toArray(destName, ".");
+ if (splits.size() > 1) {
+ continue;
+ }
+ String value = props.getProperty(propName);
+ if (value.equalsIgnoreCase("enable")
+ || value.equalsIgnoreCase("true")) {
+ destNameList.add(destName);
+ LOG.info("Audit destination " + propName + " is set to "
+ + value);
+ }
+ }
- if(isAuditToDbAsync) {
- int maxQueueSize = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
- int maxFlushInterval = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+ for (String destName : destNameList) {
+ String destPropPrefix = AUDIT_DEST_BASE + "." + destName;
+ AuditProvider destProvider = getProviderFromConfig(props,
+ destPropPrefix, destName);
- AsyncAuditProvider asyncProvider = new AsyncAuditProvider("DbAuditProvider", maxQueueSize, maxFlushInterval, dbProvider);
+ if (destProvider != null) {
+ destProvider.init(props, destPropPrefix);
- providers.add(asyncProvider);
- } else {
- providers.add(dbProvider);
+ String queueName = MiscUtil.getStringProperty(props,
+ destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE);
+ if( queueName == null || queueName.isEmpty()) {
+ queueName = "batch";
+ }
+ if (queueName != null && !queueName.isEmpty()
+ && !queueName.equalsIgnoreCase("none")) {
+ String queuePropPrefix = destPropPrefix + "." + queueName;
+ AuditProvider queueProvider = getProviderFromConfig(props,
+ queuePropPrefix, queueName);
+ if (queueProvider != null) {
+ if (queueProvider instanceof BaseAuditProvider) {
+ BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider;
+ qProvider.setConsumer(destProvider);
+ qProvider.init(props, queuePropPrefix);
+ providers.add(queueProvider);
+ } else {
+ LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination "
+ + destName
+ + " can't be created. queueName="
+ + queueName);
+ }
+ } else {
+ LOG.fatal("Queue provider for destination " + destName
+ + " can't be created. queueName=" + queueName);
+ }
+ } else {
+ LOG.info("Audit destination " + destProvider.getName()
+ + " added to provider list");
+ providers.add(destProvider);
+ }
}
}
+ if (providers.size() > 0) {
+ LOG.info("Using v2 audit configuration");
+ AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
+ String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "async";
+ asyncQueue.init(props, propPrefix);
+
+ if (providers.size() == 1) {
+ asyncQueue.setConsumer(providers.get(0));
+ } else {
+ MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+ multiDestProvider.init(props);
+ multiDestProvider.addAuditProviders(providers);
+ asyncQueue.setConsumer(multiDestProvider);
+ }
- if(isAuditToHdfsEnabled) {
- LOG.info("HdfsAuditProvider is enabled");
+ mProvider = asyncQueue;
+ mProvider.start();
+ } else {
+ LOG.info("No v2 audit configuration found. Trying v1 audit configurations");
+ if (!isEnabled
+ || !(isAuditToDbEnabled || isAuditToHdfsEnabled
+ || isAuditToKafkaEnabled || isAuditToLog4jEnabled
+ || isAuditToSolrEnabled || providers.size() == 0)) {
+ LOG.info("AuditProviderFactory: Audit not enabled..");
- HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
+ mProvider = getDefaultProvider();
- boolean isAuditToHdfsAsync = BaseAuditProvider.getBooleanProperty(props, HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
+ return;
+ }
- if(isAuditToHdfsAsync) {
- int maxQueueSize = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
- int maxFlushInterval = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+ if (isAuditToDbEnabled) {
+ LOG.info("DbAuditProvider is enabled");
+ DbAuditProvider dbProvider = new DbAuditProvider();
- AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, hdfsProvider);
+ boolean isAuditToDbAsync = MiscUtil.getBooleanProperty(props,
+ DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false);
- providers.add(asyncProvider);
- } else {
- providers.add(hdfsProvider);
- }
- }
+ if (isAuditToDbAsync) {
+ int maxQueueSize = MiscUtil.getIntProperty(props,
+ DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP,
+ AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+ int maxFlushInterval = MiscUtil.getIntProperty(props,
+ DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP,
+ AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
- if(isAuditToKafkaEnabled) {
- LOG.info("KafkaAuditProvider is enabled");
- KafkaAuditProvider kafkaProvider = new KafkaAuditProvider();
- kafkaProvider.init(props);
-
- if( kafkaProvider.isAsync()) {
- AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), kafkaProvider.getMaxFlushInterval(), kafkaProvider);
- providers.add(asyncProvider);
- } else {
- providers.add(kafkaProvider);
- }
- }
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+ "DbAuditProvider", maxQueueSize, maxFlushInterval,
+ dbProvider);
- if(isAuditToSolrEnabled) {
- LOG.info("SolrAuditProvider is enabled");
- SolrAuditProvider solrProvider = new SolrAuditProvider();
- solrProvider.init(props);
-
- if( solrProvider.isAsync()) {
- AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MySolrAuditProvider", solrProvider.getMaxQueueSize(), solrProvider.getMaxFlushInterval(), solrProvider);
- providers.add(asyncProvider);
- } else {
- providers.add(solrProvider);
+ providers.add(asyncProvider);
+ } else {
+ providers.add(dbProvider);
+ }
}
- }
- if(isAuditToLog4jEnabled) {
- Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
+ if (isAuditToHdfsEnabled) {
+ LOG.info("HdfsAuditProvider is enabled");
- boolean isAuditToLog4jAsync = BaseAuditProvider.getBooleanProperty(props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, false);
+ HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
- if(isAuditToLog4jAsync) {
- int maxQueueSize = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
- int maxFlushInterval = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+ boolean isAuditToHdfsAsync = MiscUtil.getBooleanProperty(props,
+ HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false);
- AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, log4jProvider);
+ if (isAuditToHdfsAsync) {
+ int maxQueueSize = MiscUtil.getIntProperty(props,
+ HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP,
+ AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+ int maxFlushInterval = MiscUtil
+ .getIntProperty(
+ props,
+ HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP,
+ AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+ "HdfsAuditProvider", maxQueueSize,
+ maxFlushInterval, hdfsProvider);
+
+ providers.add(asyncProvider);
+ } else {
+ providers.add(hdfsProvider);
+ }
+ }
- providers.add(asyncProvider);
+ if (isAuditToKafkaEnabled) {
+ LOG.info("KafkaAuditProvider is enabled");
+ KafkaAuditProvider kafkaProvider = new KafkaAuditProvider();
+ kafkaProvider.init(props);
+
+ if (kafkaProvider.isAsync()) {
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+ "MyKafkaAuditProvider",
+ kafkaProvider.getMaxQueueSize(),
+ kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+ providers.add(asyncProvider);
+ } else {
+ providers.add(kafkaProvider);
+ }
+ }
+
+ if (isAuditToSolrEnabled) {
+ LOG.info("SolrAuditProvider is enabled");
+ SolrAuditProvider solrProvider = new SolrAuditProvider();
+ solrProvider.init(props);
+
+ if (solrProvider.isAsync()) {
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+ "MySolrAuditProvider",
+ solrProvider.getMaxQueueSize(),
+ solrProvider.getMaxBatchInterval(), solrProvider);
+ providers.add(asyncProvider);
+ } else {
+ providers.add(solrProvider);
+ }
+ }
+
+ if (isAuditToLog4jEnabled) {
+ Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
+
+ boolean isAuditToLog4jAsync = MiscUtil.getBooleanProperty(
+ props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP,
+ false);
+
+ if (isAuditToLog4jAsync) {
+ int maxQueueSize = MiscUtil.getIntProperty(props,
+ Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP,
+ AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT);
+ int maxFlushInterval = MiscUtil
+ .getIntProperty(
+ props,
+ Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP,
+ AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT);
+
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
+ "Log4jAuditProvider", maxQueueSize,
+ maxFlushInterval, log4jProvider);
+
+ providers.add(asyncProvider);
+ } else {
+ providers.add(log4jProvider);
+ }
+ }
+ if (providers.size() == 0) {
+ mProvider = getDefaultProvider();
+ } else if (providers.size() == 1) {
+ mProvider = providers.get(0);
} else {
- providers.add(log4jProvider);
+ MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+
+ multiDestProvider.addAuditProviders(providers);
+
+ mProvider = multiDestProvider;
}
- }
- if(providers.size() == 0) {
- mProvider = getDefaultProvider();
- } else if(providers.size() == 1) {
- mProvider = providers.get(0);
- } else {
- MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
-
- multiDestProvider.addAuditProviders(providers);
-
- mProvider = multiDestProvider;
+ mProvider.init(props);
+ mProvider.start();
}
-
- mProvider.init(props);
- mProvider.start();
JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider);
- Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+ Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+ }
+
+ /**
+ * Creates the AuditProvider instance for one configured destination or
+ * queue. If an explicit class name is configured under
+ * propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME it is
+ * instantiated reflectively; otherwise providerName is mapped to one of
+ * the built-in providers.
+ *
+ * @param props        configuration properties
+ * @param propPrefix   property prefix for this provider's settings
+ * @param providerName short name of the provider (e.g. "hdfs", "solr")
+ * @return the provider, or null if it could not be created
+ */
+ private AuditProvider getProviderFromConfig(Properties props,
+         String propPrefix, String providerName) {
+     AuditProvider provider = null;
+     String className = MiscUtil.getStringProperty(props, propPrefix + "."
+             + BaseAuditProvider.PROP_CLASS_NAME);
+     if (className != null && !className.isEmpty()) {
+         try {
+             // getDeclaredConstructor().newInstance() rather than the
+             // problematic Class.newInstance(), which silently propagates
+             // checked exceptions thrown by the constructor
+             provider = (AuditProvider) Class.forName(className)
+                     .getDeclaredConstructor().newInstance();
+         } catch (Exception e) {
+             // Include the exception so the root cause is not lost
+             LOG.fatal("Can't instantiate audit class for providerName="
+                     + providerName + ", className=" + className, e);
+         }
+     } else {
+         // NOTE(review): only "hdfs" is matched case-insensitively here —
+         // confirm whether the other names should use equalsIgnoreCase too
+         if (providerName.equals("file")) {
+             provider = new FileAuditDestination();
+         } else if (providerName.equalsIgnoreCase("hdfs")) {
+             provider = new HDFSAuditDestination();
+         } else if (providerName.equals("solr")) {
+             provider = new SolrAuditProvider();
+         } else if (providerName.equals("kafka")) {
+             provider = new KafkaAuditProvider();
+         } else if (providerName.equals("db")) {
+             provider = new DbAuditProvider();
+         } else if (providerName.equals("log4j")) {
+             provider = new Log4jAuditProvider();
+         } else if (providerName.equals("batch")) {
+             provider = new AuditBatchProcessor();
+         } else if (providerName.equals("async")) {
+             provider = new AuditAsyncQueue();
+         } else {
+             LOG.error("Provider name doesn't have any class associated with it. providerName="
+                     + providerName);
+         }
+     }
+     return provider;
+ }
-
+
private AuditProvider getDefaultProvider() {
return new DummyAuditProvider();
}
@@ -227,6 +397,6 @@ public class AuditProviderFactory {
public void run() {
mProvider.waitToComplete();
mProvider.stop();
- }
+ }
}
}
[4/4] incubator-ranger git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-ranger.git
Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ranger.git
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/44f403ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/44f403ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/44f403ea
Branch: refs/heads/master
Commit: 44f403ead9f920495af247e97450eb9e6a3c29b8
Parents: eb1e9b5 4a20a9d
Author: Don Bosco Durai <bo...@apache.org>
Authored: Sun Apr 19 15:10:57 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Sun Apr 19 15:10:57 2015 -0700
----------------------------------------------------------------------
.../plugin/policyengine/RangerPolicyEngine.java | 8 -
.../policyengine/RangerPolicyEngineImpl.java | 21 --
.../RangerDefaultPolicyEvaluator.java | 2 +-
.../RangerAbstractResourceMatcher.java | 20 +-
.../RangerDefaultResourceMatcher.java | 13 +-
.../RangerPathResourceMatcher.java | 67 ++--
.../resourcematcher/RangerResourceMatcher.java | 9 +-
.../ranger/plugin/service/RangerBasePlugin.java | 38 +-
.../resourcematcher/TestResourceMatcher.java | 136 +++++++
.../test_resourcematcher_default.json | 327 +++++++++++++++++
.../test_resourcematcher_path.json | 290 +++++++++++++++
.../hbase/AuthorizationSession.java | 16 +-
.../authorization/hbase/HbaseAuditHandler.java | 5 +
.../hbase/HbaseAuditHandlerImpl.java | 21 +-
.../authorization/hbase/HbaseAuthUtilsImpl.java | 1 -
.../authorization/hbase/HbaseFactory.java | 6 +
.../authorization/hbase/HbaseUserUtils.java | 7 +
.../authorization/hbase/HbaseUserUtilsImpl.java | 50 ++-
.../hbase/RangerAuthorizationCoprocessor.java | 44 +--
.../hadoop/RangerHdfsAuthorizer.java | 52 ++-
kms/scripts/db_setup.py | 21 +-
kms/scripts/dba_script.py | 58 ++-
kms/scripts/install.properties | 109 +++++-
kms/scripts/kms-initd | 78 ----
kms/scripts/ranger-kms | 48 ++-
kms/scripts/ranger-kms-services.sh | 4 +-
kms/scripts/setup.sh | 61 +++-
.../apache/hadoop/crypto/key/RangerKMSDB.java | 2 +
.../crypto/key/RangerKeyStoreProvider.java | 12 +-
plugin-kms/scripts/enable-kms-plugin.sh | 39 +-
plugin-kms/scripts/install.properties | 112 ------
plugin-kms/scripts/install.sh | 364 -------------------
.../scripts/kms-plugin-install.properties | 23 --
plugin-kms/scripts/uninstall.sh | 70 ----
pom.xml | 2 +-
security-admin/scripts/db_setup.py | 17 +-
security-admin/scripts/dba_script.py | 19 +-
.../java/org/apache/ranger/biz/AssetMgr.java | 2 +-
.../org/apache/ranger/biz/ServiceDBStore.java | 31 ++
.../java/org/apache/ranger/biz/ServiceMgr.java | 19 +-
.../org/apache/ranger/common/ServiceUtil.java | 161 +++++---
.../org/apache/ranger/rest/ServiceREST.java | 23 +-
.../ranger/service/RangerPolicyServiceBase.java | 3 +-
.../ranger/service/RangerServiceService.java | 29 +-
.../service/RangerServiceServiceBase.java | 3 +-
.../ranger/solr/SolrAccessAuditsService.java | 11 +-
.../main/webapp/scripts/prelogin/XAPrelogin.js | 1 +
.../views/policies/RangerPolicyTableLayout.js | 6 +-
.../rest/TestServiceRESTForValidation.java | 120 +++---
.../src/test/resources/log4j.properties | 36 ++
src/main/assembly/kms.xml | 25 +-
51 files changed, 1607 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-ranger git commit: RANGER-397 - Implement reliable
streaming audits to configurable destinations
Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
index a068b8f..576176c 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
@@ -17,71 +17,296 @@
* under the License.
*/
package org.apache.ranger.audit.provider;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+
+import com.google.gson.GsonBuilder;
+
import java.util.concurrent.atomic.AtomicLong;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.Properties;
public abstract class BaseAuditProvider implements AuditProvider {
private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class);
private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
- public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
- public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000;
+ public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
+ public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
+ public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+ private AtomicLong lifeTimeInLogCount = new AtomicLong(0);
- private int mLogFailureReportMinIntervalInMs = 60 * 1000;
+ private int mLogFailureReportMinIntervalInMs = 60 * 1000;
- private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
+ private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
- private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
- private int maxQueueSize = AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT;
- private int maxFlushInterval = AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT;
+ private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
+
+ public static final String PROP_NAME = "name";
+ public static final String PROP_CLASS_NAME = "classname";
+ public static final String PROP_QUEUE = "queue";
+
+ public static final String PROP_BATCH_SIZE = "batch.size";
+ public static final String PROP_QUEUE_SIZE = "queue.size";
+ public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
+
+ public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
+ public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
+ public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
+
+ public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
+
+ private boolean isDrain = false;
+ private String providerName = null;
+
+ private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
+ private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
+ private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
+
+ protected int failedRetryTimes = 3;
+ protected int failedRetrySleep = 3 * 1000;
+
+ protected AuditProvider consumer = null;
+ protected AuditFileSpool fileSpooler = null;
+
+ protected boolean fileSpoolerEnabled = false;
+ protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
+ protected int fileSpoolDrainThresholdPercent = 80;
+
+ int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+ long lastErrorLogMS = 0;
protected Properties props = null;
public BaseAuditProvider() {
}
-
+
+ public BaseAuditProvider(AuditProvider consumer) {
+ this.consumer = consumer;
+ }
+
@Override
public void init(Properties props) {
+ init(props, null);
+ }
+
+ @Override
+ public void init(Properties props, String basePropertyName) {
LOG.info("BaseAuditProvider.init()");
this.props = props;
-
- mLogFailureReportMinIntervalInMs = getIntProperty(props, AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+ String propPrefix = PROP_DEFAULT_PREFIX;
+ if (basePropertyName != null) {
+ propPrefix = basePropertyName;
+ }
+ LOG.info("propPrefix=" + propPrefix);
+ // Get final token
+ List<String> tokens = MiscUtil.toArray(propPrefix, ".");
+ String finalToken = tokens.get(tokens.size() - 1);
+
+ String name = MiscUtil.getStringProperty(props, basePropertyName + "."
+ + PROP_NAME);
+ if (name != null && !name.isEmpty()) {
+ providerName = name;
+ }
+ if (providerName == null) {
+ providerName = finalToken;
+ LOG.info("Using providerName from property prefix. providerName="
+ + providerName);
+ }
+ LOG.info("providerName=" + providerName);
+
+ setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_BATCH_SIZE, getMaxBatchSize()));
+ setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_QUEUE_SIZE, getMaxQueueSize()));
+ setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_BATCH_INTERVAL, getMaxBatchInterval()));
+
+ fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_ENABLE, false);
+ String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
+ if (fileSpoolerEnabled || logFolderProp != null) {
+ LOG.info("File spool is enabled for " + getName()
+ + ", logFolderProp=" + logFolderProp + ", " + propPrefix
+ + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "="
+ + fileSpoolerEnabled);
+ fileSpoolerEnabled = true;
+ fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
+ fileSpoolMaxWaitTime);
+ fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
+ propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
+ fileSpoolDrainThresholdPercent);
+ fileSpooler = new AuditFileSpool(this, consumer);
+ fileSpooler.init(props, basePropertyName);
+ } else {
+ LOG.info("File spool is disabled for " + getName());
+ }
+
+ try {
+ new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+ } catch (Throwable excp) {
+ LOG.warn(
+ "Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
+ excp);
+ }
+ mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
+ AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+
+ }
+
+ public AuditProvider getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(AuditProvider consumer) {
+ this.consumer = consumer;
}
public void logFailedEvent(AuditEventBase event) {
logFailedEvent(event, null);
- }
+ }
public void logFailedEvent(AuditEventBase event, Throwable excp) {
long now = System.currentTimeMillis();
-
- long timeSinceLastReport = now - mFailedLogLastReportTime.get();
- long countSinceLastReport = mFailedLogCountSinceLastReport.incrementAndGet();
- long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
- if(timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
+ long timeSinceLastReport = now - mFailedLogLastReportTime.get();
+ long countSinceLastReport = mFailedLogCountSinceLastReport
+ .incrementAndGet();
+ long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+ if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
mFailedLogLastReportTime.set(now);
mFailedLogCountSinceLastReport.set(0);
- if(excp != null) {
- LOG.warn("failed to log audit event: " + MiscUtil.stringify(event), excp);
+ if (excp != null) {
+ LOG.warn(
+ "failed to log audit event: "
+ + MiscUtil.stringify(event), excp);
} else {
- LOG.warn("failed to log audit event: " + MiscUtil.stringify(event));
+ LOG.warn("failed to log audit event: "
+ + MiscUtil.stringify(event));
+ }
+
+ if (countLifeTime > 1) { // no stats to print for the 1st failure
+ LOG.warn("Log failure count: " + countSinceLastReport
+ + " in past "
+ + formatIntervalForLog(timeSinceLastReport) + "; "
+ + countLifeTime + " during process lifetime");
+ }
+ }
+ }
+
+ public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
+ for (AuditEventBase event : events) {
+ logFailedEvent(event, excp);
+ }
+ }
+
+ public void logFailedEventJSON(String event, Throwable excp) {
+ long now = System.currentTimeMillis();
+
+ long timeSinceLastReport = now - mFailedLogLastReportTime.get();
+ long countSinceLastReport = mFailedLogCountSinceLastReport
+ .incrementAndGet();
+ long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+ if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
+ mFailedLogLastReportTime.set(now);
+ mFailedLogCountSinceLastReport.set(0);
+
+ if (excp != null) {
+ LOG.warn("failed to log audit event: " + event, excp);
+ } else {
+ LOG.warn("failed to log audit event: " + event);
+ }
+
+ if (countLifeTime > 1) { // no stats to print for the 1st failure
+ LOG.warn("Log failure count: " + countSinceLastReport
+ + " in past "
+ + formatIntervalForLog(timeSinceLastReport) + "; "
+ + countLifeTime + " during process lifetime");
}
+ }
+ }
- if(countLifeTime > 1) { // no stats to print for the 1st failure
- LOG.warn("Log failure count: " + countSinceLastReport + " in past " + formatIntervalForLog(timeSinceLastReport) + "; " + countLifeTime + " during process lifetime");
+ public void logFailedEventJSON(Collection<String> events, Throwable excp) {
+ for (String event : events) {
+ logFailedEventJSON(event, excp);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+ eventList.add(event);
+ return log(eventList);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String)
+ */
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection
+ * )
+ */
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ boolean ret = true;
+ for (String event : events) {
+ ret = logJSON(event);
+ if (!ret) {
+ break;
}
}
- }
+ return ret;
+ }
+
+ public void setName(String name) {
+ providerName = name;
+ }
+
+ @Override
+ public String getName() {
+ return providerName;
+ }
+
+ @Override
+ public boolean isDrain() {
+ return isDrain;
+ }
+
+ public void setDrain(boolean isDrain) {
+ this.isDrain = isDrain;
+ }
-
public int getMaxQueueSize() {
return maxQueueSize;
}
@@ -90,84 +315,95 @@ public abstract class BaseAuditProvider implements AuditProvider {
this.maxQueueSize = maxQueueSize;
}
- public int getMaxFlushInterval() {
- return maxFlushInterval;
+ @Override
+ public int getMaxBatchInterval() {
+ return maxBatchInterval;
}
- public void setMaxFlushInterval(int maxFlushInterval) {
- this.maxFlushInterval = maxFlushInterval;
+ public void setMaxBatchInterval(int maxBatchInterval) {
+ this.maxBatchInterval = maxBatchInterval;
}
- public static Map<String, String> getPropertiesWithPrefix(Properties props, String prefix) {
- Map<String, String> prefixedProperties = new HashMap<String, String>();
-
- if(props != null && prefix != null) {
- for(String key : props.stringPropertyNames()) {
- if(key == null) {
- continue;
- }
-
- String val = props.getProperty(key);
+ @Override
+ public int getMaxBatchSize() {
+ return maxBatchSize;
+ }
- if(key.startsWith(prefix)) {
- key = key.substring(prefix.length());
+ public void setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ }
- if(key == null) {
- continue;
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+ */
+ @Override
+ public void waitToComplete() {
+ if (consumer != null) {
+ consumer.waitToComplete(-1);
+ }
+ }
- prefixedProperties.put(key, val);
- }
- }
+ @Override
+ public void waitToComplete(long timeout) {
+ if (consumer != null) {
+ consumer.waitToComplete(timeout);
}
+ }
- return prefixedProperties;
+ @Override
+ public boolean isFlushPending() {
+ return false;
}
-
- public static boolean getBooleanProperty(Properties props, String propName, boolean defValue) {
- boolean ret = defValue;
- if(props != null && propName != null) {
- String val = props.getProperty(propName);
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
+ */
+ @Override
+ public long getLastFlushTime() {
+ if (consumer != null) {
+ return consumer.getLastFlushTime();
+ }
+ return 0;
+ }
- if(val != null) {
- ret = Boolean.valueOf(val);
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+ */
+ @Override
+ public void flush() {
+ if (consumer != null) {
+ consumer.flush();
}
+ }
- return ret;
+ public AtomicLong getLifeTimeInLogCount() {
+ return lifeTimeInLogCount;
}
-
- public static int getIntProperty(Properties props, String propName, int defValue) {
- int ret = defValue;
- if(props != null && propName != null) {
- String val = props.getProperty(propName);
+ public long addLifeTimeInLogCount(long count) {
+ return lifeTimeInLogCount.addAndGet(count);
+ }
- if(val != null) {
- try {
- ret = Integer.parseInt(val);
- } catch(NumberFormatException excp) {
- ret = defValue;
- }
- }
+ public void logError(String msg) {
+ long currTimeMS = System.currentTimeMillis();
+ if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+ LOG.error(msg);
+ lastErrorLogMS = currTimeMS;
}
-
- return ret;
}
-
-
- public static String getStringProperty(Properties props, String propName) {
- String ret = null;
- if(props != null && propName != null) {
- String val = props.getProperty(propName);
- if ( val != null){
- ret = val;
- }
+ public void logError(String msg, Throwable ex) {
+ long currTimeMS = System.currentTimeMillis();
+ if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+ LOG.error(msg, ex);
+ lastErrorLogMS = currTimeMS;
}
-
- return ret;
}
public String getTimeDiffStr(long time1, long time2) {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index be32519..ec5e9a8 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -17,6 +17,7 @@
*/
package org.apache.ranger.audit.provider;
+import java.util.Collection;
import java.util.Properties;
import org.apache.ranger.audit.model.AuditEventBase;
@@ -32,7 +33,7 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
if(event instanceof AuthzAuditEvent) {
AuthzAuditEvent authzEvent = (AuthzAuditEvent)event;
@@ -52,6 +53,30 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
if(! mBuffer.add(event)) {
logFailedEvent(event);
}
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
+ return false;
}
@Override
@@ -68,6 +93,11 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
public void waitToComplete() {
}
+
+ @Override
+ public void waitToComplete(long timeout) {
+ }
+
@Override
public boolean isFlushPending() {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index 7414a7a..f4976fb 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -19,6 +19,7 @@
package org.apache.ranger.audit.provider;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.Properties;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.dao.DaoManager;
import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
@@ -38,7 +40,7 @@ import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
* NOTE:
* - Instances of this class are not thread-safe.
*/
-public class DbAuditProvider extends BaseAuditProvider {
+public class DbAuditProvider extends AuditDestination {
private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);
@@ -73,17 +75,17 @@ public class DbAuditProvider extends BaseAuditProvider {
super.init(props);
- mDbProperties = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
- mCommitBatchSize = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
- mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
+ mDbProperties = MiscUtil.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+ mCommitBatchSize = MiscUtil.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+ mDbRetryMinIntervalMs = MiscUtil.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
- boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
+ boolean isAsync = MiscUtil.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
if(! isAsync) {
mCommitBatchSize = 1; // Batching not supported in sync mode
}
- String jdbcPassword = getCredentialString(BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+ String jdbcPassword = getCredentialString(MiscUtil.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
@@ -91,7 +93,7 @@ public class DbAuditProvider extends BaseAuditProvider {
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
LOG.debug("DbAuditProvider.log()");
boolean isSuccess = false;
@@ -113,6 +115,30 @@ public class DbAuditProvider extends BaseAuditProvider {
logFailedEvent(event);
}
}
+ return isSuccess;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
+ return false;
}
@Override
@@ -132,6 +158,13 @@ public class DbAuditProvider extends BaseAuditProvider {
@Override
public void waitToComplete() {
LOG.info("DbAuditProvider.waitToComplete()");
+ waitToComplete(-1);
+ }
+
+ @Override
+ public void waitToComplete(long timeout) {
+ LOG.info("DbAuditProvider.waitToComplete():timeout=" + timeout);
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
index a6e3ef1..619a99d 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -17,9 +17,11 @@
*/
package org.apache.ranger.audit.provider;
+import java.util.Collection;
import java.util.Properties;
import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
public class DummyAuditProvider implements AuditProvider {
@@ -29,8 +31,32 @@ public class DummyAuditProvider implements AuditProvider {
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
// intentionally left empty
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
+ return false;
}
@Override
@@ -48,6 +74,13 @@ public class DummyAuditProvider implements AuditProvider {
// intentionally left empty
}
+
+ @Override
+ public int getMaxBatchSize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
@Override
public boolean isFlushPending() {
return false;
@@ -62,4 +95,45 @@ public class DummyAuditProvider implements AuditProvider {
public void flush() {
// intentionally left empty
}
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties, java.lang.String)
+ */
+ @Override
+ public void init(Properties prop, String basePropertyName) {
+ // intentionally left empty
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+ */
+ @Override
+ public void waitToComplete(long timeout) {
+ // intentionally left empty
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.AuditProvider#getName()
+ */
+ @Override
+ public String getName() {
+ return this.getClass().getName();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
+ */
+ @Override
+ public boolean isDrain() {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval()
+ */
+ @Override
+ public int getMaxBatchInterval() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
new file mode 100644
index 0000000..62ecab1
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * This class write the logs to local file
+ */
+public class FileAuditDestination extends AuditDestination {
+ private static final Log logger = LogFactory
+ .getLog(FileAuditDestination.class);
+
+ public static final String PROP_FILE_LOCAL_DIR = "dir";
+ public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format";
+ public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec";
+
+ String baseFolder = null;
+ String fileFormat = null;
+ int fileRolloverSec = 24 * 60 * 60; // In seconds
+ private String logFileNameFormat;
+
+ boolean initDone = false;
+
+ private File logFolder;
+ PrintWriter logWriter = null;
+
+ private Date fileCreateTime = null;
+
+ private String currentFileName;
+
+ private boolean isStopped = false;
+
+ @Override
+ public void init(Properties prop, String propPrefix) {
+ super.init(prop, propPrefix);
+
+ // Initialize properties for this class
+ // Initial folder and file properties
+ String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_FILE_LOCAL_DIR);
+ logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_FILE_LOCAL_FILE_NAME_FORMAT);
+ fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_FILE_FILE_ROLLOVER, fileRolloverSec);
+
+ if (logFolderProp == null || logFolderProp.isEmpty()) {
+ logger.error("File destination folder is not configured. Please set "
+ + propPrefix
+ + "."
+ + PROP_FILE_LOCAL_DIR
+ + ". name="
+ + getName());
+ return;
+ }
+ logFolder = new File(logFolderProp);
+ if (!logFolder.isDirectory()) {
+ logFolder.mkdirs();
+ if (!logFolder.isDirectory()) {
+ logger.error("FileDestination folder not found and can't be created. folder="
+ + logFolder.getAbsolutePath() + ", name=" + getName());
+ return;
+ }
+ }
+ logger.info("logFolder=" + logFolder + ", name=" + getName());
+
+ if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+ logFileNameFormat = "%app-type%_ranger_audit.log";
+ }
+
+ logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
+ + getName());
+
+ initDone = true;
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ try {
+ PrintWriter out = getLogFileStream();
+ for (String event : events) {
+ out.println(event);
+ }
+ out.flush();
+ } catch (Throwable t) {
+ logError("Error writing to log file.", t);
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+ */
+ @Override
+ synchronized public boolean log(Collection<AuditEventBase> events) {
+ if (isStopped) {
+ logError("log() called after stop was requested. name=" + getName());
+ return false;
+ }
+ List<String> jsonList = new ArrayList<String>();
+ for (AuditEventBase event : events) {
+ try {
+ jsonList.add(MiscUtil.stringify(event));
+ } catch (Throwable t) {
+ logger.error("Error converting to JSON. event=" + event);
+ }
+ }
+ return logJSON(jsonList);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ // Nothing to do here. We will open the file when the first log request
+ // comes
+ }
+
+ @Override
+ synchronized public void stop() {
+ if (logWriter != null) {
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ isStopped = true;
+ }
+ }
+
+ // Helper methods in this class
+ synchronized private PrintWriter getLogFileStream() throws Exception {
+ closeFileIfNeeded();
+
+ // Either there are no open log file or the previous one has been rolled
+ // over
+ if (logWriter == null) {
+ Date currentTime = new Date();
+ // Create a new file
+ String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+ currentTime.getTime());
+ File outLogFile = new File(logFolder, fileName);
+ if (outLogFile.exists()) {
+ // Let's try to get the next available file
+ int i = 0;
+ while (true) {
+ i++;
+ int lastDot = fileName.lastIndexOf('.');
+ String baseName = fileName.substring(0, lastDot);
+ String extension = fileName.substring(lastDot);
+ String newFileName = baseName + "." + i + extension;
+ File newLogFile = new File(logFolder, newFileName);
+ if (!newLogFile.exists()) {
+ // Move the file
+ if (!outLogFile.renameTo(newLogFile)) {
+ logger.error("Error renameing file. " + outLogFile
+ + " to " + newLogFile);
+ }
+ break;
+ }
+ }
+ }
+ if (!outLogFile.exists()) {
+ logger.info("Creating new file. destName=" + getName()
+ + ", fileName=" + fileName);
+ // Open the file
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ outLogFile)));
+ } else {
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ outLogFile, true)));
+ }
+ fileCreateTime = new Date();
+ currentFileName = outLogFile.getPath();
+ }
+ return logWriter;
+ }
+
+ private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+ if (logWriter == null) {
+ return;
+ }
+ if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
+ logger.info("Closing file. Rolling over. name=" + getName()
+ + ", fileName=" + currentFileName);
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ currentFileName = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
new file mode 100644
index 0000000..a36c40f
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+/**
+ * This class write the logs to local file
+ */
+public class HDFSAuditDestination extends AuditDestination {
+ private static final Log logger = LogFactory
+ .getLog(HDFSAuditDestination.class);
+
+ public static final String PROP_HDFS_DIR = "dir";
+ public static final String PROP_HDFS_SUBDIR = "subdir";
+ public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format";
+ public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec";
+
+ String baseFolder = null;
+ String fileFormat = null;
+ int fileRolloverSec = 24 * 60 * 60; // In seconds
+ private String logFileNameFormat;
+
+ boolean initDone = false;
+
+ private String logFolder;
+ PrintWriter logWriter = null;
+
+ private Date fileCreateTime = null;
+
+ private String currentFileName;
+
+ private boolean isStopped = false;
+
+ @Override
+ public void init(Properties prop, String propPrefix) {
+ super.init(prop, propPrefix);
+
+ // Initialize properties for this class
+ // Initial folder and file properties
+ String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_HDFS_DIR);
+ String logSubFolder = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_HDFS_SUBDIR);
+ if (logSubFolder == null || logSubFolder.isEmpty()) {
+ logSubFolder = "%app-type%/%time:yyyyMMdd%";
+ }
+
+ logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_HDFS_FILE_NAME_FORMAT);
+ fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_HDFS_ROLLOVER, fileRolloverSec);
+
+ if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+ logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log";
+ }
+
+ if (logFolderProp == null || logFolderProp.isEmpty()) {
+ logger.fatal("File destination folder is not configured. Please set "
+ + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
+ return;
+ }
+
+ logFolder = logFolderProp + "/" + logSubFolder;
+ logger.info("logFolder=" + logFolder + ", destName=" + getName());
+ logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
+ + getName());
+
+ initDone = true;
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ try {
+ PrintWriter out = getLogFileStream();
+ for (String event : events) {
+ out.println(event);
+ }
+ out.flush();
+ } catch (Throwable t) {
+ logError("Error writing to log file.", t);
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+ */
+ @Override
+ synchronized public boolean log(Collection<AuditEventBase> events) {
+ if (isStopped) {
+ logError("log() called after stop was requested. name=" + getName());
+ return false;
+ }
+ List<String> jsonList = new ArrayList<String>();
+ for (AuditEventBase event : events) {
+ try {
+ jsonList.add(MiscUtil.stringify(event));
+ } catch (Throwable t) {
+ logger.error("Error converting to JSON. event=" + event);
+ }
+ }
+ return logJSON(jsonList);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ // Nothing to do here. We will open the file when the first log request
+ // comes
+ }
+
+ @Override
+ synchronized public void stop() {
+ try {
+ if (logWriter != null) {
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ isStopped = true;
+ }
+ } catch (Throwable t) {
+ logger.error("Error closing HDFS file.", t);
+ }
+ }
+
+ // Helper methods in this class
+ synchronized private PrintWriter getLogFileStream() throws Throwable {
+ closeFileIfNeeded();
+
+ // Either there are no open log file or the previous one has been rolled
+ // over
+ if (logWriter == null) {
+ Date currentTime = new Date();
+ // Create a new file
+ String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+ currentTime.getTime());
+ String parentFolder = MiscUtil.replaceTokens(logFolder,
+ currentTime.getTime());
+ Configuration conf = new Configuration();
+
+ String fullPath = parentFolder
+ + org.apache.hadoop.fs.Path.SEPARATOR + fileName;
+ String defaultPath = fullPath;
+ URI uri = URI.create(fullPath);
+ FileSystem fileSystem = FileSystem.get(uri, conf);
+
+ Path hdfPath = new Path(fullPath);
+ logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+ int i = 0;
+ while (fileSystem.exists(hdfPath)) {
+ i++;
+ int lastDot = defaultPath.lastIndexOf('.');
+ String baseName = defaultPath.substring(0, lastDot);
+ String extension = defaultPath.substring(lastDot);
+ fullPath = baseName + "." + i + extension;
+ hdfPath = new Path(fullPath);
+ logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+ }
+ logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
+ // Create parent folders
+ createParents(hdfPath, fileSystem);
+
+ // Create the file to write
+ logger.info("Creating new log file. hdfPath=" + fullPath);
+ FSDataOutputStream ostream = fileSystem.create(hdfPath);
+ logWriter = new PrintWriter(ostream);
+ fileCreateTime = new Date();
+ currentFileName = fullPath;
+ }
+ return logWriter;
+ }
+
+ private void createParents(Path pathLogfile, FileSystem fileSystem)
+ throws Throwable {
+ logger.info("Creating parent folder for " + pathLogfile);
+ Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;
+
+ if (parentPath != null && fileSystem != null
+ && !fileSystem.exists(parentPath)) {
+ fileSystem.mkdirs(parentPath);
+ }
+ }
+
+ private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+ if (logWriter == null) {
+ return;
+ }
+ // TODO: Close the file on absolute time. Currently it is implemented as
+ // relative time
+ if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
+ logger.info("Closing file. Rolling over. name=" + getName()
+ + ", fileName=" + currentFileName);
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ currentFileName = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
index cdc4d53..83eb324 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
@@ -502,13 +502,18 @@ class DestinationDispatcherThread<T> extends Thread {
break;
}
- // loop until log is sent successfully
- while(!mStopThread && !mDestination.sendStringified(log)) {
- try {
- Thread.sleep(destinationPollIntervalInMs);
- } catch(InterruptedException excp) {
- throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+ try {
+ // loop until log is sent successfully
+ while(!mStopThread && !mDestination.sendStringified(log)) {
+ try {
+ Thread.sleep(destinationPollIntervalInMs);
+ } catch(InterruptedException excp) {
+ throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+ }
}
+ } catch ( AuditMessageException msgError) {
+ mLogger.error("Error in log message:" + log);
+ //If there is error in log message, then it will be skipped
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index 0d0809a..a5a52a0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -18,17 +18,18 @@
package org.apache.ranger.audit.provider;
+import java.util.Collection;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import com.sun.tools.hat.internal.util.Misc;
-public class Log4jAuditProvider extends BaseAuditProvider {
+public class Log4jAuditProvider extends AuditDestination {
private static final Log LOG = LogFactory.getLog(Log4jAuditProvider.class);
private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
@@ -37,7 +38,6 @@ public class Log4jAuditProvider extends BaseAuditProvider {
public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP = "xasecure.audit.log4j.async.max.queue.size" ;
public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
- private Gson mGsonBuilder = null;
public Log4jAuditProvider() {
LOG.info("Log4jAuditProvider: creating..");
@@ -48,53 +48,54 @@ public class Log4jAuditProvider extends BaseAuditProvider {
LOG.info("Log4jAuditProvider.init()");
super.init(props);
-
- try {
- mGsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
- } catch(Throwable excp) {
- LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json", excp);
- }
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
if(! AUDITLOG.isInfoEnabled())
- return;
+ return true;
if(event != null) {
- String eventStr = mGsonBuilder != null ? mGsonBuilder.toJson(event) : event.toString();
-
+ String eventStr = MiscUtil.stringify(event);
AUDITLOG.info(eventStr);
}
+ return true;
}
@Override
- public void start() {
- // intentionally left empty
- }
-
- @Override
- public void stop() {
- // intentionally left empty
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
}
@Override
- public void waitToComplete() {
- // intentionally left empty
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
}
@Override
- public boolean isFlushPending() {
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
return false;
}
-
+
@Override
- public long getLastFlushTime() {
- return 0;
+ public void start() {
+ // intentionally left empty
}
@Override
- public void flush() {
+ public void stop() {
// intentionally left empty
}
+
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
index 44e94ad..d6f87cf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
@@ -18,6 +18,7 @@
*/
package org.apache.ranger.audit.provider;
+import org.apache.ranger.audit.model.AuditEventBase;
public interface LogDestination<T> {
public void start();
@@ -26,9 +27,20 @@ public interface LogDestination<T> {
boolean isAvailable();
- public boolean send(T log);
+ public boolean send(AuditEventBase log) throws AuditMessageException;
- public boolean sendStringified(String log);
+ public boolean send(AuditEventBase[] logs) throws AuditMessageException;
+
+ public boolean sendStringified(String log) throws AuditMessageException;
+
+ public boolean sendStringified(String[] logs) throws AuditMessageException;
public boolean flush();
+
+ /**
+ * Name for the destination
+ *
+ * @return
+ */
+ public String getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index 17230b2..487da5a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -20,6 +20,12 @@ import java.io.File;
import java.net.InetAddress;
import java.rmi.dgc.VMID;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
import java.util.UUID;
import org.apache.log4j.helpers.LogLog;
@@ -28,88 +34,96 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class MiscUtil {
- public static final String TOKEN_START = "%";
- public static final String TOKEN_END = "%";
- public static final String TOKEN_HOSTNAME = "hostname";
- public static final String TOKEN_APP_TYPE = "app-type";
+ public static final String TOKEN_START = "%";
+ public static final String TOKEN_END = "%";
+ public static final String TOKEN_HOSTNAME = "hostname";
+ public static final String TOKEN_APP_TYPE = "app-type";
public static final String TOKEN_JVM_INSTANCE = "jvm-instance";
- public static final String TOKEN_TIME = "time:";
- public static final String TOKEN_PROPERTY = "property:";
- public static final String TOKEN_ENV = "env:";
- public static final String ESCAPE_STR = "\\";
+ public static final String TOKEN_TIME = "time:";
+ public static final String TOKEN_PROPERTY = "property:";
+ public static final String TOKEN_ENV = "env:";
+ public static final String ESCAPE_STR = "\\";
static VMID sJvmID = new VMID();
public static String LINE_SEPARATOR = System.getProperty("line.separator");
- private static Gson sGsonBuilder = null;
+ private static Gson sGsonBuilder = null;
private static String sApplicationType = null;
static {
try {
- sGsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS").create();
- } catch(Throwable excp) {
- LogLog.warn("failed to create GsonBuilder object. stringigy() will return obj.toString(), instead of Json", excp);
+ sGsonBuilder = new GsonBuilder().setDateFormat(
+ "yyyy-MM-dd HH:mm:ss.SSS").create();
+ } catch (Throwable excp) {
+ LogLog.warn(
+ "failed to create GsonBuilder object. stringigy() will return obj.toString(), instead of Json",
+ excp);
}
}
public static String replaceTokens(String str, long time) {
- if(str == null) {
+ if (str == null) {
return str;
}
- if(time <= 0) {
+ if (time <= 0) {
time = System.currentTimeMillis();
}
- for(int startPos = 0; startPos < str.length(); ) {
- int tagStartPos = str.indexOf(TOKEN_START, startPos);
-
- if(tagStartPos == -1) {
- break;
- }
-
- int tagEndPos = str.indexOf(TOKEN_END, tagStartPos + TOKEN_START.length());
-
- if(tagEndPos == -1) {
- break;
- }
-
- String tag = str.substring(tagStartPos, tagEndPos+TOKEN_END.length());
- String token = tag.substring(TOKEN_START.length(), tag.lastIndexOf(TOKEN_END));
- String val = "";
-
- if(token != null) {
- if(token.equals(TOKEN_HOSTNAME)) {
- val = getHostname();
- } else if(token.equals(TOKEN_APP_TYPE)) {
- val = getApplicationType();
- } else if(token.equals(TOKEN_JVM_INSTANCE)) {
- val = getJvmInstanceId();
- } else if(token.startsWith(TOKEN_PROPERTY)) {
- String propertyName = token.substring(TOKEN_PROPERTY.length());
-
- val = getSystemProperty(propertyName);
- } else if(token.startsWith(TOKEN_ENV)) {
- String envName = token.substring(TOKEN_ENV.length());
-
- val = getEnv(envName);
- } else if(token.startsWith(TOKEN_TIME)) {
- String dtFormat = token.substring(TOKEN_TIME.length());
-
- val = getFormattedTime(time, dtFormat);
- }
- }
-
- if(val == null) {
- val = "";
- }
-
- str = str.substring(0, tagStartPos) + val + str.substring(tagEndPos + TOKEN_END.length());
- startPos = tagStartPos + val.length();
- }
-
- return str;
+ for (int startPos = 0; startPos < str.length();) {
+ int tagStartPos = str.indexOf(TOKEN_START, startPos);
+
+ if (tagStartPos == -1) {
+ break;
+ }
+
+ int tagEndPos = str.indexOf(TOKEN_END,
+ tagStartPos + TOKEN_START.length());
+
+ if (tagEndPos == -1) {
+ break;
+ }
+
+ String tag = str.substring(tagStartPos,
+ tagEndPos + TOKEN_END.length());
+ String token = tag.substring(TOKEN_START.length(),
+ tag.lastIndexOf(TOKEN_END));
+ String val = "";
+
+ if (token != null) {
+ if (token.equals(TOKEN_HOSTNAME)) {
+ val = getHostname();
+ } else if (token.equals(TOKEN_APP_TYPE)) {
+ val = getApplicationType();
+ } else if (token.equals(TOKEN_JVM_INSTANCE)) {
+ val = getJvmInstanceId();
+ } else if (token.startsWith(TOKEN_PROPERTY)) {
+ String propertyName = token.substring(TOKEN_PROPERTY
+ .length());
+
+ val = getSystemProperty(propertyName);
+ } else if (token.startsWith(TOKEN_ENV)) {
+ String envName = token.substring(TOKEN_ENV.length());
+
+ val = getEnv(envName);
+ } else if (token.startsWith(TOKEN_TIME)) {
+ String dtFormat = token.substring(TOKEN_TIME.length());
+
+ val = getFormattedTime(time, dtFormat);
+ }
+ }
+
+ if (val == null) {
+ val = "";
+ }
+
+ str = str.substring(0, tagStartPos) + val
+ + str.substring(tagEndPos + TOKEN_END.length());
+ startPos = tagStartPos + val.length();
+ }
+
+ return str;
}
public static String getHostname() {
@@ -142,7 +156,8 @@ public class MiscUtil {
String ret = null;
try {
- ret = propertyName != null ? System.getProperty(propertyName) : null;
+ ret = propertyName != null ? System.getProperty(propertyName)
+ : null;
} catch (Exception excp) {
LogLog.warn("getSystemProperty(" + propertyName + ") failed", excp);
}
@@ -166,9 +181,9 @@ public class MiscUtil {
String ret = null;
try {
- SimpleDateFormat sdf = new SimpleDateFormat(format);
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
- ret = sdf.format(time);
+ ret = sdf.format(time);
} catch (Exception excp) {
LogLog.warn("SimpleDateFormat.format() failed: " + format, excp);
}
@@ -177,15 +192,16 @@ public class MiscUtil {
}
public static void createParents(File file) {
- if(file != null) {
+ if (file != null) {
String parentName = file.getParent();
if (parentName != null) {
File parentDir = new File(parentName);
- if(!parentDir.exists()) {
- if(! parentDir.mkdirs()) {
- LogLog.warn("createParents(): failed to create " + parentDir.getAbsolutePath());
+ if (!parentDir.exists()) {
+ if (!parentDir.mkdirs()) {
+ LogLog.warn("createParents(): failed to create "
+ + parentDir.getAbsolutePath());
}
}
}
@@ -195,14 +211,16 @@ public class MiscUtil {
public static long getNextRolloverTime(long lastRolloverTime, long interval) {
long now = System.currentTimeMillis() / 1000 * 1000; // round to second
- if(lastRolloverTime <= 0) {
- // should this be set to the next multiple-of-the-interval from start of the day?
+ if (lastRolloverTime <= 0) {
+ // should this be set to the next multiple-of-the-interval from
+ // start of the day?
return now + interval;
- } else if(lastRolloverTime <= now) {
+ } else if (lastRolloverTime <= now) {
long nextRolloverTime = now + interval;
// keep it at 'interval' boundary
- long trimInterval = (nextRolloverTime - lastRolloverTime) % interval;
+ long trimInterval = (nextRolloverTime - lastRolloverTime)
+ % interval;
return nextRolloverTime - trimInterval;
} else {
@@ -211,23 +229,24 @@ public class MiscUtil {
}
public static long getRolloverStartTime(long nextRolloverTime, long interval) {
- return (nextRolloverTime <= interval) ? System.currentTimeMillis() : nextRolloverTime - interval;
+ return (nextRolloverTime <= interval) ? System.currentTimeMillis()
+ : nextRolloverTime - interval;
}
public static int parseInteger(String str, int defValue) {
int ret = defValue;
- if(str != null) {
+ if (str != null) {
try {
ret = Integer.parseInt(str);
- } catch(Exception excp) {
+ } catch (Exception excp) {
// ignore
}
}
return ret;
}
-
+
public static String generateUniqueId() {
return UUID.randomUUID().toString();
}
@@ -235,10 +254,10 @@ public class MiscUtil {
public static <T> String stringify(T log) {
String ret = null;
- if(log != null) {
- if(log instanceof String) {
- ret = (String)log;
- } else if(MiscUtil.sGsonBuilder != null) {
+ if (log != null) {
+ if (log instanceof String) {
+ ret = (String) log;
+ } else if (MiscUtil.sGsonBuilder != null) {
ret = MiscUtil.sGsonBuilder.toJson(log);
} else {
ret = log.toString();
@@ -247,4 +266,114 @@ public class MiscUtil {
return ret;
}
+
+ static public <T> T fromJson(String jsonStr, Class<T> clazz) {
+ return sGsonBuilder.fromJson(jsonStr, clazz);
+ }
+
+ public static String getStringProperty(Properties props, String propName) {
+ String ret = null;
+
+ if (props != null && propName != null) {
+ String val = props.getProperty(propName);
+ if (val != null) {
+ ret = val;
+ }
+ }
+
+ return ret;
+ }
+
+ public static boolean getBooleanProperty(Properties props, String propName,
+ boolean defValue) {
+ boolean ret = defValue;
+
+ if (props != null && propName != null) {
+ String val = props.getProperty(propName);
+
+ if (val != null) {
+ ret = Boolean.valueOf(val);
+ }
+ }
+
+ return ret;
+ }
+
+ public static int getIntProperty(Properties props, String propName,
+ int defValue) {
+ int ret = defValue;
+
+ if (props != null && propName != null) {
+ String val = props.getProperty(propName);
+ if (val != null) {
+ try {
+ ret = Integer.parseInt(val);
+ } catch (NumberFormatException excp) {
+ ret = defValue;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ public static long getLongProperty(Properties props, String propName,
+ long defValue) {
+ long ret = defValue;
+
+ if (props != null && propName != null) {
+ String val = props.getProperty(propName);
+ if (val != null) {
+ try {
+ ret = Long.parseLong(val);
+ } catch (NumberFormatException excp) {
+ ret = defValue;
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ public static Map<String, String> getPropertiesWithPrefix(Properties props,
+ String prefix) {
+ Map<String, String> prefixedProperties = new HashMap<String, String>();
+
+ if (props != null && prefix != null) {
+ for (String key : props.stringPropertyNames()) {
+ if (key == null) {
+ continue;
+ }
+
+ String val = props.getProperty(key);
+
+ if (key.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+
+ if (key == null) {
+ continue;
+ }
+
+ prefixedProperties.put(key, val);
+ }
+ }
+ }
+
+ return prefixedProperties;
+ }
+
+ /**
+ * @param destListStr
+ * @param delim
+ * @return
+ */
+ public static List<String> toArray(String destListStr, String delim) {
+ List<String> list = new ArrayList<String>();
+ StringTokenizer tokenizer = new StringTokenizer(destListStr, delim);
+ while (tokenizer.hasMoreTokens()) {
+ list.add(tokenizer.nextToken());
+ }
+ return list;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 1eec345..57ac0a0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -18,6 +18,7 @@
package org.apache.ranger.audit.provider;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Properties;
@@ -25,14 +26,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-
public class MultiDestAuditProvider extends BaseAuditProvider {
- private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class);
+ private static final Log LOG = LogFactory
+ .getLog(MultiDestAuditProvider.class);
protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
-
public MultiDestAuditProvider() {
LOG.info("MultiDestAuditProvider: creating..");
}
@@ -47,109 +47,166 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
super.init(props);
- for(AuditProvider provider : mProviders) {
- try {
- provider.init(props);
- } catch(Throwable excp) {
- LOG.info("MultiDestAuditProvider.init(): failed " + provider.getClass().getCanonicalName() + ")", excp);
- }
- }
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.init(props);
+ } catch (Throwable excp) {
+ LOG.info("MultiDestAuditProvider.init(): failed "
+ + provider.getClass().getCanonicalName() + ")", excp);
+ }
+ }
}
public void addAuditProvider(AuditProvider provider) {
- if(provider != null) {
- LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")");
+ if (provider != null) {
+ LOG.info("MultiDestAuditProvider.addAuditProvider(providerType="
+ + provider.getClass().getCanonicalName() + ")");
mProviders.add(provider);
}
}
public void addAuditProviders(List<AuditProvider> providers) {
- if(providers != null) {
- for(AuditProvider provider : providers) {
+ if (providers != null) {
+ for (AuditProvider provider : providers) {
addAuditProvider(provider);
}
}
}
@Override
- public void log(AuditEventBase event) {
- for(AuditProvider provider : mProviders) {
- try {
- provider.log(event);
- } catch(Throwable excp) {
- logFailedEvent(event, excp);
- }
- }
+ public boolean log(AuditEventBase event) {
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.log(event);
+ } catch (Throwable excp) {
+ logFailedEvent(event, excp);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.log(events);
+ } catch (Throwable excp) {
+ logFailedEvent(events, excp);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.logJSON(event);
+ } catch (Throwable excp) {
+ logFailedEventJSON(event, excp);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.logJSON(events);
+ } catch (Throwable excp) {
+ logFailedEventJSON(events, excp);
+ }
+ }
+ return true;
}
@Override
public void start() {
- for(AuditProvider provider : mProviders) {
- try {
+ for (AuditProvider provider : mProviders) {
+ try {
provider.start();
- } catch(Throwable excp) {
- LOG.error("AsyncAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp);
- }
+ } catch (Throwable excp) {
+ LOG.error("AsyncAuditProvider.start(): failed for provider { "
+ + provider.getClass().getName() + " }", excp);
+ }
}
}
@Override
public void stop() {
- for(AuditProvider provider : mProviders) {
+ for (AuditProvider provider : mProviders) {
try {
provider.stop();
- } catch(Throwable excp) {
- LOG.error("AsyncAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ } catch (Throwable excp) {
+ LOG.error("AsyncAuditProvider.stop(): failed for provider { "
+ + provider.getClass().getName() + " }", excp);
}
}
}
@Override
- public void waitToComplete() {
- for(AuditProvider provider : mProviders) {
+ public void waitToComplete() {
+ for (AuditProvider provider : mProviders) {
try {
provider.waitToComplete();
- } catch(Throwable excp) {
- LOG.error("AsyncAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ } catch (Throwable excp) {
+ LOG.error(
+ "AsyncAuditProvider.waitToComplete(): failed for provider { "
+ + provider.getClass().getName() + " }", excp);
+ }
+ }
+ }
+
+ @Override
+ public void waitToComplete(long timeout) {
+ for (AuditProvider provider : mProviders) {
+ try {
+ provider.waitToComplete(timeout);
+ } catch (Throwable excp) {
+ LOG.error(
+ "AsyncAuditProvider.waitToComplete(): failed for provider { "
+ + provider.getClass().getName() + " }", excp);
}
}
}
-
+
@Override
public boolean isFlushPending() {
- for(AuditProvider provider : mProviders) {
- if(provider.isFlushPending()) {
+ for (AuditProvider provider : mProviders) {
+ if (provider.isFlushPending()) {
return true;
}
}
-
+
return false;
}
-
+
@Override
public long getLastFlushTime() {
long lastFlushTime = 0;
- for(AuditProvider provider : mProviders) {
+ for (AuditProvider provider : mProviders) {
long flushTime = provider.getLastFlushTime();
-
- if(flushTime != 0) {
- if(lastFlushTime == 0 || lastFlushTime > flushTime) {
+
+ if (flushTime != 0) {
+ if (lastFlushTime == 0 || lastFlushTime > flushTime) {
lastFlushTime = flushTime;
}
}
}
-
+
return lastFlushTime;
}
-
+
@Override
public void flush() {
- for(AuditProvider provider : mProviders) {
+ for (AuditProvider provider : mProviders) {
try {
provider.flush();
- } catch(Throwable excp) {
- LOG.error("AsyncAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ } catch (Throwable excp) {
+ LOG.error("AsyncAuditProvider.flush(): failed for provider { "
+ + provider.getClass().getName() + " }", excp);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
index 620951c..a18e3e9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
@@ -22,7 +22,6 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
import org.apache.ranger.audit.provider.BufferedAuditProvider;
import org.apache.ranger.audit.provider.DebugTracer;
import org.apache.ranger.audit.provider.LocalFileLogBuffer;
@@ -44,7 +43,7 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
super.init(props);
- Map<String, String> hdfsProps = BaseAuditProvider.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
+ Map<String, String> hdfsProps = MiscUtil.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
String encoding = hdfsProps.get("encoding");
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
index 6b5cb4b..49f4e65 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.provider.DebugTracer;
import org.apache.ranger.audit.provider.LogDestination;
import org.apache.ranger.audit.provider.MiscUtil;
@@ -36,6 +37,8 @@ import org.apache.ranger.audit.provider.MiscUtil;
public class HdfsLogDestination<T> implements LogDestination<T> {
public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed";
+ private String name = getClass().getName();
+
private String mDirectory = null;
private String mFile = null;
private int mFlushIntervalSeconds = 1 * 60;
@@ -57,6 +60,20 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
mLogger = tracer;
}
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.ranger.audit.provider.LogDestination#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
public String getDirectory() {
return mDirectory;
}
@@ -133,11 +150,11 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
}
@Override
- public boolean send(T log) {
- boolean ret = false;
+ public boolean send(AuditEventBase log) {
+ boolean ret = true;
if(log != null) {
- String msg = log.toString();
+ String msg = MiscUtil.stringify(log);
ret = sendStringified(msg);
}
@@ -145,6 +162,18 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
return ret;
}
+
+ @Override
+ public boolean send(AuditEventBase[] logs) {
+ for(int i = 0; i < logs.length; i++) {
+ boolean ret = send(logs[i]);
+ if(!ret) {
+ return ret;
+ }
+ }
+ return true;
+ }
+
@Override
public boolean sendStringified(String log) {
boolean ret = false;
@@ -169,6 +198,18 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
}
@Override
+ public boolean sendStringified(String[] logs) {
+ for(int i = 0; i < logs.length; i++) {
+ boolean ret = sendStringified(logs[i]);
+ if(!ret) {
+ return ret;
+ }
+ }
+ return true;
+ }
+
+
+ @Override
public boolean flush() {
mLogger.debug("==> HdfsLogDestination.flush()");
@@ -448,4 +489,5 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
return sb.toString();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 0ec8790..5f39e69 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.ranger.audit.provider.kafka;
+import java.util.Collection;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
@@ -46,12 +47,12 @@ public class KafkaAuditProvider extends BaseAuditProvider {
LOG.info("init() called");
super.init(props);
- setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
- setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
+ setMaxQueueSize(MiscUtil.getIntProperty(props,
+ AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
+ setMaxBatchInterval(MiscUtil.getIntProperty(props,
AUDIT_MAX_QUEUE_SIZE_PROP,
- AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
- topic = BaseAuditProvider.getStringProperty(props,
+ AUDIT_BATCH_INTERVAL_DEFAULT_MS));
+ topic = MiscUtil.getStringProperty(props,
AUDIT_KAFKA_TOPIC_NAME);
if (topic == null || topic.isEmpty()) {
topic = "ranger_audits";
@@ -59,7 +60,7 @@ public class KafkaAuditProvider extends BaseAuditProvider {
try {
if (!initDone) {
- String brokerList = BaseAuditProvider.getStringProperty(props,
+ String brokerList = MiscUtil.getStringProperty(props,
AUDIT_KAFKA_BROKER_LIST);
if (brokerList == null || brokerList.isEmpty()) {
brokerList = "localhost:9092";
@@ -87,7 +88,7 @@ public class KafkaAuditProvider extends BaseAuditProvider {
}
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
if (event instanceof AuthzAuditEvent) {
AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
@@ -118,7 +119,32 @@ public class KafkaAuditProvider extends BaseAuditProvider {
} catch (Throwable t) {
LOG.error("Error sending message to Kafka topic. topic=" + topic
+ ", message=" + message, t);
+ return false;
}
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
+ return true;
}
@Override
@@ -143,8 +169,10 @@ public class KafkaAuditProvider extends BaseAuditProvider {
@Override
public void waitToComplete() {
LOG.info("waitToComplete() called");
- // TODO Auto-generated method stub
-
+ }
+
+ @Override
+ public void waitToComplete(long timeout) {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 1b463e6..9ee4ec0 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -19,6 +19,7 @@
package org.apache.ranger.audit.provider.solr;
+import java.util.Collection;
import java.util.Date;
import java.util.Properties;
@@ -55,12 +56,12 @@ public class SolrAuditProvider extends BaseAuditProvider {
LOG.info("init() called");
super.init(props);
- setMaxQueueSize(BaseAuditProvider.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT));
- setMaxFlushInterval(BaseAuditProvider.getIntProperty(props,
+ setMaxQueueSize(MiscUtil.getIntProperty(props,
+ AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
+ setMaxBatchInterval(MiscUtil.getIntProperty(props,
AUDIT_MAX_QUEUE_SIZE_PROP,
- AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT));
- retryWaitTime = BaseAuditProvider.getIntProperty(props,
+ AUDIT_BATCH_INTERVAL_DEFAULT_MS));
+ retryWaitTime = MiscUtil.getIntProperty(props,
AUDIT_RETRY_WAIT_PROP, retryWaitTime);
}
@@ -68,7 +69,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
if (solrClient == null) {
synchronized (lock) {
if (solrClient == null) {
- String solrURL = BaseAuditProvider.getStringProperty(props,
+ String solrURL = MiscUtil.getStringProperty(props,
"xasecure.audit.solr.solr_url");
if (lastConnectTime != null) {
@@ -118,11 +119,11 @@ public class SolrAuditProvider extends BaseAuditProvider {
* audit.model.AuditEventBase)
*/
@Override
- public void log(AuditEventBase event) {
+ public boolean log(AuditEventBase event) {
if (!(event instanceof AuthzAuditEvent)) {
LOG.error(event.getClass().getName()
+ " audit event class type is not supported");
- return;
+ return false;
}
AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
// TODO: This should be done at a higher level
@@ -144,7 +145,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
connect();
if (solrClient == null) {
// Solr is still not initialized. So need to throw error
- return;
+ return false;
}
}
@@ -155,7 +156,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
LOG.debug("Ignore sending audit. lastConnect=" + diff
+ " ms");
}
- return;
+ return false;
}
}
// Convert AuditEventBase to Solr document
@@ -176,8 +177,32 @@ public class SolrAuditProvider extends BaseAuditProvider {
} catch (Throwable t) {
LOG.error("Error sending message to Solr", t);
+ return false;
}
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ log(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean logJSON(String event) {
+ AuditEventBase eventObj = MiscUtil.fromJson(event,
+ AuthzAuditEvent.class);
+ return log(eventObj);
+ }
+ @Override
+ public boolean logJSON(Collection<String> events) {
+ for (String event : events) {
+ logJSON(event);
+ }
+ return true;
}
/*
@@ -208,10 +233,15 @@ public class SolrAuditProvider extends BaseAuditProvider {
*/
@Override
public void waitToComplete() {
- // TODO Auto-generated method stub
}
+
+ @Override
+ public void waitToComplete(long timeout) {
+
+ }
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/.gitignore
----------------------------------------------------------------------
diff --git a/security-admin/.gitignore b/security-admin/.gitignore
index 4a3ed53..bf7dc37 100644
--- a/security-admin/.gitignore
+++ b/security-admin/.gitignore
@@ -3,3 +3,6 @@
/bin/
/target
.settings/
+.pydevproject
+log4j.xml
+*.log
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/security-admin/pom.xml
----------------------------------------------------------------------
diff --git a/security-admin/pom.xml b/security-admin/pom.xml
index 3220886..286740c 100644
--- a/security-admin/pom.xml
+++ b/security-admin/pom.xml
@@ -251,6 +251,8 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
@@ -421,13 +423,15 @@
<additionalClasspathElement>${project.basedir}/src/main/webapp/META-INF</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
+<!--
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
- <version>2.17</version>
+ <version>2.18</version>
</dependency>
</dependencies>
+-->
</plugin>
</plugins>