You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:29 UTC
[12/12] incubator-ranger git commit: RANGER-397 - Implement reliable
streaming audits to configurable destinations - Incorporate Review Feedback
RANGER-397 - Implement reliable streaming audits to configurable
destinations - Incorporate Review Feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4f3cea22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4f3cea22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4f3cea22
Branch: refs/heads/master
Commit: 4f3cea223b9bb717577732bf050bc78f16e94a69
Parents: 42a0e25
Author: Don Bosco Durai <bo...@apache.org>
Authored: Wed Apr 22 09:49:01 2015 -0700
Committer: Don Bosco Durai <bo...@apache.org>
Committed: Wed Apr 22 09:49:01 2015 -0700
----------------------------------------------------------------------
.../audit/destination/AuditDestination.java | 32 +-
.../audit/destination/FileAuditDestination.java | 33 +-
.../audit/destination/HDFSAuditDestination.java | 50 ++-
.../ranger/audit/model/AuditEventBase.java | 4 +-
.../audit/provider/AsyncAuditProvider.java | 60 +--
.../ranger/audit/provider/AuditHandler.java | 46 ++
.../ranger/audit/provider/AuditProvider.java | 56 ---
.../audit/provider/AuditProviderFactory.java | 142 +++---
.../ranger/audit/provider/BaseAuditHandler.java | 271 ++++++++++++
.../audit/provider/BaseAuditProvider.java | 432 -------------------
.../audit/provider/BufferedAuditProvider.java | 12 +-
.../ranger/audit/provider/DbAuditProvider.java | 10 -
.../audit/provider/DummyAuditProvider.java | 35 +-
.../audit/provider/Log4jAuditProvider.java | 2 -
.../audit/provider/MultiDestAuditProvider.java | 59 +--
.../provider/kafka/KafkaAuditProvider.java | 22 +-
.../audit/provider/solr/SolrAuditProvider.java | 33 +-
.../ranger/audit/queue/AuditAsyncQueue.java | 34 +-
.../ranger/audit/queue/AuditBatchQueue.java | 26 +-
.../ranger/audit/queue/AuditFileSpool.java | 57 ++-
.../apache/ranger/audit/queue/AuditQueue.java | 174 ++++++++
.../ranger/audit/queue/AuditSummaryQueue.java | 49 +--
.../apache/ranger/audit/test/TestEvents.java | 4 +-
.../org/apache/ranger/audit/TestAuditQueue.java | 98 +++--
.../org/apache/ranger/audit/TestConsumer.java | 46 +-
25 files changed, 815 insertions(+), 972 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
index 25c0220..9db8937 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
@@ -23,13 +23,13 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
/**
* This class needs to be extended by anyone who wants to build custom
* destination
*/
-public abstract class AuditDestination extends BaseAuditProvider {
+public abstract class AuditDestination extends BaseAuditHandler {
private static final Log logger = LogFactory.getLog(AuditDestination.class);
public AuditDestination() {
@@ -51,21 +51,31 @@ public abstract class AuditDestination extends BaseAuditProvider {
/*
* (non-Javadoc)
*
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ * @see org.apache.ranger.audit.provider.AuditHandler#flush()
*/
@Override
- public boolean isFlushPending() {
- return false;
+ public void flush() {
+
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#flush()
- */
@Override
- public void flush() {
+ public void start() {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+ @Override
+ public void waitToComplete() {
+
}
+ @Override
+ public void waitToComplete(long timeout) {
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
index 1ccfd5f..a132cdf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
@@ -21,9 +21,7 @@ package org.apache.ranger.audit.destination;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileWriter;
-import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
@@ -107,7 +105,12 @@ public class FileAuditDestination extends AuditDestination {
}
@Override
- public boolean logJSON(Collection<String> events) {
+ synchronized public boolean logJSON(Collection<String> events) {
+ if (isStopped) {
+ logError("log() called after stop was requested. name=" + getName());
+ return false;
+ }
+
try {
PrintWriter out = getLogFileStream();
for (String event : events) {
@@ -128,7 +131,7 @@ public class FileAuditDestination extends AuditDestination {
* org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
*/
@Override
- synchronized public boolean log(Collection<AuditEventBase> events) {
+ public boolean log(Collection<AuditEventBase> events) {
if (isStopped) {
logError("log() called after stop was requested. name=" + getName());
return false;
@@ -158,11 +161,16 @@ public class FileAuditDestination extends AuditDestination {
@Override
synchronized public void stop() {
+ isStopped = true;
if (logWriter != null) {
- logWriter.flush();
- logWriter.close();
+ try {
+ logWriter.flush();
+ logWriter.close();
+ } catch (Throwable t) {
+ logger.error("Error on closing log writter. Exception will be ignored. name="
+ + getName() + ", fileName=" + currentFileName);
+ }
logWriter = null;
- isStopped = true;
}
}
@@ -214,15 +222,20 @@ public class FileAuditDestination extends AuditDestination {
return logWriter;
}
- private void closeFileIfNeeded() throws FileNotFoundException, IOException {
+ private void closeFileIfNeeded() {
if (logWriter == null) {
return;
}
if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
logger.info("Closing file. Rolling over. name=" + getName()
+ ", fileName=" + currentFileName);
- logWriter.flush();
- logWriter.close();
+ try {
+ logWriter.flush();
+ logWriter.close();
+ } catch (Throwable t) {
+ logger.error("Error on closing log writter. Exception will be ignored. name="
+ + getName() + ", fileName=" + currentFileName);
+ }
logWriter = null;
currentFileName = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 706eb8e..6ca4fce 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -74,6 +74,12 @@ public class HDFSAuditDestination extends AuditDestination {
// Initial folder and file properties
String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ "." + PROP_HDFS_DIR);
+ if (logFolderProp == null || logFolderProp.isEmpty()) {
+ logger.fatal("File destination folder is not configured. Please set "
+ + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
+ return;
+ }
+
String logSubFolder = MiscUtil.getStringProperty(props, propPrefix
+ "." + PROP_HDFS_SUBDIR);
if (logSubFolder == null || logSubFolder.isEmpty()) {
@@ -89,12 +95,6 @@ public class HDFSAuditDestination extends AuditDestination {
logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log";
}
- if (logFolderProp == null || logFolderProp.isEmpty()) {
- logger.fatal("File destination folder is not configured. Please set "
- + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
- return;
- }
-
logFolder = logFolderProp + "/" + logSubFolder;
logger.info("logFolder=" + logFolder + ", destName=" + getName());
logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
@@ -104,7 +104,12 @@ public class HDFSAuditDestination extends AuditDestination {
}
@Override
- public boolean logJSON(Collection<String> events) {
+ synchronized public boolean logJSON(Collection<String> events) {
+ if (isStopped) {
+ logError("log() called after stop was requested. name=" + getName());
+ return false;
+ }
+
try {
PrintWriter out = getLogFileStream();
for (String event : events) {
@@ -125,7 +130,7 @@ public class HDFSAuditDestination extends AuditDestination {
* org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
*/
@Override
- synchronized public boolean log(Collection<AuditEventBase> events) {
+ public boolean log(Collection<AuditEventBase> events) {
if (isStopped) {
logError("log() called after stop was requested. name=" + getName());
return false;
@@ -155,15 +160,16 @@ public class HDFSAuditDestination extends AuditDestination {
@Override
synchronized public void stop() {
- try {
- if (logWriter != null) {
+ isStopped = true;
+ if (logWriter != null) {
+ try {
logWriter.flush();
logWriter.close();
- logWriter = null;
- isStopped = true;
+ } catch (Throwable t) {
+ logger.error("Error on closing log writter. Exception will be ignored. name="
+ + getName() + ", fileName=" + currentFileName);
}
- } catch (Throwable t) {
- logger.error("Error closing HDFS file.", t);
+ logWriter = null;
}
}
@@ -198,9 +204,11 @@ public class HDFSAuditDestination extends AuditDestination {
String extension = defaultPath.substring(lastDot);
fullPath = baseName + "." + i + extension;
hdfPath = new Path(fullPath);
- logger.info("Checking whether log file exists. hdfPath=" + fullPath);
+ logger.info("Checking whether log file exists. hdfPath="
+ + fullPath);
}
- logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
+ logger.info("Log file doesn't exists. Will create and use it. hdfPath="
+ + fullPath);
// Create parent folders
createParents(hdfPath, fileSystem);
@@ -234,8 +242,14 @@ public class HDFSAuditDestination extends AuditDestination {
if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
logger.info("Closing file. Rolling over. name=" + getName()
+ ", fileName=" + currentFileName);
- logWriter.flush();
- logWriter.close();
+ try {
+ logWriter.flush();
+ logWriter.close();
+ } catch (Throwable t) {
+ logger.error("Error on closing log writter. Exception will be ignored. name="
+ + getName() + ", fileName=" + currentFileName);
+ }
+
logWriter = null;
currentFileName = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index 39a2578..2c6a87f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -32,8 +32,8 @@ public abstract class AuditEventBase {
public abstract String getEventKey();
public abstract Date getEventTime ();
- public abstract void setEventCount(long frequencyCount);
- public abstract void setEventDurationMS(long frequencyDurationMS);
+ public abstract void setEventCount(long eventCount);
+ public abstract void setEventDurationMS(long eventDurationMS);
protected String trim(String str, int len) {
String ret = str;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 53adc86..c3a0c78 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -68,7 +68,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize);
}
- public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditProvider provider) {
+ public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider) {
this(name, maxQueueSize, maxFlushInterval);
addAuditProvider(provider);
@@ -174,21 +174,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
while(ret == null) {
logSummaryIfRequired();
- if (mMaxFlushInterval > 0 && isFlushPending()) {
- long timeTillNextFlush = getTimeTillNextFlush();
-
- if (timeTillNextFlush <= 0) {
- break; // force flush
- }
-
- ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
- } else {
+// if (mMaxFlushInterval > 0 && isFlushPending()) {
+// long timeTillNextFlush = getTimeTillNextFlush();
+//
+// if (timeTillNextFlush <= 0) {
+// break; // force flush
+// }
+//
+// ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
+// } else {
// Let's wake up for summary logging
long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime);
waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime;
ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS);
- }
+// }
}
if(ret != null) {
@@ -246,23 +246,23 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements
LOG.debug("<== AsyncAuditProvider.waitToComplete()");
}
- private long getTimeTillNextFlush() {
- long timeTillNextFlush = mMaxFlushInterval;
-
- if (mMaxFlushInterval > 0) {
- long lastFlushTime = getLastFlushTime();
-
- if (lastFlushTime != 0) {
- long timeSinceLastFlush = System.currentTimeMillis()
- - lastFlushTime;
-
- if (timeSinceLastFlush >= mMaxFlushInterval)
- timeTillNextFlush = 0;
- else
- timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush;
- }
- }
-
- return timeTillNextFlush;
- }
+// private long getTimeTillNextFlush() {
+// long timeTillNextFlush = mMaxFlushInterval;
+//
+// if (mMaxFlushInterval > 0) {
+// long lastFlushTime = getLastFlushTime();
+//
+// if (lastFlushTime != 0) {
+// long timeSinceLastFlush = System.currentTimeMillis()
+// - lastFlushTime;
+//
+// if (timeSinceLastFlush >= mMaxFlushInterval)
+// timeTillNextFlush = 0;
+// else
+// timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush;
+// }
+// }
+//
+// return timeTillNextFlush;
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
new file mode 100644
index 0000000..7b51f1d
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.ranger.audit.model.AuditEventBase;
+
+public interface AuditHandler {
+ public boolean log(AuditEventBase event);
+ public boolean log(Collection<AuditEventBase> events);
+
+ public boolean logJSON(String event);
+ public boolean logJSON(Collection<String> events);
+
+ public void init(Properties prop);
+ public void init(Properties prop, String basePropertyName);
+ public void start();
+ public void stop();
+ public void waitToComplete();
+ public void waitToComplete(long timeout);
+
+ /**
+ * Name for this provider. Used only during logging. Uniqueness is not guaranteed
+ */
+ public String getName();
+
+ public void flush();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
deleted file mode 100644
index 0e38624..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.Collection;
-import java.util.Properties;
-
-import org.apache.ranger.audit.model.AuditEventBase;
-
-public interface AuditProvider {
- public boolean log(AuditEventBase event);
- public boolean log(Collection<AuditEventBase> events);
-
- public boolean logJSON(String event);
- public boolean logJSON(Collection<String> events);
-
- public void init(Properties prop);
- public void init(Properties prop, String basePropertyName);
- public void start();
- public void stop();
- public void waitToComplete();
- public void waitToComplete(long timeout);
-
- /**
- * Name for this provider. Used only during logging. Uniqueness is not guaranteed
- */
- public String getName();
-
- /**
- * If this AuditProvider in the state of shutdown
- * @return
- */
- public boolean isDrain();
-
- public int getMaxBatchSize();
- public int getMaxBatchInterval();
- public boolean isFlushPending();
- public long getLastFlushTime();
- public void flush();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index a67f7e0..7b2b52b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -31,6 +31,7 @@ import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
import org.apache.ranger.audit.queue.AuditAsyncQueue;
import org.apache.ranger.audit.queue.AuditBatchQueue;
+import org.apache.ranger.audit.queue.AuditQueue;
import org.apache.ranger.audit.queue.AuditSummaryQueue;
/*
@@ -58,7 +59,7 @@ public class AuditProviderFactory {
private static AuditProviderFactory sFactory;
- private AuditProvider mProvider = null;
+ private AuditHandler mProvider = null;
private boolean mInitDone = false;
private AuditProviderFactory() {
@@ -79,11 +80,11 @@ public class AuditProviderFactory {
return sFactory;
}
- public static AuditProvider getAuditProvider() {
+ public static AuditHandler getAuditProvider() {
return AuditProviderFactory.getInstance().getProvider();
}
- public AuditProvider getProvider() {
+ public AuditHandler getProvider() {
return mProvider;
}
@@ -118,7 +119,7 @@ public class AuditProviderFactory {
boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props,
AUDIT_SOLR_IS_ENABLED_PROP, false);
- List<AuditProvider> providers = new ArrayList<AuditProvider>();
+ List<AuditHandler> providers = new ArrayList<AuditHandler>();
// TODO: Delete me
for (Object propNameObj : props.keySet()) {
@@ -150,17 +151,16 @@ public class AuditProviderFactory {
for (String destName : destNameList) {
String destPropPrefix = AUDIT_DEST_BASE + "." + destName;
- AuditProvider destProvider = getProviderFromConfig(props,
- destPropPrefix, destName);
+ AuditHandler destProvider = getProviderFromConfig(props,
+ destPropPrefix, destName, null);
if (destProvider != null) {
destProvider.init(props, destPropPrefix);
String queueName = MiscUtil.getStringProperty(props,
- destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE);
+ destPropPrefix + "." + AuditQueue.PROP_QUEUE);
if (queueName == null || queueName.isEmpty()) {
- LOG.info(destPropPrefix + "."
- + BaseAuditProvider.PROP_QUEUE
+ LOG.info(destPropPrefix + "." + AuditQueue.PROP_QUEUE
+ " is not set. Setting queue to batch for "
+ destName);
queueName = "batch";
@@ -169,16 +169,15 @@ public class AuditProviderFactory {
if (queueName != null && !queueName.isEmpty()
&& !queueName.equalsIgnoreCase("none")) {
String queuePropPrefix = destPropPrefix + "." + queueName;
- AuditProvider queueProvider = getProviderFromConfig(props,
- queuePropPrefix, queueName);
+ AuditHandler queueProvider = getProviderFromConfig(props,
+ queuePropPrefix, queueName, destProvider);
if (queueProvider != null) {
- if (queueProvider instanceof BaseAuditProvider) {
- BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider;
- qProvider.setConsumer(destProvider);
+ if (queueProvider instanceof AuditQueue) {
+ AuditQueue qProvider = (AuditQueue) queueProvider;
qProvider.init(props, queuePropPrefix);
providers.add(queueProvider);
} else {
- LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination "
+ LOG.fatal("Provider queue doesn't extend AuditQueue. Destination="
+ destName
+ " can't be created. queueName="
+ queueName);
@@ -196,51 +195,51 @@ public class AuditProviderFactory {
}
if (providers.size() > 0) {
LOG.info("Using v3 audit configuration");
- AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
- String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
- + "async";
- asyncQueue.init(props, propPrefix);
+ AuditHandler consumer = providers.get(0);
+
+ // Possible pipeline is:
+ // async_queue -> summary_queue -> multidestination -> batch_queue
+ // -> hdfs_destination
+ // -> batch_queue -> solr_destination
+ // -> batch_queue -> kafka_destination
+ // Above, up to multidestination, the providers are the same; then it
+ // branches out in parallel.
+
+ // Set the providers in reverse order, i.e. from the destinations back up to the async queue.
+
+ if (providers.size() > 1) {
+ // If there is more than one destination, then we need a multi-destination
+ // provider to process them in parallel
+ LOG.info("MultiDestAuditProvider is used. Destination count="
+ + providers.size());
+ MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
+ multiDestProvider.init(props);
+ multiDestProvider.addAuditProviders(providers);
+ consumer = multiDestProvider;
+ }
- propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX;
+ // Let's see if Summary is enabled, then summarize before sending it
+ // downstream
+ String propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX;
boolean summaryEnabled = MiscUtil.getBooleanProperty(props,
propPrefix + "." + "summary" + "." + "enabled", false);
AuditSummaryQueue summaryQueue = null;
if (summaryEnabled) {
LOG.info("AuditSummaryQueue is enabled");
- summaryQueue = new AuditSummaryQueue();
+ summaryQueue = new AuditSummaryQueue(consumer);
summaryQueue.init(props, propPrefix);
- asyncQueue.setConsumer(summaryQueue);
+ consumer = summaryQueue;
} else {
LOG.info("AuditSummaryQueue is disabled");
}
- if (providers.size() == 1) {
- if (summaryEnabled) {
- LOG.info("Setting " + providers.get(0).getName()
- + " as consumer to AuditSummaryQueue");
- summaryQueue.setConsumer(providers.get(0));
- } else {
- LOG.info("Setting " + providers.get(0).getName()
- + " as consumer to " + asyncQueue.getName());
- asyncQueue.setConsumer(providers.get(0));
- }
- } else {
- MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider();
- multiDestProvider.init(props);
- multiDestProvider.addAuditProviders(providers);
- if (summaryEnabled) {
- LOG.info("Setting " + multiDestProvider.getName()
- + " as consumer to AuditSummaryQueue");
- summaryQueue.setConsumer(multiDestProvider);
- } else {
- LOG.info("Setting " + multiDestProvider.getName()
- + " as consumer to " + asyncQueue.getName());
- asyncQueue.setConsumer(multiDestProvider);
- }
- }
+ // Create the AsyncQueue
+ AuditAsyncQueue asyncQueue = new AuditAsyncQueue(consumer);
+ propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "async";
+ asyncQueue.init(props, propPrefix);
mProvider = asyncQueue;
- LOG.info("Starting " + mProvider.getName());
+ LOG.info("Starting audit queue " + mProvider.getName());
mProvider.start();
} else {
LOG.info("No v3 audit configuration found. Trying v2 audit configurations");
@@ -315,9 +314,7 @@ public class AuditProviderFactory {
if (kafkaProvider.isAsync()) {
AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
- "MyKafkaAuditProvider",
- kafkaProvider.getMaxQueueSize(),
- kafkaProvider.getMaxBatchInterval(), kafkaProvider);
+ "MyKafkaAuditProvider", 1000, 1000, kafkaProvider);
providers.add(asyncProvider);
} else {
providers.add(kafkaProvider);
@@ -331,9 +328,7 @@ public class AuditProviderFactory {
if (solrProvider.isAsync()) {
AsyncAuditProvider asyncProvider = new AsyncAuditProvider(
- "MySolrAuditProvider",
- solrProvider.getMaxQueueSize(),
- solrProvider.getMaxBatchInterval(), solrProvider);
+ "MySolrAuditProvider", 1000, 1000, solrProvider);
providers.add(asyncProvider);
} else {
providers.add(solrProvider);
@@ -387,18 +382,26 @@ public class AuditProviderFactory {
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}
- private AuditProvider getProviderFromConfig(Properties props,
- String propPrefix, String providerName) {
- AuditProvider provider = null;
+ private AuditHandler getProviderFromConfig(Properties props,
+ String propPrefix, String providerName, AuditHandler consumer) {
+ AuditHandler provider = null;
String className = MiscUtil.getStringProperty(props, propPrefix + "."
- + BaseAuditProvider.PROP_CLASS_NAME);
+ + BaseAuditHandler.PROP_CLASS_NAME);
if (className != null && !className.isEmpty()) {
try {
- provider = (AuditProvider) Class.forName(className)
- .newInstance();
+ Class<?> handlerClass = Class.forName(className);
+ if (handlerClass.isAssignableFrom(AuditQueue.class)) {
+ // Queue class needs consumer
+ handlerClass.getDeclaredConstructor(AuditHandler.class)
+ .newInstance(consumer);
+ } else {
+ provider = (AuditHandler) Class.forName(className)
+ .newInstance();
+ }
} catch (Exception e) {
LOG.fatal("Can't instantiate audit class for providerName="
- + providerName + ", className=" + className, e);
+ + providerName + ", className=" + className
+ + ", propertyPrefix=" + propPrefix, e);
}
} else {
if (providerName.equals("file")) {
@@ -414,25 +417,32 @@ public class AuditProviderFactory {
} else if (providerName.equals("log4j")) {
provider = new Log4jAuditProvider();
} else if (providerName.equals("batch")) {
- provider = new AuditBatchQueue();
+ provider = new AuditBatchQueue(consumer);
} else if (providerName.equals("async")) {
- provider = new AuditAsyncQueue();
+ provider = new AuditAsyncQueue(consumer);
} else {
LOG.error("Provider name doesn't have any class associated with it. providerName="
- + providerName);
+ + providerName + ", propertyPrefix=" + propPrefix);
+ }
+ }
+ if (provider != null && provider instanceof AuditQueue) {
+ if (consumer == null) {
+ LOG.fatal("consumer can't be null for AuditQueue. queue="
+ + provider.getName() + ", propertyPrefix=" + propPrefix);
+ provider = null;
}
}
return provider;
}
- private AuditProvider getDefaultProvider() {
+ private AuditHandler getDefaultProvider() {
return new DummyAuditProvider();
}
private static class JVMShutdownHook extends Thread {
- AuditProvider mProvider;
+ AuditHandler mProvider;
- public JVMShutdownHook(AuditProvider provider) {
+ public JVMShutdownHook(AuditHandler provider) {
mProvider = provider;
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
new file mode 100644
index 0000000..601650e
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import com.google.gson.GsonBuilder;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Common base class for audit handlers.
+ *
+ * It resolves the handler's property prefix and name in
+ * {@link #init(Properties, String)}, adapts the single-event and JSON logging
+ * entry points onto the batch {@code log(Collection)} call that subclasses
+ * provide, and offers throttled error / failed-event reporting helpers so a
+ * broken destination does not flood the log.
+ */
+public abstract class BaseAuditHandler implements AuditHandler {
+    private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class);
+
+    private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
+
+    // Minimum gap between two "failed to log audit event" reports.
+    private int mLogFailureReportMinIntervalInMs = 60 * 1000;
+
+    private final AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
+    private final AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
+    private final AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
+
+    public static final String PROP_NAME = "name";
+    public static final String PROP_CLASS_NAME = "classname";
+
+    public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
+
+    protected String propPrefix = PROP_DEFAULT_PREFIX;
+
+    protected String providerName = null;
+
+    protected int failedRetryTimes = 3;
+    protected int failedRetrySleep = 3 * 1000;
+
+    // Throttle state for logError(): at most one message per interval.
+    int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+    long lastErrorLogMS = 0;
+
+    protected Properties props = null;
+
+    /**
+     * Initializes the handler using the default property prefix.
+     *
+     * @param props configuration properties
+     */
+    @Override
+    public void init(Properties props) {
+        init(props, null);
+    }
+
+    /**
+     * Initializes the handler.
+     *
+     * The handler name is taken from the {@code <prefix>.name} property; when
+     * that is missing or empty and no name was set earlier, the last
+     * dot-separated token of the prefix is used instead.
+     *
+     * @param props            configuration properties
+     * @param basePropertyName property prefix for this handler, or
+     *                         {@code null} to keep the current/default prefix
+     */
+    @Override
+    public void init(Properties props, String basePropertyName) {
+        LOG.info("BaseAuditHandler.init()");
+        this.props = props;
+        if (basePropertyName != null) {
+            propPrefix = basePropertyName;
+        }
+        LOG.info("propPrefix=" + propPrefix);
+
+        // Use the resolved prefix, not the raw argument: with a null argument
+        // the raw form would look up the literal key "null.name".
+        String name = MiscUtil.getStringProperty(props, propPrefix + "."
+                + PROP_NAME);
+        if (name != null && !name.isEmpty()) {
+            providerName = name;
+        }
+        if (providerName == null) {
+            providerName = lastToken(propPrefix);
+            LOG.info("Using providerName from property prefix. providerName="
+                    + providerName);
+        }
+        LOG.info("providerName=" + providerName);
+
+        // The builder result is discarded; this only surfaces a warning when
+        // Gson cannot be loaded/instantiated.
+        // NOTE(review): presumably a classpath probe - confirm it is still needed.
+        try {
+            new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+        } catch (Throwable excp) {
+            LOG.warn(
+                    "BaseAuditHandler.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
+                    excp);
+        }
+
+        mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
+                AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
+    }
+
+    /**
+     * Returns the last dot-separated token of the prefix, or the prefix itself
+     * when it yields no tokens.
+     */
+    private static String lastToken(String prefix) {
+        List<String> tokens = MiscUtil.toArray(prefix, ".");
+        if (tokens == null || tokens.isEmpty()) {
+            return prefix;
+        }
+        return tokens.get(tokens.size() - 1);
+    }
+
+    /**
+     * Logs a single event by wrapping it in a one-element list and delegating
+     * to the batch {@code log(Collection)} method.
+     *
+     * @param event the event to log
+     * @return the result of the batch call
+     */
+    @Override
+    public boolean log(AuditEventBase event) {
+        List<AuditEventBase> eventList = new ArrayList<AuditEventBase>(1);
+        eventList.add(event);
+        return log(eventList);
+    }
+
+    /**
+     * Parses the JSON string as an {@link AuthzAuditEvent} and logs it.
+     *
+     * @param event JSON representation of the event
+     * @return the result of {@link #log(AuditEventBase)}
+     */
+    @Override
+    public boolean logJSON(String event) {
+        AuditEventBase eventObj = MiscUtil.fromJson(event,
+                AuthzAuditEvent.class);
+        return log(eventObj);
+    }
+
+    /**
+     * Logs each JSON event in iteration order, stopping at the first one that
+     * fails.
+     *
+     * @param events JSON representations of the events
+     * @return {@code false} if any event failed to log, {@code true} otherwise
+     *         (including for an empty collection)
+     */
+    @Override
+    public boolean logJSON(Collection<String> events) {
+        for (String event : events) {
+            if (!logJSON(event)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void setName(String name) {
+        providerName = name;
+    }
+
+    @Override
+    public String getName() {
+        return providerName;
+    }
+
+    /**
+     * Logs an error message unless another one was logged through
+     * {@code logError} within the last {@code errorLogIntervalMS}.
+     *
+     * @param msg the message to log
+     */
+    public void logError(String msg) {
+        if (isErrorLogDue()) {
+            LOG.error(msg);
+        }
+    }
+
+    /**
+     * Same as {@link #logError(String)}, additionally logging the throwable.
+     *
+     * @param msg the message to log
+     * @param ex  the cause to attach to the log entry
+     */
+    public void logError(String msg, Throwable ex) {
+        if (isErrorLogDue()) {
+            LOG.error(msg, ex);
+        }
+    }
+
+    /**
+     * Checks the error-log throttle and, when a message may be emitted,
+     * records the current time as the last emission.
+     */
+    private boolean isErrorLogDue() {
+        long currTimeMS = System.currentTimeMillis();
+        if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+            lastErrorLogMS = currTimeMS;
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Formats the absolute difference between two millisecond timestamps.
+     *
+     * @param time1 first timestamp in milliseconds
+     * @param time2 second timestamp in milliseconds
+     * @return the difference formatted by {@link #formatIntervalForLog(long)}
+     */
+    public String getTimeDiffStr(long time1, long time2) {
+        long timeInMs = Math.abs(time1 - time2);
+        return formatIntervalForLog(timeInMs);
+    }
+
+    /**
+     * Formats a millisecond interval for log output, using the largest
+     * non-zero unit (hours, minutes, seconds, milli-seconds) as the label.
+     *
+     * @param timeInMs interval in milliseconds
+     * @return e.g. {@code "01:02:03.004 hours"}, {@code "02:03.004 minutes"},
+     *         {@code "03.004 seconds"} or {@code "004 milli-seconds"}
+     */
+    public String formatIntervalForLog(long timeInMs) {
+        long hours = timeInMs / (60 * 60 * 1000);
+        long minutes = (timeInMs / (60 * 1000)) % 60;
+        long seconds = (timeInMs % (60 * 1000)) / 1000;
+        long mSeconds = (timeInMs % (1000));
+
+        if (hours > 0) {
+            return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
+                    seconds, mSeconds);
+        }
+        if (minutes > 0) {
+            return String.format("%02d:%02d.%03d minutes", minutes, seconds,
+                    mSeconds);
+        }
+        if (seconds > 0) {
+            return String.format("%02d.%03d seconds", seconds, mSeconds);
+        }
+        return String.format("%03d milli-seconds", mSeconds);
+    }
+
+    /**
+     * Records a failed event without an associated exception.
+     *
+     * @param event the event that could not be logged
+     */
+    public void logFailedEvent(AuditEventBase event) {
+        logFailedEvent(event, null);
+    }
+
+    /**
+     * Records a failed event and, at most once per configured interval, writes
+     * a warning with the event and the failure counters.
+     *
+     * @param event the event that could not be logged
+     * @param excp  the cause, or {@code null}
+     */
+    public void logFailedEvent(AuditEventBase event, Throwable excp) {
+        FailureSnapshot snapshot = recordFailure();
+        if (snapshot != null) {
+            // Stringify only when a report is actually written.
+            reportFailure(
+                    "failed to log audit event: " + MiscUtil.stringify(event),
+                    excp, snapshot);
+        }
+    }
+
+    /**
+     * Records every event of the collection as failed.
+     *
+     * @param events the events that could not be logged
+     * @param excp   the cause, or {@code null}
+     */
+    public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
+        for (AuditEventBase event : events) {
+            logFailedEvent(event, excp);
+        }
+    }
+
+    /**
+     * JSON-string variant of {@link #logFailedEvent(AuditEventBase, Throwable)};
+     * the event text is logged as given.
+     *
+     * @param event JSON representation of the event that could not be logged
+     * @param excp  the cause, or {@code null}
+     */
+    public void logFailedEventJSON(String event, Throwable excp) {
+        FailureSnapshot snapshot = recordFailure();
+        if (snapshot != null) {
+            reportFailure("failed to log audit event: " + event, excp,
+                    snapshot);
+        }
+    }
+
+    /**
+     * Records every JSON event of the collection as failed.
+     *
+     * @param events JSON representations of the events that could not be logged
+     * @param excp   the cause, or {@code null}
+     */
+    public void logFailedEventJSON(Collection<String> events, Throwable excp) {
+        for (String event : events) {
+            logFailedEventJSON(event, excp);
+        }
+    }
+
+    /**
+     * Counts one failure and decides whether it should be reported now.
+     *
+     * @return the counters to print, or {@code null} when the minimum report
+     *         interval has not elapsed or another caller claimed this report
+     */
+    private FailureSnapshot recordFailure() {
+        long now = System.currentTimeMillis();
+        long lastReport = mFailedLogLastReportTime.get();
+        long timeSinceLastReport = now - lastReport;
+
+        mFailedLogCountSinceLastReport.incrementAndGet();
+        long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
+
+        if (timeSinceLastReport < mLogFailureReportMinIntervalInMs) {
+            return null;
+        }
+        // Only the caller that moves the timestamp forward writes the report.
+        if (!mFailedLogLastReportTime.compareAndSet(lastReport, now)) {
+            return null;
+        }
+        // Read and reset in one step so concurrent increments are not lost.
+        long countSinceLastReport = mFailedLogCountSinceLastReport.getAndSet(0);
+        return new FailureSnapshot(countSinceLastReport, timeSinceLastReport,
+                countLifeTime);
+    }
+
+    /**
+     * Writes the failure warning followed by the failure statistics.
+     */
+    private void reportFailure(String message, Throwable excp,
+            FailureSnapshot snapshot) {
+        if (excp != null) {
+            LOG.warn(message, excp);
+        } else {
+            LOG.warn(message);
+        }
+
+        if (snapshot.countLifeTime > 1) { // no stats to print for the 1st failure
+            LOG.warn("Log failure count: " + snapshot.countSinceLastReport
+                    + " in past "
+                    + formatIntervalForLog(snapshot.timeSinceLastReport) + "; "
+                    + snapshot.countLifeTime + " during process lifetime");
+        }
+    }
+
+    /** Counter values captured at the moment a failure report is due. */
+    private static final class FailureSnapshot {
+        final long countSinceLastReport;
+        final long timeSinceLastReport;
+        final long countLifeTime;
+
+        FailureSnapshot(long countSinceLastReport, long timeSinceLastReport,
+                long countLifeTime) {
+            this.countSinceLastReport = countSinceLastReport;
+            this.timeSinceLastReport = timeSinceLastReport;
+            this.countLifeTime = countLifeTime;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
deleted file mode 100644
index 85c207b..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ranger.audit.provider;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.queue.AuditFileSpool;
-
-import com.google.gson.GsonBuilder;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-public abstract class BaseAuditProvider implements AuditProvider {
- private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class);
-
- private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms";
- public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
- public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
- public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
-
- private AtomicLong lifeTimeInLogCount = new AtomicLong(0);
-
- private int mLogFailureReportMinIntervalInMs = 60 * 1000;
-
- private AtomicLong mFailedLogLastReportTime = new AtomicLong(0);
- private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0);
- private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0);
-
- public static final String PROP_NAME = "name";
- public static final String PROP_CLASS_NAME = "classname";
- public static final String PROP_QUEUE = "queue";
-
- public static final String PROP_BATCH_SIZE = "batch.size";
- public static final String PROP_QUEUE_SIZE = "queue.size";
- public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
-
- public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
- public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
- public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
-
- public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider";
-
- private boolean isDrain = false;
- private String providerName = null;
-
- private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
- private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
- private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
-
- protected int failedRetryTimes = 3;
- protected int failedRetrySleep = 3 * 1000;
-
- protected AuditProvider consumer = null;
- protected AuditFileSpool fileSpooler = null;
-
- protected boolean fileSpoolerEnabled = false;
- protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
- protected int fileSpoolDrainThresholdPercent = 80;
-
- int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
- long lastErrorLogMS = 0;
-
- protected Properties props = null;
-
- public BaseAuditProvider() {
- }
-
- public BaseAuditProvider(AuditProvider consumer) {
- this.consumer = consumer;
- }
-
- @Override
- public void init(Properties props) {
- init(props, null);
- }
-
- @Override
- public void init(Properties props, String basePropertyName) {
- LOG.info("BaseAuditProvider.init()");
- this.props = props;
- String propPrefix = PROP_DEFAULT_PREFIX;
- if (basePropertyName != null) {
- propPrefix = basePropertyName;
- }
- LOG.info("propPrefix=" + propPrefix);
- // Get final token
- List<String> tokens = MiscUtil.toArray(propPrefix, ".");
- String finalToken = tokens.get(tokens.size() - 1);
-
- String name = MiscUtil.getStringProperty(props, basePropertyName + "."
- + PROP_NAME);
- if (name != null && !name.isEmpty()) {
- providerName = name;
- }
- if (providerName == null) {
- providerName = finalToken;
- LOG.info("Using providerName from property prefix. providerName="
- + providerName);
- }
- LOG.info("providerName=" + providerName);
-
- setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_BATCH_SIZE, getMaxBatchSize()));
- setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_QUEUE_SIZE, getMaxQueueSize()));
- setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_BATCH_INTERVAL, getMaxBatchInterval()));
-
- fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
- + "." + PROP_FILE_SPOOL_ENABLE, false);
- String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
- + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
- if (fileSpoolerEnabled || logFolderProp != null) {
- LOG.info("File spool is enabled for " + getName()
- + ", logFolderProp=" + logFolderProp + ", " + propPrefix
- + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "="
- + fileSpoolerEnabled);
- fileSpoolerEnabled = true;
- fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
- + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
- fileSpoolMaxWaitTime);
- fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
- propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
- fileSpoolDrainThresholdPercent);
- fileSpooler = new AuditFileSpool(this, consumer);
- fileSpooler.init(props, basePropertyName);
- } else {
- LOG.info("File spool is disabled for " + getName());
- }
-
- try {
- new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
- } catch (Throwable excp) {
- LOG.warn(
- "Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json",
- excp);
- }
-
- mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props,
- AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000);
-
- }
-
- public AuditProvider getConsumer() {
- return consumer;
- }
-
- public void setConsumer(AuditProvider consumer) {
- this.consumer = consumer;
- }
-
- public void logFailedEvent(AuditEventBase event) {
- logFailedEvent(event, null);
- }
-
- public void logFailedEvent(AuditEventBase event, Throwable excp) {
- long now = System.currentTimeMillis();
-
- long timeSinceLastReport = now - mFailedLogLastReportTime.get();
- long countSinceLastReport = mFailedLogCountSinceLastReport
- .incrementAndGet();
- long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
- if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
- mFailedLogLastReportTime.set(now);
- mFailedLogCountSinceLastReport.set(0);
-
- if (excp != null) {
- LOG.warn(
- "failed to log audit event: "
- + MiscUtil.stringify(event), excp);
- } else {
- LOG.warn("failed to log audit event: "
- + MiscUtil.stringify(event));
- }
-
- if (countLifeTime > 1) { // no stats to print for the 1st failure
- LOG.warn("Log failure count: " + countSinceLastReport
- + " in past "
- + formatIntervalForLog(timeSinceLastReport) + "; "
- + countLifeTime + " during process lifetime");
- }
- }
- }
-
- public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) {
- for (AuditEventBase event : events) {
- logFailedEvent(event, excp);
- }
- }
-
- public void logFailedEventJSON(String event, Throwable excp) {
- long now = System.currentTimeMillis();
-
- long timeSinceLastReport = now - mFailedLogLastReportTime.get();
- long countSinceLastReport = mFailedLogCountSinceLastReport
- .incrementAndGet();
- long countLifeTime = mFailedLogCountLifeTime.incrementAndGet();
-
- if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) {
- mFailedLogLastReportTime.set(now);
- mFailedLogCountSinceLastReport.set(0);
-
- if (excp != null) {
- LOG.warn("failed to log audit event: " + event, excp);
- } else {
- LOG.warn("failed to log audit event: " + event);
- }
-
- if (countLifeTime > 1) { // no stats to print for the 1st failure
- LOG.warn("Log failure count: " + countSinceLastReport
- + " in past "
- + formatIntervalForLog(timeSinceLastReport) + "; "
- + countLifeTime + " during process lifetime");
- }
- }
- }
-
- public void logFailedEventJSON(Collection<String> events, Throwable excp) {
- for (String event : events) {
- logFailedEventJSON(event, excp);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
- * audit.model.AuditEventBase)
- */
- @Override
- public boolean log(AuditEventBase event) {
- List<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
- eventList.add(event);
- return log(eventList);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String)
- */
- @Override
- public boolean logJSON(String event) {
- AuditEventBase eventObj = MiscUtil.fromJson(event,
- AuthzAuditEvent.class);
- return log(eventObj);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection
- * )
- */
- @Override
- public boolean logJSON(Collection<String> events) {
- boolean ret = true;
- for (String event : events) {
- ret = logJSON(event);
- if (!ret) {
- break;
- }
- }
- return ret;
- }
-
- public void setName(String name) {
- providerName = name;
- }
-
- @Override
- public String getName() {
- return providerName;
- }
-
- @Override
- public boolean isDrain() {
- return isDrain;
- }
-
- public void setDrain(boolean isDrain) {
- this.isDrain = isDrain;
- }
-
- public int getMaxQueueSize() {
- return maxQueueSize;
- }
-
- public void setMaxQueueSize(int maxQueueSize) {
- this.maxQueueSize = maxQueueSize;
- }
-
- @Override
- public int getMaxBatchInterval() {
- return maxBatchInterval;
- }
-
- public void setMaxBatchInterval(int maxBatchInterval) {
- this.maxBatchInterval = maxBatchInterval;
- }
-
- @Override
- public int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- public void setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
- */
- @Override
- public void waitToComplete() {
- if (consumer != null) {
- consumer.waitToComplete(-1);
- }
- }
-
- @Override
- public void waitToComplete(long timeout) {
- if (consumer != null) {
- consumer.waitToComplete(timeout);
- }
- }
-
- @Override
- public boolean isFlushPending() {
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
- */
- @Override
- public long getLastFlushTime() {
- if (consumer != null) {
- return consumer.getLastFlushTime();
- }
- return 0;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#flush()
- */
- @Override
- public void flush() {
- if (consumer != null) {
- consumer.flush();
- }
- }
-
- public AtomicLong getLifeTimeInLogCount() {
- return lifeTimeInLogCount;
- }
-
- public long addLifeTimeInLogCount(long count) {
- return lifeTimeInLogCount.addAndGet(count);
- }
-
- public void logError(String msg) {
- long currTimeMS = System.currentTimeMillis();
- if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
- LOG.error(msg);
- lastErrorLogMS = currTimeMS;
- }
- }
-
- public void logError(String msg, Throwable ex) {
- long currTimeMS = System.currentTimeMillis();
- if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
- LOG.error(msg, ex);
- lastErrorLogMS = currTimeMS;
- }
- }
-
- public String getTimeDiffStr(long time1, long time2) {
- long timeInMs = Math.abs(time1 - time2);
- return formatIntervalForLog(timeInMs);
- }
-
- public String formatIntervalForLog(long timeInMs) {
- long hours = timeInMs / (60 * 60 * 1000);
- long minutes = (timeInMs / (60 * 1000)) % 60;
- long seconds = (timeInMs % (60 * 1000)) / 1000;
- long mSeconds = (timeInMs % (1000));
-
- if (hours > 0)
- return String.format("%02d:%02d:%02d.%03d hours", hours, minutes,
- seconds, mSeconds);
- else if (minutes > 0)
- return String.format("%02d:%02d.%03d minutes", minutes, seconds,
- mSeconds);
- else if (seconds > 0)
- return String.format("%02d.%03d seconds", seconds, mSeconds);
- else
- return String.format("%03d milli-seconds", mSeconds);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index ab6a74a..ca842f3 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -23,7 +23,7 @@ import java.util.Properties;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-public abstract class BufferedAuditProvider extends BaseAuditProvider {
+public abstract class BufferedAuditProvider extends BaseAuditHandler {
private LogBuffer<AuditEventBase> mBuffer = null;
private LogDestination<AuditEventBase> mDestination = null;
@@ -107,16 +107,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
}
@Override
- public boolean isFlushPending() {
- return false;
- }
-
- @Override
- public long getLastFlushTime() {
- return 0;
- }
-
- @Override
public void flush() {
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f4bd90c..d475f89 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -177,16 +177,6 @@ public class DbAuditProvider extends AuditDestination {
}
@Override
- public boolean isFlushPending() {
- return mUncommitted.size() > 0;
- }
-
- @Override
- public long getLastFlushTime() {
- return mLastCommitTime;
- }
-
- @Override
public void flush() {
if(mUncommitted.size() > 0) {
boolean isSuccess = commitTransaction();
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
index 619a99d..05f882f 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-public class DummyAuditProvider implements AuditProvider {
+public class DummyAuditProvider implements AuditHandler {
@Override
public void init(Properties prop) {
// intentionally left empty
@@ -74,23 +74,6 @@ public class DummyAuditProvider implements AuditProvider {
// intentionally left empty
}
-
- @Override
- public int getMaxBatchSize() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public boolean isFlushPending() {
- return false;
- }
-
- @Override
- public long getLastFlushTime() {
- return 0;
- }
-
@Override
public void flush() {
// intentionally left empty
@@ -120,20 +103,4 @@ public class DummyAuditProvider implements AuditProvider {
return this.getClass().getName();
}
- /* (non-Javadoc)
- * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
- */
- @Override
- public boolean isDrain() {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval()
- */
- @Override
- public int getMaxBatchInterval() {
- // TODO Auto-generated method stub
- return 0;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index 040a045..0402de2 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -27,8 +27,6 @@ import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-import com.sun.tools.hat.internal.util.Misc;
-
public class Log4jAuditProvider extends AuditDestination {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 876fa5b..4c1593a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -26,18 +26,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-public class MultiDestAuditProvider extends BaseAuditProvider {
+public class MultiDestAuditProvider extends BaseAuditHandler {
private static final Log LOG = LogFactory
.getLog(MultiDestAuditProvider.class);
- protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
+ protected List<AuditHandler> mProviders = new ArrayList<AuditHandler>();
public MultiDestAuditProvider() {
LOG.info("MultiDestAuditProvider: creating..");
}
- public MultiDestAuditProvider(AuditProvider provider) {
+ public MultiDestAuditProvider(AuditHandler provider) {
addAuditProvider(provider);
}
@@ -47,7 +47,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
super.init(props);
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.init(props);
} catch (Throwable excp) {
@@ -57,7 +57,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
}
}
- public void addAuditProvider(AuditProvider provider) {
+ public void addAuditProvider(AuditHandler provider) {
if (provider != null) {
LOG.info("MultiDestAuditProvider.addAuditProvider(providerType="
+ provider.getClass().getCanonicalName() + ")");
@@ -66,9 +66,9 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
}
}
- public void addAuditProviders(List<AuditProvider> providers) {
+ public void addAuditProviders(List<AuditHandler> providers) {
if (providers != null) {
- for (AuditProvider provider : providers) {
+ for (AuditHandler provider : providers) {
LOG.info("Adding " + provider.getName()
+ " as consumer to MultiDestination " + getName());
addAuditProvider(provider);
@@ -78,7 +78,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public boolean log(AuditEventBase event) {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.log(event);
} catch (Throwable excp) {
@@ -90,7 +90,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public boolean log(Collection<AuditEventBase> events) {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.log(events);
} catch (Throwable excp) {
@@ -102,7 +102,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public boolean logJSON(String event) {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.logJSON(event);
} catch (Throwable excp) {
@@ -114,7 +114,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public boolean logJSON(Collection<String> events) {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.logJSON(events);
} catch (Throwable excp) {
@@ -126,7 +126,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public void start() {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.start();
} catch (Throwable excp) {
@@ -138,7 +138,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public void stop() {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.stop();
} catch (Throwable excp) {
@@ -150,7 +150,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public void waitToComplete() {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.waitToComplete();
} catch (Throwable excp) {
@@ -163,7 +163,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
@Override
public void waitToComplete(long timeout) {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.waitToComplete(timeout);
} catch (Throwable excp) {
@@ -175,35 +175,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
}
@Override
- public boolean isFlushPending() {
- for (AuditProvider provider : mProviders) {
- if (provider.isFlushPending()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public long getLastFlushTime() {
- long lastFlushTime = 0;
- for (AuditProvider provider : mProviders) {
- long flushTime = provider.getLastFlushTime();
-
- if (flushTime != 0) {
- if (lastFlushTime == 0 || lastFlushTime > flushTime) {
- lastFlushTime = flushTime;
- }
- }
- }
-
- return lastFlushTime;
- }
-
- @Override
public void flush() {
- for (AuditProvider provider : mProviders) {
+ for (AuditHandler provider : mProviders) {
try {
provider.flush();
} catch (Throwable excp) {
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 5f39e69..2c77b40 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -25,12 +25,12 @@ import kafka.producer.ProducerConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
import org.apache.ranger.audit.provider.MiscUtil;
-public class KafkaAuditProvider extends BaseAuditProvider {
+public class KafkaAuditProvider extends AuditDestination {
private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);
public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size";
@@ -47,11 +47,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
LOG.info("init() called");
super.init(props);
- setMaxQueueSize(MiscUtil.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
- setMaxBatchInterval(MiscUtil.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP,
- AUDIT_BATCH_INTERVAL_DEFAULT_MS));
topic = MiscUtil.getStringProperty(props,
AUDIT_KAFKA_TOPIC_NAME);
if (topic == null || topic.isEmpty()) {
@@ -176,19 +171,6 @@ public class KafkaAuditProvider extends BaseAuditProvider {
}
@Override
- public boolean isFlushPending() {
- LOG.info("isFlushPending() called");
- return false;
- }
-
- @Override
- public long getLastFlushTime() {
- LOG.info("getLastFlushTime() called");
-
- return 0;
- }
-
- @Override
public void flush() {
LOG.info("flush() called");
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 9ee4ec0..53e4348 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -25,16 +25,16 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
-public class SolrAuditProvider extends BaseAuditProvider {
+public class SolrAuditProvider extends AuditDestination {
private static final Log LOG = LogFactory.getLog(SolrAuditProvider.class);
public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.solr.async.max.queue.size";
@@ -56,11 +56,6 @@ public class SolrAuditProvider extends BaseAuditProvider {
LOG.info("init() called");
super.init(props);
- setMaxQueueSize(MiscUtil.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT));
- setMaxBatchInterval(MiscUtil.getIntProperty(props,
- AUDIT_MAX_QUEUE_SIZE_PROP,
- AUDIT_BATCH_INTERVAL_DEFAULT_MS));
retryWaitTime = MiscUtil.getIntProperty(props,
AUDIT_RETRY_WAIT_PROP, retryWaitTime);
}
@@ -241,29 +236,7 @@ public class SolrAuditProvider extends BaseAuditProvider {
public void waitToComplete(long timeout) {
}
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
- */
- @Override
- public boolean isFlushPending() {
- // TODO Auto-generated method stub
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
- */
- @Override
- public long getLastFlushTime() {
- // TODO Auto-generated method stub
- return 0;
- }
-
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index a6f291d..d16fff9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -21,32 +21,27 @@ package org.apache.ranger.audit.queue;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
/**
* This is a non-blocking queue with no limit on capacity.
*/
-public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+public class AuditAsyncQueue extends AuditQueue implements Runnable {
private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
- LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+ LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>();
Thread consumerThread = null;
static final int MAX_DRAIN = 1000;
static int threadCount = 0;
static final String DEFAULT_NAME = "async";
- public AuditAsyncQueue() {
- setName(DEFAULT_NAME);
- }
-
- public AuditAsyncQueue(AuditProvider consumer) {
+ public AuditAsyncQueue(AuditHandler consumer) {
super(consumer);
setName(DEFAULT_NAME);
}
@@ -65,7 +60,6 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
return false;
}
queue.add(event);
- addLifeTimeInLogCount(1);
return true;
}
@@ -90,6 +84,9 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
public void start() {
if (consumer != null) {
consumer.start();
+ } else {
+ logger.error("consumer is not set. Nothing will be sent to any consumer. name="
+ + getName());
}
consumerThread = new Thread(this, this.getClass().getName()
@@ -110,23 +107,10 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
if (consumerThread != null) {
consumerThread.interrupt();
}
- consumerThread = null;
} catch (Throwable t) {
// ignore any exception
}
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
- */
- @Override
- public boolean isFlushPending() {
- if (queue.isEmpty()) {
- return consumer.isFlushPending();
- }
- return true;
+ consumerThread = null;
}
/*
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 5e21efc..8ed07bd 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -29,10 +29,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
-public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
+public class AuditBatchQueue extends AuditQueue implements Runnable {
private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
private BlockingQueue<AuditEventBase> queue = null;
@@ -41,10 +40,7 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
Thread consumerThread = null;
static int threadCount = 0;
- public AuditBatchQueue() {
- }
-
- public AuditBatchQueue(AuditProvider consumer) {
+ public AuditBatchQueue(AuditHandler consumer) {
super(consumer);
}
@@ -59,7 +55,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
public boolean log(AuditEventBase event) {
// Add to batchQueue. Block if full
queue.add(event);
- addLifeTimeInLogCount(1);
return true;
}
@@ -130,10 +125,10 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
if (consumerThread != null) {
consumerThread.interrupt();
}
- consumerThread = null;
} catch (Throwable t) {
// ignore any exception
}
+ consumerThread = null;
}
/*
@@ -187,19 +182,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
/*
* (non-Javadoc)
*
- * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
- */
- @Override
- public boolean isFlushPending() {
- if (queue.isEmpty()) {
- return consumer.isFlushPending();
- }
- return true;
- }
-
- /*
- * (non-Javadoc)
- *
* @see org.apache.ranger.audit.provider.AuditProvider#flush()
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
index 66d1573..a1c32b9 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -35,13 +35,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.MiscUtil;
import com.google.gson.Gson;
@@ -69,10 +69,10 @@ public class AuditFileSpool implements Runnable {
// "filespool.index.done_filename";
public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
- AuditProvider queueProvider = null;
- AuditProvider consumerProvider = null;
+ AuditQueue queueProvider = null;
+ AuditHandler consumerProvider = null;
- BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+ BlockingQueue<AuditIndexRecord> indexQueue = new LinkedBlockingQueue<AuditIndexRecord>();
// Folder and File attributes
File logFolder = null;
@@ -108,10 +108,10 @@ public class AuditFileSpool implements Runnable {
boolean isDrain = false;
boolean isDestDown = true;
- private static Gson gson = null;
+ private Gson gson = null;
- public AuditFileSpool(AuditProvider queueProvider,
- AuditProvider consumerProvider) {
+ public AuditFileSpool(AuditQueue queueProvider,
+ AuditHandler consumerProvider) {
this.queueProvider = queueProvider;
this.consumerProvider = consumerProvider;
}
@@ -120,12 +120,12 @@ public class AuditFileSpool implements Runnable {
init(prop, null);
}
- public void init(Properties props, String basePropertyName) {
+ public boolean init(Properties props, String basePropertyName) {
if (initDone) {
logger.error("init() called more than once. queueProvider="
+ queueProvider.getName() + ", consumerProvider="
+ consumerProvider.getName());
- return;
+ return true;
}
String propPrefix = "xasecure.audit.filespool";
if (basePropertyName != null) {
@@ -162,22 +162,22 @@ public class AuditFileSpool implements Runnable {
+ queueProvider.getName());
if (logFolderProp == null || logFolderProp.isEmpty()) {
- logger.error("Audit spool folder is not configured. Please set "
+ logger.fatal("Audit spool folder is not configured. Please set "
+ propPrefix
+ "."
+ PROP_FILE_SPOOL_LOCAL_DIR
+ ". queueName=" + queueProvider.getName());
- return;
+ return false;
}
logFolder = new File(logFolderProp);
if (!logFolder.isDirectory()) {
logFolder.mkdirs();
if (!logFolder.isDirectory()) {
- logger.error("File Spool folder not found and can't be created. folder="
+ logger.fatal("File Spool folder not found and can't be created. folder="
+ logFolder.getAbsolutePath()
+ ", queueName="
+ queueProvider.getName());
- return;
+ return false;
}
}
logger.info("logFolder=" + logFolder + ", queueName="
@@ -202,7 +202,7 @@ public class AuditFileSpool implements Runnable {
+ archiveFolder.getAbsolutePath()
+ ", queueName="
+ queueProvider.getName());
- return;
+ return false;
}
}
logger.info("archiveFolder=" + archiveFolder + ", queueName="
@@ -218,17 +218,30 @@ public class AuditFileSpool implements Runnable {
indexFile = new File(logFolder, indexFileName);
if (!indexFile.exists()) {
- indexFile.createNewFile();
+ boolean ret = indexFile.createNewFile();
+ if (!ret) {
+ logger.fatal("Error creating index file. fileName="
+ + indexDoneFile.getPath());
+ return false;
+ }
}
logger.info("indexFile=" + indexFile + ", queueName="
+ queueProvider.getName());
int lastDot = indexFileName.lastIndexOf('.');
+ if (lastDot < 0) {
+ lastDot = indexFileName.length() - 1;
+ }
indexDoneFileName = indexFileName.substring(0, lastDot)
+ "_closed.json";
indexDoneFile = new File(logFolder, indexDoneFileName);
if (!indexDoneFile.exists()) {
- indexDoneFile.createNewFile();
+ boolean ret = indexDoneFile.createNewFile();
+ if (!ret) {
+ logger.fatal("Error creating index done file. fileName="
+ + indexDoneFile.getPath());
+ return false;
+ }
}
logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+ queueProvider.getName());
@@ -252,8 +265,6 @@ public class AuditFileSpool implements Runnable {
}
}
printIndex();
- // One more loop to add the rest of the pending records in reverse
- // order
for (int i = 0; i < indexRecords.size(); i++) {
AuditIndexRecord auditIndexRecord = indexRecords.get(i);
if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
@@ -261,18 +272,19 @@ public class AuditFileSpool implements Runnable {
if (!consumerFile.exists()) {
logger.error("INIT: Consumer file="
+ consumerFile.getPath() + " not found.");
- System.exit(1);
+ } else {
+ indexQueue.add(auditIndexRecord);
}
- indexQueue.add(auditIndexRecord);
}
}
} catch (Throwable t) {
logger.fatal("Error initializing File Spooler. queue="
+ queueProvider.getName(), t);
- return;
+ return false;
}
initDone = true;
+ return true;
}
/**
@@ -328,6 +340,7 @@ public class AuditFileSpool implements Runnable {
out.flush();
out.close();
+ break;
} catch (Throwable t) {
logger.debug("Error closing spool out file.", t);
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
new file mode 100644
index 0000000..4c3ac5f
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.provider.AuditHandler;
+import org.apache.ranger.audit.provider.BaseAuditHandler;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * Common base class for the audit queues (for example AuditAsyncQueue and
+ * AuditBatchQueue) that sit in front of a consumer {@link AuditHandler}.
+ * <p>
+ * It holds the queue and batch settings read from configuration, optionally
+ * creates an {@link AuditFileSpool} for the queue, and forwards
+ * {@code waitToComplete} and {@code flush} calls to the consumer.
+ */
+public abstract class AuditQueue extends BaseAuditHandler {
+    private static final Log LOG = LogFactory.getLog(AuditQueue.class);
+
+    // Defaults used until init() reads a configured value.
+    public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
+    public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
+    public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+
+    private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
+    private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
+    private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
+
+    public static final String PROP_QUEUE = "queue";
+
+    // Property name suffixes; init() looks them up as propPrefix + "." + suffix.
+    public static final String PROP_BATCH_SIZE = "batch.size";
+    public static final String PROP_QUEUE_SIZE = "queue.size";
+    public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
+
+    public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
+    public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
+    public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
+
+    // Downstream handler; may be null, so every use in this class is null-checked.
+    protected final AuditHandler consumer;
+    // Created by init() when file spooling is requested; null otherwise.
+    protected AuditFileSpool fileSpooler = null;
+
+    private boolean isDrain = false;
+
+    protected boolean fileSpoolerEnabled = false;
+    protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
+    protected int fileSpoolDrainThresholdPercent = 80;
+
+    /**
+     * Creates a queue that hands its events to the given consumer.
+     *
+     * @param consumer
+     *            downstream handler; may be null
+     */
+    public AuditQueue(AuditHandler consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Reads the queue settings and, when requested, sets up the file spooler.
+     * <p>
+     * File spooling is turned on when {@link #PROP_FILE_SPOOL_ENABLE} is true
+     * or when a spool folder is configured. If the spooler cannot be
+     * initialized, spooling is disabled again and the failure is logged.
+     *
+     * @param props
+     *            configuration properties
+     * @param basePropertyName
+     *            passed on to the super class and to the file spooler
+     */
+    @Override
+    public void init(Properties props, String basePropertyName) {
+        LOG.info("AuditQueue.init()");
+        super.init(props, basePropertyName);
+
+        setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+                + PROP_BATCH_SIZE, getMaxBatchSize()));
+        setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+                + PROP_QUEUE_SIZE, getMaxQueueSize()));
+        setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+                + PROP_BATCH_INTERVAL, getMaxBatchInterval()));
+
+        fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+                + "." + PROP_FILE_SPOOL_ENABLE, false);
+        String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+                + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
+        if (fileSpoolerEnabled || logFolderProp != null) {
+            // Log the configured flag under its own property name before it
+            // is forced to true below.
+            LOG.info("File spool is enabled for " + getName()
+                    + ", logFolderProp=" + logFolderProp + ", " + propPrefix
+                    + "." + PROP_FILE_SPOOL_ENABLE + "="
+                    + fileSpoolerEnabled);
+            fileSpoolerEnabled = true;
+            fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+                    + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
+                    fileSpoolMaxWaitTime);
+            fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
+                    propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
+                    fileSpoolDrainThresholdPercent);
+            fileSpooler = new AuditFileSpool(this, consumer);
+            if (!fileSpooler.init(props, basePropertyName)) {
+                fileSpoolerEnabled = false;
+                // consumer may be null, so do not dereference it blindly.
+                LOG.fatal("Couldn't initialize file spooler. Disabling it. queue="
+                        + getName() + ", consumer="
+                        + (consumer != null ? consumer.getName() : null));
+            }
+        } else {
+            LOG.info("File spool is disabled for " + getName());
+        }
+    }
+
+    /** @return the consumer given to the constructor; may be null */
+    public AuditHandler getConsumer() {
+        return consumer;
+    }
+
+    /** @return the drain flag last set through {@link #setDrain(boolean)} */
+    public boolean isDrain() {
+        return isDrain;
+    }
+
+    /**
+     * @param isDrain
+     *            new value of the drain flag
+     */
+    public void setDrain(boolean isDrain) {
+        this.isDrain = isDrain;
+    }
+
+    /** @return the configured maximum queue size */
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    /**
+     * @param maxQueueSize
+     *            new maximum queue size
+     */
+    public void setMaxQueueSize(int maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    /** @return the configured maximum batch interval, in milliseconds */
+    public int getMaxBatchInterval() {
+        return maxBatchInterval;
+    }
+
+    /**
+     * @param maxBatchInterval
+     *            new maximum batch interval, in milliseconds
+     */
+    public void setMaxBatchInterval(int maxBatchInterval) {
+        this.maxBatchInterval = maxBatchInterval;
+    }
+
+    /** @return the configured maximum batch size */
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    /**
+     * @param maxBatchSize
+     *            new maximum batch size
+     */
+    public void setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    /**
+     * Forwards to the consumer, if one is set, using a timeout of -1.
+     *
+     * @see AuditHandler#waitToComplete()
+     */
+    @Override
+    public void waitToComplete() {
+        if (consumer != null) {
+            // NOTE(review): -1 presumably means "no timeout" - confirm
+            // against the consumer implementations.
+            consumer.waitToComplete(-1);
+        }
+    }
+
+    /**
+     * Forwards to the consumer, if one is set.
+     *
+     * @param timeout
+     *            passed to the consumer unchanged
+     * @see AuditHandler#waitToComplete(long)
+     */
+    @Override
+    public void waitToComplete(long timeout) {
+        if (consumer != null) {
+            consumer.waitToComplete(timeout);
+        }
+    }
+
+    /**
+     * Forwards to the consumer, if one is set.
+     *
+     * @see AuditHandler#flush()
+     */
+    @Override
+    public void flush() {
+        if (consumer != null) {
+            consumer.flush();
+        }
+    }
+
+}