You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2014/09/22 20:29:20 UTC
[06/10] git commit: ARGUS-5: added support for sending audit logs to
HDFS
ARGUS-5: added support for sending audit logs to HDFS
Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/5ccf382d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/5ccf382d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/5ccf382d
Branch: refs/heads/master
Commit: 5ccf382d13d046179d2b1bbd20449917cfc7b8c1
Parents: 886e9c2
Author: mneethiraj <mn...@hortonworks.com>
Authored: Sun Sep 21 00:37:28 2014 -0700
Committer: mneethiraj <mn...@hortonworks.com>
Committed: Sun Sep 21 00:37:28 2014 -0700
----------------------------------------------------------------------
.../audit/provider/AuditProviderFactory.java | 110 ++--
.../audit/provider/BufferedAuditProvider.java | 65 ++
.../audit/provider/DummyAuditProvider.java | 13 +-
.../audit/provider/LocalFileLogBuffer.java | 601 +++++++++++++++++++
.../com/xasecure/audit/provider/LogBuffer.java | 30 +
.../xasecure/audit/provider/LogDestination.java | 32 +
.../com/xasecure/audit/provider/MiscUtil.java | 139 +++++
.../audit/provider/hdfs/HdfsAuditProvider.java | 48 ++
.../audit/provider/hdfs/HdfsLogDestination.java | 315 ++++++++++
hbase-agent/conf/xasecure-audit.xml | 72 +++
hdfs-agent/conf/xasecure-audit.xml | 141 ++++-
hive-agent/conf/xasecure-audit.xml | 72 +++
knox-agent/conf/xasecure-audit.xml | 72 +++
storm-agent/conf/xasecure-audit.xml | 72 +++
14 files changed, 1708 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
index 7bed0f7..bb31e7c 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/AuditProviderFactory.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.xasecure.audit.provider.hdfs.HdfsAuditProvider;
import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider;
@@ -41,22 +42,32 @@ public class AuditProviderFactory {
private static final Log LOG = LogFactory.getLog(AuditProviderFactory.class);
- private static final String AUDIT_JPA_CONFIG_PROP_PREFIX = "xasecure.audit.jpa.";
private static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled" ;
- private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ;
- private static final String AUDIT_LOG4J_IS_ASYNC_PROP = "xasecure.audit.log4j.is.async" ;
- private static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP = "xasecure.audit.log4j.async.max.queue.size" ;
- private static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+
private static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled" ;
- private static final String AUDIT_DB_IS_ASYNC_PROP = "xasecure.audit.db.is.async" ;
+ private static final String AUDIT_DB_IS_ASYNC_PROP = "xasecure.audit.db.is.async";
private static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP = "xasecure.audit.db.async.max.queue.size" ;
- private static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";
private static final String AUDIT_DB_RESUME_QUEUE_SIZE__PROP = "xasecure.audit.db.async.resume.queue.size" ;
+ private static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";
private static final String AUDIT_DB_BATCH_SIZE_PROP = "xasecure.audit.db.batch.size" ;
+ private static final String AUDIT_JPA_CONFIG_PROP_PREFIX = "xasecure.audit.jpa.";
private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE = "xasecure.audit.credential.provider.file";
private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS = "auditDBCred";
private static final String AUDIT_JPA_JDBC_PASSWORD = "javax.persistence.jdbc.password";
+ private static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled";
+ private static final String AUDIT_HDFS_IS_ASYNC_PROP = "xasecure.audit.hdfs.is.async";
+ private static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP = "xasecure.audit.hdfs.async.max.queue.size" ;
+ private static final String AUDIT_HDFS_RESUME_QUEUE_SIZE__PROP = "xasecure.audit.hdfs.async.resume.queue.size" ;
+ private static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.hdfs.async.max.flush.interval.ms";
+ private static final String AUDIT_HDFS_CONFIG_PREFIX_PROP = "xasecure.audit.hdfs.config.";
+
+ private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ;
+ private static final String AUDIT_LOG4J_IS_ASYNC_PROP = "xasecure.audit.log4j.is.async";
+ private static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP = "xasecure.audit.log4j.async.max.queue.size" ;
+ private static final String AUDIT_LOG4J_RESUME_QUEUE_SIZE__PROP = "xasecure.audit.log4j.async.resume.queue.size" ;
+ private static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+
private static AuditProviderFactory sFactory;
private AuditProvider mProvider = null;
@@ -89,32 +100,35 @@ public class AuditProviderFactory {
public void init(Properties props) {
LOG.info("AuditProviderFactory: initializing..");
-
+
boolean isEnabled = getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false);
- boolean isAuditToLog4jEnabled = getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
- boolean isAuditToLog4jAsync = getBooleanProperty(props, AUDIT_LOG4J_IS_ASYNC_PROP, false);
boolean isAuditToDbEnabled = getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false);
- boolean isAuditToDbAsync = getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
-
- List<AuditProvider> providers = new ArrayList<AuditProvider>();
-
+ boolean isAuditToHdfsEnabled = getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false);
+ boolean isAuditToLog4jEnabled = getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false);
- if(!isEnabled || (!isAuditToDbEnabled && !isAuditToLog4jEnabled)) {
+ if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToLog4jEnabled)) {
LOG.info("AuditProviderFactory: Audit not enabled..");
-
+
mProvider = getDefaultProvider();
return;
}
-
+
+ List<AuditProvider> providers = new ArrayList<AuditProvider>();
+
if(isAuditToDbEnabled) {
-
-
- Map<String, String> jpaInitProperties = getJpaProperties(props);
-
+ Map<String, String> jpaInitProperties = getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+
+ String jdbcPassword = getCredentialString(getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+
+ if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
+ jpaInitProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
+ }
+
LOG.info("AuditProviderFactory: found " + jpaInitProperties.size() + " Audit JPA properties");
- int dbBatchSize = getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+ int dbBatchSize = getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+ boolean isAuditToDbAsync = getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
if(! isAuditToDbAsync) {
dbBatchSize = 1; // Batching not supported in sync mode; need to address multiple threads making audit calls
@@ -142,17 +156,51 @@ public class AuditProviderFactory {
}
}
+ if(isAuditToHdfsEnabled) {
+ Map<String, String> hdfsInitProperties = getPropertiesWithPrefix(props, AUDIT_HDFS_CONFIG_PREFIX_PROP);
+
+ LOG.info("AuditProviderFactory: found " + hdfsInitProperties.size() + " Audit HDFS properties");
+
+ HdfsAuditProvider hdfsProvider = new HdfsAuditProvider();
+
+ hdfsProvider.init(hdfsInitProperties);
+
+ boolean isAuditToHdfsAsync = getBooleanProperty(props, AUDIT_HDFS_IS_ASYNC_PROP, false);
+
+ if(isAuditToHdfsAsync) {
+ AsyncAuditProvider asyncProvider = new AsyncAuditProvider();
+
+ int maxQueueSize = getIntProperty(props, AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, -1);
+ int maxFlushInterval = getIntProperty(props, AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, -1);
+ int resumeQueueSize = getIntProperty(props, AUDIT_HDFS_RESUME_QUEUE_SIZE__PROP, 0);
+
+ asyncProvider.setMaxQueueSize(maxQueueSize);
+ asyncProvider.setMaxFlushInterval(maxFlushInterval);
+ asyncProvider.setResumeQueueSize(resumeQueueSize);
+
+ asyncProvider.addAuditProvider(hdfsProvider);
+
+ providers.add(asyncProvider);
+ } else {
+ providers.add(hdfsProvider);
+ }
+ }
+
if(isAuditToLog4jEnabled) {
Log4jAuditProvider log4jProvider = new Log4jAuditProvider();
+
+ boolean isAuditToLog4jAsync = getBooleanProperty(props, AUDIT_LOG4J_IS_ASYNC_PROP, false);
if(isAuditToLog4jAsync) {
AsyncAuditProvider asyncProvider = new AsyncAuditProvider();
int maxQueueSize = getIntProperty(props, AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, -1);
int maxFlushInterval = getIntProperty(props, AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, -1);
+ int resumeQueueSize = getIntProperty(props, AUDIT_LOG4J_RESUME_QUEUE_SIZE__PROP, 0);
asyncProvider.setMaxQueueSize(maxQueueSize);
asyncProvider.setMaxFlushInterval(maxFlushInterval);
+ asyncProvider.setResumeQueueSize(resumeQueueSize);
asyncProvider.addAuditProvider(log4jProvider);
@@ -181,8 +229,8 @@ public class AuditProviderFactory {
Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}
- private Map<String, String> getJpaProperties(Properties props) {
- Map<String, String> jpaInitProperties = new HashMap<String, String>();
+ private Map<String, String> getPropertiesWithPrefix(Properties props, String prefix) {
+ Map<String, String> prefixedProperties = new HashMap<String, String>();
for(String key : props.stringPropertyNames()) {
if(key == null) {
@@ -191,25 +239,19 @@ public class AuditProviderFactory {
String val = props.getProperty(key);
- if(key.startsWith(AuditProviderFactory.AUDIT_JPA_CONFIG_PROP_PREFIX)) {
- key = key.substring(AuditProviderFactory.AUDIT_JPA_CONFIG_PROP_PREFIX.length());
+ if(key.startsWith(prefix)) {
+ key = key.substring(prefix.length());
if(key == null) {
continue;
}
- jpaInitProperties.put(key, val);
+ prefixedProperties.put(key, val);
}
}
- String jdbcPassword = getCredentialString(getStringProperty(props,AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
-
- if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
- jpaInitProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
- }
-
- return jpaInitProperties;
+ return prefixedProperties;
}
private boolean getBooleanProperty(Properties props, String propName, boolean defValue) {
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
new file mode 100644
index 0000000..9b8cb40
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/BufferedAuditProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.xasecure.audit.provider;
+
+import com.xasecure.audit.model.AuditEventBase;
+
+public abstract class BufferedAuditProvider implements AuditProvider {
+ private LogBuffer<AuditEventBase> mBuffer = null;
+ private LogDestination<AuditEventBase> mDestination = null;
+
+
+ @Override
+ public void log(AuditEventBase event) {
+ mBuffer.add(event);
+ }
+
+ @Override
+ public void start() {
+ mBuffer.start(mDestination);
+ }
+
+ @Override
+ public void stop() {
+ mBuffer.stop();
+ }
+
+ @Override
+ public void waitToComplete() {
+ }
+
+ @Override
+ public boolean isFlushPending() {
+ return false;
+ }
+
+ @Override
+ public long getLastFlushTime() {
+ return 0;
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer,
+ LogDestination<AuditEventBase> destination) {
+ mBuffer = buffer;
+ mDestination = destination;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
index ecf25b1..777f740 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/DummyAuditProvider.java
@@ -1,12 +1,3 @@
-package com.xasecure.audit.provider;
-
-import com.xasecure.audit.model.AuditEventBase;
-import com.xasecure.audit.model.HBaseAuditEvent;
-import com.xasecure.audit.model.HdfsAuditEvent;
-import com.xasecure.audit.model.HiveAuditEvent;
-import com.xasecure.audit.model.KnoxAuditEvent;
-import com.xasecure.audit.model.StormAuditEvent;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,6 +15,10 @@ import com.xasecure.audit.model.StormAuditEvent;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package com.xasecure.audit.provider;
+
+import com.xasecure.audit.model.AuditEventBase;
+
public class DummyAuditProvider implements AuditProvider {
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
new file mode 100644
index 0000000..9acae11
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.xasecure.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+import org.apache.log4j.helpers.LogLog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+
+public class LocalFileLogBuffer<T> implements LogBuffer<T> {
+ private String mDirectory = null;
+ private String mFile = null;
+ private String mEncoding = null;
+ private boolean mIsAppend = true;
+ private int mRolloverIntervalSeconds = 600;
+ private String mArchiveDirectory = null;
+ private int mArchiveFileCount = 10;
+
+ private Writer mWriter = null;
+ private String mCurrentFilename = null;
+ private long mNextRolloverTime = 0;
+
+ private Gson mGsonBuilder = null;
+
+ private DestinationDispatcherThread<T> mDispatcherThread = null;
+
+ public LocalFileLogBuffer() {
+ mGsonBuilder = new GsonBuilder().setPrettyPrinting().create();
+ }
+
+ public String getDirectory() {
+ return mDirectory;
+ }
+
+ public void setDirectory(String directory) {
+ mDirectory = directory;
+ }
+
+ public String getFile() {
+ return mFile;
+ }
+
+ public void setFile(String file) {
+ mFile = file;
+ }
+
+ public String getEncoding() {
+ return mEncoding;
+ }
+
+ public void setEncoding(String encoding) {
+ mEncoding = encoding;
+ }
+
+ public boolean getIsAppend() {
+ return mIsAppend;
+ }
+
+ public void setIsAppend(boolean isAppend) {
+ mIsAppend = isAppend;
+ }
+
+ public int getRolloverIntervalSeconds() {
+ return mRolloverIntervalSeconds;
+ }
+
+ public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+ mRolloverIntervalSeconds = rolloverIntervalSeconds;
+ }
+
+ public String getArchiveDirectory() {
+ return mArchiveDirectory;
+ }
+
+ public void setArchiveDirectory(String archiveDirectory) {
+ mArchiveDirectory = archiveDirectory;
+ }
+
+ public int getArchiveFileCount() {
+ return mArchiveFileCount;
+ }
+
+ public void setArchiveFileCount(int archiveFileCount) {
+ mArchiveFileCount = archiveFileCount;
+ }
+
+
+ @Override
+ public void start(LogDestination<T> destination) {
+ LogLog.debug("==> LocalFileLogBuffer.start()");
+
+ mDispatcherThread = new DestinationDispatcherThread<T>(this, destination);
+
+ mDispatcherThread.start();
+
+ LogLog.debug("<== LocalFileLogBuffer.start()");
+ }
+
+ @Override
+ public void stop() {
+ LogLog.debug("==> LocalFileLogBuffer.stop()");
+
+ DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread;
+ mDispatcherThread = null;
+
+ if(dispatcherThread != null && dispatcherThread.isAlive()) {
+ dispatcherThread.stopThread();
+
+ try {
+ dispatcherThread.join();
+ } catch (InterruptedException e) {
+ LogLog.warn("LocalFileLogBuffer.stop(): failed in waiting for DispatcherThread", e);
+ }
+ }
+
+ closeFile();
+
+ LogLog.debug("<== LocalFileLogBuffer.stop()");
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return mWriter != null;
+ }
+
+ @Override
+ public synchronized boolean add(T log) {
+ boolean ret = false;
+
+ long now = System.currentTimeMillis();
+
+ if(now > mNextRolloverTime) {
+ rollover();
+ }
+
+ Writer writer = mWriter;
+
+ if(writer != null) {
+ try {
+ String msg = toJson(log);
+
+ if(msg.contains(MiscUtil.LINE_SEPARATOR)) {
+ msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR);
+ }
+
+ writer.write(msg + MiscUtil.LINE_SEPARATOR);
+
+ ret = true;
+ } catch(IOException excp) {
+ LogLog.warn("LocalFileLogBuffer.add(): write failed", excp);
+ }
+ } else {
+ LogLog.warn("LocalFileLogBuffer.add(): writer is null");
+ }
+
+ return ret;
+ }
+
+ private synchronized void openFile() {
+ LogLog.debug("==> LocalFileLogBuffer.openFile()");
+
+ closeFile();
+
+ mCurrentFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
+
+ FileOutputStream ostream = null;
+ try {
+ ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+ } catch(Exception excp) {
+ MiscUtil.createParents(new File(mCurrentFilename));
+
+ try {
+ ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+ } catch(Exception ex) {
+ // ignore; error printed down
+ }
+ }
+
+ mWriter = createWriter(ostream);
+
+ if(mWriter != null) {
+ LogLog.debug("LocalFileLogBuffer.openFile(): opened file " + mCurrentFilename);
+
+ mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000));
+ } else {
+ LogLog.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mCurrentFilename);
+
+ mCurrentFilename = null;
+ }
+
+ LogLog.debug("<== LocalFileLogBuffer.openFile()");
+ }
+
+ private synchronized void closeFile() {
+ LogLog.debug("==> LocalFileLogBuffer.closeFile()");
+
+ Writer writer = mWriter;
+
+ mWriter = null;
+
+ if(writer != null) {
+ try {
+ writer.flush();
+ writer.close();
+ } catch(IOException excp) {
+ LogLog.warn("LocalFileLogBuffer: failed to close file " + mCurrentFilename, excp);
+ }
+
+ if(mDispatcherThread != null) {
+ mDispatcherThread.addLogfile(mCurrentFilename);
+ }
+ }
+
+ LogLog.debug("<== LocalFileLogBuffer.closeFile()");
+ }
+
+ private void rollover() {
+ LogLog.debug("==> LocalFileLogBuffer.rollover()");
+
+ closeFile();
+
+ openFile();
+
+ LogLog.debug("<== LocalFileLogBuffer.rollover()");
+ }
+
+ public OutputStreamWriter createWriter(OutputStream os ) {
+ OutputStreamWriter writer = null;
+
+ if(os != null) {
+ if(mEncoding != null) {
+ try {
+ writer = new OutputStreamWriter(os, mEncoding);
+ } catch(UnsupportedEncodingException excp) {
+ LogLog.warn("LocalFileLogBuffer: failed to create output writer.", excp);
+ }
+ }
+
+ if(writer == null) {
+ writer = new OutputStreamWriter(os);
+ }
+ }
+
+ return writer;
+ }
+
+ boolean isCurrentFilename(String filename) {
+ return mCurrentFilename != null && filename != null && filename.equals(mCurrentFilename);
+ }
+
+ private String toJson(T log) {
+ String jsonString = mGsonBuilder.toJson(log) ;
+
+ return jsonString;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("LocalFileLogBuffer {");
+ sb.append("Directory=").append(mDirectory).append("; ");
+ sb.append("File=").append(mFile).append("; ");
+ sb.append("RolloverIntervaSeconds=").append(mRolloverIntervalSeconds).append("; ");
+ sb.append("ArchiveDirectory=").append(mArchiveDirectory).append("; ");
+ sb.append("ArchiveFileCount=").append(mArchiveFileCount);
+ sb.append("}");
+
+ return sb.toString();
+ }
+
+}
+
+class DestinationDispatcherThread<T> extends Thread {
+ private TreeSet<String> mCompletedLogfiles = new TreeSet<String>();
+ private boolean mStopThread = false;
+ private LocalFileLogBuffer<T> mFileLogBuffer = null;
+ private LogDestination<T> mDestination = null;
+
+ private String mCurrentLogfile = null;
+ private BufferedReader mReader = null;
+
+ public DestinationDispatcherThread(LocalFileLogBuffer<T> fileLogBuffer, LogDestination<T> destination) {
+ super(DestinationDispatcherThread.class.getSimpleName() + "-" + System.currentTimeMillis());
+
+ mFileLogBuffer = fileLogBuffer;
+ mDestination = destination;
+
+ setDaemon(true);
+ }
+
+ public void addLogfile(String filename) {
+ LogLog.debug("==> DestinationDispatcherThread.addLogfile(" + filename + ")");
+
+ if(filename != null) {
+ synchronized(mCompletedLogfiles) {
+ mCompletedLogfiles.add(filename);
+ mCompletedLogfiles.notify();
+ }
+ }
+
+ LogLog.debug("<== DestinationDispatcherThread.addLogfile(" + filename + ")");
+ }
+
+ public void stopThread() {
+ mStopThread = true;
+ }
+
+ @Override
+ public void run() {
+ init();
+
+ // destination start() should be from the dispatcher thread
+ mDestination.start();
+
+ int pollIntervalInMs = 1000;
+
+ while(! mStopThread) {
+ synchronized(mCompletedLogfiles) {
+ while(mCompletedLogfiles.isEmpty() && !mStopThread) {
+ try {
+ mCompletedLogfiles.wait(pollIntervalInMs);
+ } catch(InterruptedException excp) {
+ LogLog.warn("LocalFileLogBuffer.run(): failed to wait for log file", excp);
+ }
+ }
+
+ mCurrentLogfile = mCompletedLogfiles.pollFirst();
+ }
+
+ if(mCurrentLogfile != null) {
+ sendCurrentFile();
+ }
+ }
+
+ mDestination.stop();
+ }
+
+ private void init() {
+ LogLog.debug("==> DestinationDispatcherThread.init()");
+
+ String dirName = MiscUtil.replaceTokens(mFileLogBuffer.getDirectory());
+ File directory = new File(dirName);
+
+ if(directory.exists() && directory.isDirectory()) {
+ File[] files = directory.listFiles();
+
+ if(files != null) {
+ for(File file : files) {
+ if(file.exists() && file.canRead()) {
+ String filename = file.getAbsolutePath();
+ if(! mFileLogBuffer.isCurrentFilename(filename)) {
+ addLogfile(filename);
+ }
+ }
+ }
+ }
+ }
+
+ LogLog.debug("<== DestinationDispatcherThread.init()");
+ }
+
+ private boolean sendCurrentFile() {
+ boolean ret = false;
+
+ int destinationPollIntervalInMs = 1000;
+
+ openCurrentFile();
+
+ while(!mStopThread) {
+ String log = getNextStringifiedLog();
+
+ if(log == null) { // reached end-of-file
+ ret = true;
+
+ break;
+ }
+
+ // loop until log is sent successfully
+ while(!mStopThread && !mDestination.sendStringified(log)) {
+ sleep(destinationPollIntervalInMs, "LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed to wait for destination to be available");
+ }
+ }
+
+ closeCurrentFile();
+
+ return ret;
+ }
+
+ private String getNextStringifiedLog() {
+ String log = null;
+
+ if(mReader != null) {
+ try {
+ while(true) {
+ String line = mReader.readLine();
+
+ if(line == null) {
+ break;
+ } else {
+ if(log == null) {
+ log = "";
+ }
+
+ if(line.endsWith(MiscUtil.ESCAPE_STR)) {
+ line = line.substring(0, line.length() - MiscUtil.ESCAPE_STR.length());
+
+ log += MiscUtil.LINE_SEPARATOR;
+ log += line;
+
+ continue;
+ } else {
+ log += line;
+ break;
+ }
+ }
+ }
+ } catch (IOException excp) {
+ LogLog.warn("getNextStringifiedLog.getNextLog(): failed to read from file " + mCurrentLogfile, excp);
+ }
+
+ if(log == null) {
+ closeCurrentFile();
+ }
+ }
+ LogLog.warn("READ: " + log);
+
+ return log;
+ }
+
+ private void openCurrentFile() {
+ LogLog.debug("==> openCurrentFile(" + mCurrentLogfile + ")");
+
+ closeCurrentFile();
+
+ while(mReader == null) {
+ if(mCurrentLogfile != null) {
+ try {
+ FileInputStream inStr = new FileInputStream(mCurrentLogfile);
+
+ InputStreamReader strReader = createReader(inStr);
+
+ if(strReader != null) {
+ mReader = new BufferedReader(strReader);
+ }
+ } catch(FileNotFoundException excp) {
+ LogLog.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp);
+ }
+ }
+ }
+
+ LogLog.debug("<== openCurrentFile(" + mCurrentLogfile + ")");
+ }
+
+ private void closeCurrentFile() {
+ LogLog.debug("==> closeCurrentFile(" + mCurrentLogfile + ")");
+
+ if(mReader != null) {
+ try {
+ mReader.close();
+ } catch(IOException excp) {
+ // ignore
+ }
+ }
+ mReader = null;
+
+ if(!mStopThread) {
+ archiveCurrentFile();
+ }
+
+ LogLog.debug("<== closeCurrentFile(" + mCurrentLogfile + ")");
+ }
+
+ private void archiveCurrentFile() {
+ if(mCurrentLogfile != null) {
+ File logFile = new File(mCurrentLogfile);
+ String archiveDirName = MiscUtil.replaceTokens(mFileLogBuffer.getArchiveDirectory());
+ String archiveFilename = archiveDirName + File.separator + MiscUtil.replaceTokens(logFile.getName());
+
+ try {
+ if(logFile.exists()) {
+ File archiveFile = new File(archiveFilename);
+
+ MiscUtil.createParents(archiveFile);
+
+ if(! logFile.renameTo(archiveFile)) {
+ // TODO: renameTo() does not work in all cases. in case of failure, copy the file contents to the destination and delete the file
+ }
+
+ File archiveDir = new File(archiveDirName);
+ File[] files = archiveDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File f) {
+ return f.isFile();
+ }
+ });
+
+ int numOfFilesToDelete = files == null ? 0 : (files.length - mFileLogBuffer.getArchiveFileCount());
+
+ if(numOfFilesToDelete > 0) {
+ Arrays.sort(files, new Comparator<File>() {
+
+ @Override
+ public int compare(File f1, File f2) {
+ return (int)(f1.lastModified() - f2.lastModified());
+ }
+ });
+
+ for(int i = 0; i < numOfFilesToDelete; i++) {
+ LogLog.debug("DELETE: " + files[i].getAbsolutePath());
+
+ files[i].delete();
+ }
+ }
+ }
+ } catch(Exception excp) {
+ LogLog.warn("archiveCurrentFile(): faile to move " + mCurrentLogfile + " to archive location " + archiveFilename, excp);
+ }
+ }
+ mCurrentLogfile = null;
+ }
+
+ public InputStreamReader createReader(InputStream iStr) {
+ InputStreamReader reader = null;
+
+ if(iStr != null) {
+ String encoding = mFileLogBuffer.getEncoding();
+
+ if(encoding != null) {
+ try {
+ reader = new InputStreamReader(iStr, encoding);
+ } catch(UnsupportedEncodingException excp) {
+ LogLog.warn("createReader(): failed to create input reader.", excp);
+ }
+ }
+
+ if(reader == null) {
+ reader = new InputStreamReader(iStr);
+ }
+ }
+
+ return reader;
+ }
+
+ private void sleep(int sleepTimeInMs, String onFailMsg) {
+ try {
+ Thread.sleep(sleepTimeInMs);
+ } catch(InterruptedException excp) {
+ LogLog.warn(onFailMsg, excp);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("DestinationDispatcherThread {");
+ sb.append("ThreadName=").append(this.getName()).append("; ");
+ sb.append("CompletedLogfiles.size()=").append(mCompletedLogfiles.size()).append("; ");
+ sb.append("StopThread=").append(mStopThread).append("; ");
+ sb.append("CurrentLogfile=").append(mCurrentLogfile);
+ sb.append("}");
+
+ return sb.toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/LogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LogBuffer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/LogBuffer.java
new file mode 100644
index 0000000..ce67e01
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LogBuffer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.xasecure.audit.provider;
+
+
+public interface LogBuffer<T> {
+ public void start(LogDestination<T> destination);
+
+ public void stop();
+
+ boolean isAvailable();
+
+ public boolean add(T log);
+}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LogDestination.java b/agents-audit/src/main/java/com/xasecure/audit/provider/LogDestination.java
new file mode 100644
index 0000000..a8b5081
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LogDestination.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.xasecure.audit.provider;
+
+
+public interface LogDestination<T> {
+ public void start();
+
+ public void stop();
+
+ boolean isAvailable();
+
+ public boolean send(T log);
+
+ public boolean sendStringified(String log);
+}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java b/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
new file mode 100644
index 0000000..6610210
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
@@ -0,0 +1,139 @@
+package com.xasecure.audit.provider;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.rmi.dgc.VMID;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.log4j.helpers.LogLog;
+
+public class MiscUtil {
+ public static final String TOKEN_HOSTNAME = "%hostname%";
+ public static final String TOKEN_APP_INSTANCE = "%app-instance%";
+ public static final String TOKEN_CREATE_TIME_START = "%create-time:";
+ public static final String TOKEN_CREATE_TIME_END = "%";
+ public static final String ESCAPE_STR = "\\";
+
+ static VMID sJvmID = new VMID();
+
+ public static String LINE_SEPARATOR = System.getProperty("line.separator");
+
+ public static String replaceTokens(String str) {
+ if(str == null) {
+ return str;
+ }
+
+ str = replaceHostname(str);
+ str = replaceAppInstance(str);
+ str = replaceCreateTime(str);
+
+ return str;
+ }
+
+ public static String replaceHostname(String str) {
+ if(!str.contains(TOKEN_HOSTNAME)) {
+ return str;
+ }
+
+ String hostName = null;
+
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException excp) {
+ LogLog.warn("LocalFileLogBuffer", excp);
+ }
+
+ if(hostName == null) {
+ hostName = "Unknown";
+ }
+
+ return str.replace(TOKEN_HOSTNAME, hostName);
+ }
+
+ public static String replaceAppInstance(String str) {
+ if(!str.contains(TOKEN_APP_INSTANCE)) {
+ return str;
+ }
+
+ String appInstance = Integer.toString(Math.abs(sJvmID.hashCode()));
+
+ return str.replace(TOKEN_APP_INSTANCE, appInstance);
+ }
+
+ public static String replaceCreateTime(String str) {
+ Date now = new Date();
+
+ while(str.contains(TOKEN_CREATE_TIME_START)) {
+ int tagStartPos = str.indexOf(TOKEN_CREATE_TIME_START);
+ int tagEndPos = str.indexOf(TOKEN_CREATE_TIME_END, tagStartPos + TOKEN_CREATE_TIME_START.length());
+
+ if(tagEndPos <= tagStartPos) {
+ break;
+ }
+
+ String tag = str.substring(tagStartPos, tagEndPos+1);
+ String dtFormat = tag.substring(TOKEN_CREATE_TIME_START.length(), tag.lastIndexOf(TOKEN_CREATE_TIME_END));
+
+ String replaceStr = "";
+
+ if(dtFormat != null) {
+ SimpleDateFormat sdf = new SimpleDateFormat(dtFormat);
+
+ replaceStr = sdf.format(now);
+ }
+
+ str = str.replace(tag, replaceStr);
+ }
+
+ return str;
+ }
+
+ public static void createParents(File file) {
+ if(file != null) {
+ String parentName = file.getParent();
+
+ if (parentName != null) {
+ File parentDir = new File(parentName);
+
+ if(!parentDir.exists()) {
+ parentDir.mkdirs();
+ }
+ }
+ }
+ }
+
+ public static long getNextRolloverTime(long lastRolloverTime, long interval) {
+ long now = System.currentTimeMillis() / 1000 * 1000; // round to second
+
+ if(lastRolloverTime <= 0) {
+ // should this be set to the next multiple-of-the-interval from start of the day?
+ return now + interval;
+ } else if(lastRolloverTime <= now) {
+ long nextRolloverTime = now + interval;
+
+ // keep it at 'interval' boundary
+ long trimInterval = (nextRolloverTime - lastRolloverTime) % interval;
+
+ return nextRolloverTime - trimInterval;
+ } else {
+ return lastRolloverTime;
+ }
+ }
+
+ public static int parseInteger(String str, int defValue) {
+ int ret = defValue;
+
+ if(str != null) {
+ try {
+ ret = Integer.parseInt(str);
+ } catch(Exception excp) {
+ // ignore
+ }
+ }
+
+ return ret;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
new file mode 100644
index 0000000..db8489c
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
@@ -0,0 +1,48 @@
+package com.xasecure.audit.provider.hdfs;
+
+import java.util.Map;
+
+import com.xasecure.audit.model.AuditEventBase;
+import com.xasecure.audit.provider.BufferedAuditProvider;
+import com.xasecure.audit.provider.LocalFileLogBuffer;
+import com.xasecure.audit.provider.MiscUtil;
+
+public class HdfsAuditProvider extends BufferedAuditProvider {
+
+ public HdfsAuditProvider() {
+ }
+
+ public void init(Map<String, String> properties) {
+ String encoding = properties.get("encoding");
+
+ String hdfsDestinationDirectory = properties.get("destination.directroy");
+ String hdfsDestinationFile = properties.get("destination.file");
+ int hdfsDestinationRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("destination.rollover.interval.seconds"), 24 * 60 * 60);
+
+ String localFileBufferDirectory = properties.get("local.buffer.directroy");
+ String localFileBufferFile = properties.get("local.buffer.file");
+ int localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("local.buffer.rollover.interval.seconds"), 10 * 60);
+ String localFileBufferArchiveDirectory = properties.get("local.archive.directroy");
+ int localFileBufferArchiveFileCount = MiscUtil.parseInteger(properties.get("local.archive.max.file.count"), 10);
+
+ HdfsLogDestination<AuditEventBase> mHdfsDestination = new HdfsLogDestination<AuditEventBase>();
+
+ mHdfsDestination.setDirectory(hdfsDestinationDirectory);
+ mHdfsDestination.setFile(hdfsDestinationFile);
+ mHdfsDestination.setEncoding(encoding);
+ mHdfsDestination.setRolloverIntervalSeconds(hdfsDestinationRolloverIntervalSeconds);
+
+ LocalFileLogBuffer<AuditEventBase> mLocalFileBuffer = new LocalFileLogBuffer<AuditEventBase>();
+
+ mLocalFileBuffer.setDirectory(localFileBufferDirectory);
+ mLocalFileBuffer.setFile(localFileBufferFile);
+ mLocalFileBuffer.setEncoding(encoding);
+ mLocalFileBuffer.setRolloverIntervalSeconds(localFileBufferRolloverIntervalSeconds);
+ mLocalFileBuffer.setArchiveDirectory(localFileBufferArchiveDirectory);
+ mLocalFileBuffer.setArchiveFileCount(localFileBufferArchiveFileCount);
+
+ setBufferAndDestination(mLocalFileBuffer, mHdfsDestination);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
new file mode 100644
index 0000000..eeb7574
--- /dev/null
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.xasecure.audit.provider.hdfs;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.helpers.LogLog;
+
+import com.xasecure.audit.provider.LogDestination;
+import com.xasecure.audit.provider.MiscUtil;
+
+public class HdfsLogDestination<T> implements LogDestination<T> {
+ private String mDirectory = null;
+ private String mFile = null;
+ private String mEncoding = null;
+ private boolean mIsAppend = true;
+ private int mRolloverIntervalSeconds = 24 * 60 * 60;
+
+ private OutputStreamWriter mWriter = null;
+ private String mCurrentFilename = null;
+ private long mNextRolloverTime = 0;
+ private boolean mIsStopInProgress = false;
+
+ public HdfsLogDestination() {
+ }
+
+ public String getDirectory() {
+ return mDirectory;
+ }
+
+ public void setDirectory(String directory) {
+ this.mDirectory = directory;
+ }
+
+ public String getFile() {
+ return mFile;
+ }
+
+ public void setFile(String file) {
+ this.mFile = file;
+ }
+
+ public String getEncoding() {
+ return mEncoding;
+ }
+
+ public void setEncoding(String encoding) {
+ mEncoding = encoding;
+ }
+
+ public int getRolloverIntervalSeconds() {
+ return mRolloverIntervalSeconds;
+ }
+
+ public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+ this.mRolloverIntervalSeconds = rolloverIntervalSeconds;
+ }
+
+ @Override
+ public void start() {
+ LogLog.debug("==> HdfsLogDestination.start()");
+
+ openFile();
+
+ LogLog.debug("<== HdfsLogDestination.start()");
+ }
+
+ @Override
+ public void stop() {
+ LogLog.debug("==> HdfsLogDestination.stop()");
+
+ mIsStopInProgress = true;
+
+ closeFile();
+
+ mIsStopInProgress = false;
+
+ LogLog.debug("<== HdfsLogDestination.stop()");
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return mWriter != null;
+ }
+
+ @Override
+ public boolean send(T log) {
+ boolean ret = false;
+
+ if(log != null) {
+ String msg = log.toString();
+
+ ret = sendStringified(msg);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public boolean sendStringified(String log) {
+ boolean ret = false;
+
+ rolloverIfNeeded();
+
+ OutputStreamWriter writer = mWriter;
+
+ if(writer != null) {
+ try {
+ writer.write(log);
+
+ ret = true;
+ } catch (IOException excp) {
+ LogLog.warn("HdfsLogDestination.sendStringified(): write failed", excp);
+ }
+ }
+
+ return ret;
+ }
+
+ private void openFile() {
+ LogLog.debug("==> HdfsLogDestination.openFile()");
+
+ closeFile();
+
+ mCurrentFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
+
+ FSDataOutputStream ostream = null;
+ FileSystem fileSystem = null;
+ Path pathLogfile = null;
+ Configuration conf = null;
+
+ try {
+ LogLog.debug("HdfsLogDestination.openFile(): opening file " + mCurrentFilename);
+
+ URI uri = URI.create(mCurrentFilename);
+
+ // TODO: mechanism to XA-HDFS plugin to disable auditing of access checks to the current HDFS file
+
+ conf = new Configuration();
+ pathLogfile = new Path(mCurrentFilename);
+ fileSystem = FileSystem.get(uri, conf);
+
+ if(fileSystem.exists(pathLogfile)) {
+ if(mIsAppend) {
+ try {
+ ostream = fileSystem.append(pathLogfile);
+ } catch(IOException excp) {
+ // append may not be supported by the filesystem. rename existing file and create a new one
+ String fileSuffix = MiscUtil.replaceTokens("-" + MiscUtil.TOKEN_CREATE_TIME_START + "yyyyMMdd-HHmm.ss" + MiscUtil.TOKEN_CREATE_TIME_END);
+ String movedFilename = appendToFilename(mCurrentFilename, fileSuffix);
+ Path movedFilePath = new Path(movedFilename);
+
+ fileSystem.rename(pathLogfile, movedFilePath);
+ }
+ }
+ }
+
+ if(ostream == null){
+ ostream = fileSystem.create(pathLogfile);
+ }
+ } catch(IOException ex) {
+ Path parentPath = pathLogfile.getParent();
+
+ try {
+ if(parentPath != null&& fileSystem != null && !fileSystem.exists(parentPath) && fileSystem.mkdirs(parentPath)) {
+ ostream = fileSystem.create(pathLogfile);
+ }
+ } catch (IOException e) {
+ LogLog.warn("HdfsLogDestination.openFile() failed", e);
+ } catch (Throwable e) {
+ LogLog.warn("HdfsLogDestination.openFile() failed", e);
+ }
+ } catch(Throwable ex) {
+ LogLog.warn("HdfsLogDestination.openFile() failed", ex);
+ } finally {
+ // TODO: unset the property set above to exclude auditing of logfile opening
+ // System.setProperty(hdfsCurrentFilenameProperty, null);
+ }
+
+ mWriter = createWriter(ostream);
+
+ if(mWriter != null) {
+ LogLog.debug("HdfsLogDestination.openFile(): opened file " + mCurrentFilename);
+
+ mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000));
+ } else {
+ LogLog.warn("HdfsLogDestination.openFile(): failed to open file for write " + mCurrentFilename);
+
+ mCurrentFilename = null;
+ }
+
+ LogLog.debug("<== HdfsLogDestination.openFile(" + mCurrentFilename + ")");
+ }
+
+ private void closeFile() {
+ LogLog.debug("==> HdfsLogDestination.closeFile()");
+
+ OutputStreamWriter writer = mWriter;
+
+ mWriter = null;
+
+ if(writer != null) {
+ try {
+ writer.flush();
+ writer.close();
+ } catch(IOException excp) {
+ if(! mIsStopInProgress) { // during shutdown, the underlying FileSystem might already be closed; so don't print error details
+ LogLog.warn("HdfsLogDestination: failed to close file " + mCurrentFilename, excp);
+ }
+ }
+ }
+
+ LogLog.debug("<== HdfsLogDestination.closeFile()");
+ }
+
+ private void rollover() {
+ LogLog.debug("==> HdfsLogDestination.rollover()");
+
+ closeFile();
+
+ openFile();
+
+ LogLog.debug("<== HdfsLogDestination.rollover()");
+ }
+
+ private void rolloverIfNeeded() {
+ long now = System.currentTimeMillis();
+
+ if(now > mNextRolloverTime) {
+ rollover();
+ }
+ }
+
+ private OutputStreamWriter createWriter(OutputStream os ) {
+ OutputStreamWriter writer = null;
+
+ if(os != null) {
+ if(mEncoding != null) {
+ try {
+ writer = new OutputStreamWriter(os, mEncoding);
+ } catch(UnsupportedEncodingException excp) {
+ LogLog.warn("LocalFileLogBuffer: failed to create output writer.", excp);
+ }
+ }
+
+ if(writer == null) {
+ writer = new OutputStreamWriter(os);
+ }
+ }
+
+ return writer;
+ }
+
+ private String appendToFilename(String fileName, String strToAppend) {
+ String ret = fileName;
+
+ if(strToAppend != null) {
+ if(ret == null) {
+ ret = "";
+ }
+
+ int extnPos = ret.lastIndexOf(".");
+
+ if(extnPos < 0) {
+ ret += strToAppend;
+ } else {
+ String extn = ret.substring(extnPos);
+
+ ret = ret.substring(0, extnPos) + strToAppend + extn;
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("HdfsLogDestination {");
+ sb.append("Directory=").append(mDirectory).append("; ");
+ sb.append("File=").append(mFile).append("; ");
+ sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds);
+ sb.append("}");
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/hbase-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hbase-agent/conf/xasecure-audit.xml b/hbase-agent/conf/xasecure-audit.xml
index be1b900..f33b8ba 100644
--- a/hbase-agent/conf/xasecure-audit.xml
+++ b/hbase-agent/conf/xasecure-audit.xml
@@ -87,4 +87,76 @@
<name>xasecure.audit.db.batch.size</name>
<value>100</value>
</property>
+
+
+ <!-- HDFS audit provider configuration -->
+ <property>
+ <name>xasecure.audit.hdfs.is.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.queue.size</name>
+ <value>10240</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.encoding</name>
+ <value></value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.directroy</name>
+ <value>hdfs://namenodehost:8020/audit/hbase/%create-time:yyyyMMdd%</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.file</name>
+ <value>%hostname%-audit.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.rollover.interval.seconds</name>
+ <value>86400</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
+ <value>/tmp/logs/hbase</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.file</name>
+ <value>%create-time:yyyyMMdd-HHmm.ss%.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.rollover.interval.seconds</name>
+ <value>600</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.directroy</name>
+ <value>/tmp/logs/archive/hbase</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.max.file.count</name>
+ <value>10</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/hdfs-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hdfs-agent/conf/xasecure-audit.xml b/hdfs-agent/conf/xasecure-audit.xml
index 2b24f33..1ae6f3b 100644
--- a/hdfs-agent/conf/xasecure-audit.xml
+++ b/hdfs-agent/conf/xasecure-audit.xml
@@ -1,13 +1,49 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
+ <property>
+ <name>xasecure.audit.is.enabled</name>
+ <value>true</value>
+ </property>
<property>
- <name>xasecure.audit.provider.factory</name>
- <value>com.xasecure.audit.provider.AuditProviderFactory</value>
- </property>
+ <name>xasecure.audit.repository.name</name>
+ <value>hadoopdev</value>
+ </property>
+
+
+ <!-- DB audit provider configuration -->
+ <property>
+ <name>xasecure.audit.db.is.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.db.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.db.async.max.queue.size</name>
+ <value>10240</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.db.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
- <!-- Properties whose name begin with "xasecure.audit." are used to configure JPA -->
+ <property>
+ <name>xasecure.audit.db.async.max.flush.interval.ms</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.db.batch.size</name>
+ <value>100</value>
+ </property>
+
+ <!-- Properties whose name begin with "xasecure.audit.jpa." are used to configure JPA -->
<property>
<name>xasecure.audit.jpa.javax.persistence.jdbc.url</name>
<value>jdbc:mysql://localhost:3306/xa_db</value>
@@ -32,60 +68,103 @@
<name>xasecure.audit.credential.provider.file</name>
<value>jceks://file/etc/xasecure/conf/auditcred.jceks</value>
</property>
-
- <property>
- <name>xasecure.audit.repository.name</name>
- <value>hadoopdev</value>
- </property>
-
- <property>
- <name>xasecure.audit.is.enabled</name>
- <value>true</value>
- </property>
+
+ <!-- HDFS audit provider configuration -->
<property>
- <name>xasecure.audit.log4j.is.enabled</name>
+ <name>xasecure.audit.hdfs.is.enabled</name>
<value>false</value>
</property>
<property>
- <name>xasecure.audit.log4j.is.async</name>
+ <name>xasecure.audit.hdfs.is.async</name>
<value>false</value>
</property>
-
+
<property>
- <name>xasecure.audit.log4j.async.max.queue.size</name>
+ <name>xasecure.audit.hdfs.async.max.queue.size</name>
<value>10240</value>
</property>
<property>
- <name>xasecure.audit.log4j.async.max.flush.interval.ms</name>
+ <name>xasecure.audit.hdfs.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
<value>30000</value>
</property>
-
+
<property>
- <name>xasecure.audit.db.is.enabled</name>
- <value>true</value>
+ <name>xasecure.audit.hdfs.config.encoding</name>
+ <value></value>
</property>
-
+
<property>
- <name>xasecure.audit.db.is.async</name>
+ <name>xasecure.audit.hdfs.config.destination.directroy</name>
+ <value>hdfs://namenodehost:8020/audit/hdfs/%create-time:yyyyMMdd%</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.file</name>
+ <value>%hostname%-audit.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.rollover.interval.seconds</name>
+ <value>86400</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
+ <value>/tmp/logs/hdfs</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.file</name>
+ <value>%create-time:yyyyMMdd-HHmm.ss%.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.rollover.interval.seconds</name>
+ <value>600</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.directroy</name>
+ <value>/tmp/logs/archive/hdfs</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.max.file.count</name>
+ <value>10</value>
+ </property>
+
+ <!-- Log4j audit provider configuration -->
+ <property>
+ <name>xasecure.audit.log4j.is.enabled</name>
<value>false</value>
</property>
-
+
<property>
- <name>xasecure.audit.db.async.max.queue.size</name>
+ <name>xasecure.audit.log4j.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.log4j.async.max.queue.size</name>
<value>10240</value>
</property>
<property>
- <name>xasecure.audit.db.async.max.flush.interval.ms</name>
- <value>30000</value>
+ <name>xasecure.audit.log4j.async.resume.queue.size</name>
+ <value>8192</value>
</property>
<property>
- <name>xasecure.audit.db.batch.size</name>
- <value>100</value>
+ <name>xasecure.audit.log4j.async.max.flush.interval.ms</name>
+ <value>30000</value>
</property>
-</configuration>
\ No newline at end of file
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/hive-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/xasecure-audit.xml b/hive-agent/conf/xasecure-audit.xml
index 4014546..eb951a4 100644
--- a/hive-agent/conf/xasecure-audit.xml
+++ b/hive-agent/conf/xasecure-audit.xml
@@ -87,4 +87,76 @@
<name>xasecure.audit.db.batch.size</name>
<value>100</value>
</property>
+
+
+ <!-- HDFS audit provider configuration -->
+ <property>
+ <name>xasecure.audit.hdfs.is.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.queue.size</name>
+ <value>10240</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.encoding</name>
+ <value></value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.directroy</name>
+ <value>hdfs://namenodehost:8020/audit/hive/%create-time:yyyyMMdd%</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.file</name>
+ <value>%hostname%-audit.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.rollover.interval.seconds</name>
+ <value>86400</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
+ <value>/tmp/logs/hive</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.file</name>
+ <value>%create-time:yyyyMMdd-HHmm.ss%.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.rollover.interval.seconds</name>
+ <value>600</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.directroy</name>
+ <value>/tmp/logs/archive/hive</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.max.file.count</name>
+ <value>10</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/knox-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/knox-agent/conf/xasecure-audit.xml b/knox-agent/conf/xasecure-audit.xml
index a5252d1..987a49d 100644
--- a/knox-agent/conf/xasecure-audit.xml
+++ b/knox-agent/conf/xasecure-audit.xml
@@ -82,4 +82,76 @@
<name>xasecure.audit.db.batch.size</name>
<value>100</value>
</property>
+
+
+ <!-- HDFS audit provider configuration -->
+ <property>
+ <name>xasecure.audit.hdfs.is.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.queue.size</name>
+ <value>10240</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.encoding</name>
+ <value></value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.directroy</name>
+ <value>hdfs://namenodehost:8020/audit/knox/%create-time:yyyyMMdd%</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.file</name>
+ <value>%hostname%-audit.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.rollover.interval.seconds</name>
+ <value>86400</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
+ <value>/tmp/logs/knox</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.file</name>
+ <value>%create-time:yyyyMMdd-HHmm.ss%.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.rollover.interval.seconds</name>
+ <value>600</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.directroy</name>
+ <value>/tmp/logs/archive/knox</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.max.file.count</name>
+ <value>10</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/5ccf382d/storm-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/storm-agent/conf/xasecure-audit.xml b/storm-agent/conf/xasecure-audit.xml
index 4014546..ef0b27a 100644
--- a/storm-agent/conf/xasecure-audit.xml
+++ b/storm-agent/conf/xasecure-audit.xml
@@ -87,4 +87,76 @@
<name>xasecure.audit.db.batch.size</name>
<value>100</value>
</property>
+
+
+ <!-- HDFS audit provider configuration -->
+ <property>
+ <name>xasecure.audit.hdfs.is.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.is.async</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.queue.size</name>
+ <value>10240</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.resume.queue.size</name>
+ <value>8192</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.async.max.flush.interval.ms</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.encoding</name>
+ <value></value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.directroy</name>
+ <value>hdfs://namenodehost:8020/audit/storm/%create-time:yyyyMMdd%</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.file</name>
+ <value>%hostname%-audit.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.destination.rollover.interval.seconds</name>
+ <value>86400</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
+ <value>/tmp/logs/storm</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.file</name>
+ <value>%create-time:yyyyMMdd-HHmm.ss%.log</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.buffer.rollover.interval.seconds</name>
+ <value>600</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.directroy</name>
+ <value>/tmp/logs/archive/storm</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.hdfs.config.local.archive.max.file.count</name>
+ <value>10</value>
+ </property>
</configuration>