You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/04/22 19:23:19 UTC
[02/12] incubator-ranger git commit: RANGER-276 Add support for
aggregating audit logs at source
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
index ec5e9a8..ab6a74a 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java
@@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
public abstract class BufferedAuditProvider extends BaseAuditProvider {
- private LogBuffer<AuditEventBase> mBuffer = null;
+ private LogBuffer<AuditEventBase> mBuffer = null;
private LogDestination<AuditEventBase> mDestination = null;
@Override
@@ -34,34 +34,39 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
@Override
public boolean log(AuditEventBase event) {
- if(event instanceof AuthzAuditEvent) {
- AuthzAuditEvent authzEvent = (AuthzAuditEvent)event;
+ if (event instanceof AuthzAuditEvent) {
+ AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
- if(authzEvent.getAgentHostname() == null) {
+ if (authzEvent.getAgentHostname() == null) {
authzEvent.setAgentHostname(MiscUtil.getHostname());
}
- if(authzEvent.getLogType() == null) {
+ if (authzEvent.getLogType() == null) {
authzEvent.setLogType("RangerAudit");
}
- if(authzEvent.getEventId() == null) {
+ if (authzEvent.getEventId() == null) {
authzEvent.setEventId(MiscUtil.generateUniqueId());
}
}
- if(! mBuffer.add(event)) {
+ if (!mBuffer.add(event)) {
logFailedEvent(event);
+ return false;
}
return true;
}
@Override
public boolean log(Collection<AuditEventBase> events) {
+ boolean ret = true;
for (AuditEventBase event : events) {
- log(event);
+ ret = log(event);
+ if (!ret) {
+ break;
+ }
}
- return true;
+ return ret;
}
@Override
@@ -73,8 +78,12 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
@Override
public boolean logJSON(Collection<String> events) {
+ boolean ret = true;
for (String event : events) {
- logJSON(event);
+ ret = logJSON(event);
+ if (!ret) {
+ break;
+ }
}
return false;
}
@@ -93,7 +102,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
public void waitToComplete() {
}
-
@Override
public void waitToComplete(long timeout) {
}
@@ -120,9 +128,9 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider {
return mDestination;
}
- protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer,
- LogDestination<AuditEventBase> destination) {
- mBuffer = buffer;
+ protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer,
+ LogDestination<AuditEventBase> destination) {
+ mBuffer = buffer;
mDestination = destination;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f4976fb..f4bd90c 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -31,6 +31,7 @@ import javax.persistence.Persistence;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
@@ -120,10 +121,14 @@ public class DbAuditProvider extends AuditDestination {
@Override
public boolean log(Collection<AuditEventBase> events) {
+ boolean ret = true;
for (AuditEventBase event : events) {
- log(event);
+ ret = log(event);
+ if(!ret) {
+ break;
+ }
}
- return true;
+ return ret;
}
@Override
@@ -135,10 +140,14 @@ public class DbAuditProvider extends AuditDestination {
@Override
public boolean logJSON(Collection<String> events) {
+ boolean ret = true;
for (String event : events) {
- logJSON(event);
+ ret = logJSON(event);
+ if( !ret ) {
+ break;
+ }
}
- return false;
+ return ret;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
deleted file mode 100644
index 62ecab1..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-/**
- * This class write the logs to local file
- */
-public class FileAuditDestination extends AuditDestination {
- private static final Log logger = LogFactory
- .getLog(FileAuditDestination.class);
-
- public static final String PROP_FILE_LOCAL_DIR = "dir";
- public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format";
- public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec";
-
- String baseFolder = null;
- String fileFormat = null;
- int fileRolloverSec = 24 * 60 * 60; // In seconds
- private String logFileNameFormat;
-
- boolean initDone = false;
-
- private File logFolder;
- PrintWriter logWriter = null;
-
- private Date fileCreateTime = null;
-
- private String currentFileName;
-
- private boolean isStopped = false;
-
- @Override
- public void init(Properties prop, String propPrefix) {
- super.init(prop, propPrefix);
-
- // Initialize properties for this class
- // Initial folder and file properties
- String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
- + "." + PROP_FILE_LOCAL_DIR);
- logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
- + PROP_FILE_LOCAL_FILE_NAME_FORMAT);
- fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_FILE_FILE_ROLLOVER, fileRolloverSec);
-
- if (logFolderProp == null || logFolderProp.isEmpty()) {
- logger.error("File destination folder is not configured. Please set "
- + propPrefix
- + "."
- + PROP_FILE_LOCAL_DIR
- + ". name="
- + getName());
- return;
- }
- logFolder = new File(logFolderProp);
- if (!logFolder.isDirectory()) {
- logFolder.mkdirs();
- if (!logFolder.isDirectory()) {
- logger.error("FileDestination folder not found and can't be created. folder="
- + logFolder.getAbsolutePath() + ", name=" + getName());
- return;
- }
- }
- logger.info("logFolder=" + logFolder + ", name=" + getName());
-
- if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
- logFileNameFormat = "%app-type%_ranger_audit.log";
- }
-
- logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
- + getName());
-
- initDone = true;
- }
-
- @Override
- public boolean logJSON(Collection<String> events) {
- try {
- PrintWriter out = getLogFileStream();
- for (String event : events) {
- out.println(event);
- }
- out.flush();
- } catch (Throwable t) {
- logError("Error writing to log file.", t);
- return false;
- }
- return true;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
- */
- @Override
- synchronized public boolean log(Collection<AuditEventBase> events) {
- if (isStopped) {
- logError("log() called after stop was requested. name=" + getName());
- return false;
- }
- List<String> jsonList = new ArrayList<String>();
- for (AuditEventBase event : events) {
- try {
- jsonList.add(MiscUtil.stringify(event));
- } catch (Throwable t) {
- logger.error("Error converting to JSON. event=" + event);
- }
- }
- return logJSON(jsonList);
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#start()
- */
- @Override
- public void start() {
- // Nothing to do here. We will open the file when the first log request
- // comes
- }
-
- @Override
- synchronized public void stop() {
- if (logWriter != null) {
- logWriter.flush();
- logWriter.close();
- logWriter = null;
- isStopped = true;
- }
- }
-
- // Helper methods in this class
- synchronized private PrintWriter getLogFileStream() throws Exception {
- closeFileIfNeeded();
-
- // Either there are no open log file or the previous one has been rolled
- // over
- if (logWriter == null) {
- Date currentTime = new Date();
- // Create a new file
- String fileName = MiscUtil.replaceTokens(logFileNameFormat,
- currentTime.getTime());
- File outLogFile = new File(logFolder, fileName);
- if (outLogFile.exists()) {
- // Let's try to get the next available file
- int i = 0;
- while (true) {
- i++;
- int lastDot = fileName.lastIndexOf('.');
- String baseName = fileName.substring(0, lastDot);
- String extension = fileName.substring(lastDot);
- String newFileName = baseName + "." + i + extension;
- File newLogFile = new File(logFolder, newFileName);
- if (!newLogFile.exists()) {
- // Move the file
- if (!outLogFile.renameTo(newLogFile)) {
- logger.error("Error renameing file. " + outLogFile
- + " to " + newLogFile);
- }
- break;
- }
- }
- }
- if (!outLogFile.exists()) {
- logger.info("Creating new file. destName=" + getName()
- + ", fileName=" + fileName);
- // Open the file
- logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
- outLogFile)));
- } else {
- logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
- outLogFile, true)));
- }
- fileCreateTime = new Date();
- currentFileName = outLogFile.getPath();
- }
- return logWriter;
- }
-
- private void closeFileIfNeeded() throws FileNotFoundException, IOException {
- if (logWriter == null) {
- return;
- }
- if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
- logger.info("Closing file. Rolling over. name=" + getName()
- + ", fileName=" + currentFileName);
- logWriter.flush();
- logWriter.close();
- logWriter = null;
- currentFileName = null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
deleted file mode 100644
index a36c40f..0000000
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-/**
- * This class write the logs to local file
- */
-public class HDFSAuditDestination extends AuditDestination {
- private static final Log logger = LogFactory
- .getLog(HDFSAuditDestination.class);
-
- public static final String PROP_HDFS_DIR = "dir";
- public static final String PROP_HDFS_SUBDIR = "subdir";
- public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format";
- public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec";
-
- String baseFolder = null;
- String fileFormat = null;
- int fileRolloverSec = 24 * 60 * 60; // In seconds
- private String logFileNameFormat;
-
- boolean initDone = false;
-
- private String logFolder;
- PrintWriter logWriter = null;
-
- private Date fileCreateTime = null;
-
- private String currentFileName;
-
- private boolean isStopped = false;
-
- @Override
- public void init(Properties prop, String propPrefix) {
- super.init(prop, propPrefix);
-
- // Initialize properties for this class
- // Initial folder and file properties
- String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
- + "." + PROP_HDFS_DIR);
- String logSubFolder = MiscUtil.getStringProperty(props, propPrefix
- + "." + PROP_HDFS_SUBDIR);
- if (logSubFolder == null || logSubFolder.isEmpty()) {
- logSubFolder = "%app-type%/%time:yyyyMMdd%";
- }
-
- logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "."
- + PROP_HDFS_FILE_NAME_FORMAT);
- fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
- + PROP_HDFS_ROLLOVER, fileRolloverSec);
-
- if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
- logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log";
- }
-
- if (logFolderProp == null || logFolderProp.isEmpty()) {
- logger.fatal("File destination folder is not configured. Please set "
- + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName());
- return;
- }
-
- logFolder = logFolderProp + "/" + logSubFolder;
- logger.info("logFolder=" + logFolder + ", destName=" + getName());
- logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="
- + getName());
-
- initDone = true;
- }
-
- @Override
- public boolean logJSON(Collection<String> events) {
- try {
- PrintWriter out = getLogFileStream();
- for (String event : events) {
- out.println(event);
- }
- out.flush();
- } catch (Throwable t) {
- logError("Error writing to log file.", t);
- return false;
- }
- return true;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
- */
- @Override
- synchronized public boolean log(Collection<AuditEventBase> events) {
- if (isStopped) {
- logError("log() called after stop was requested. name=" + getName());
- return false;
- }
- List<String> jsonList = new ArrayList<String>();
- for (AuditEventBase event : events) {
- try {
- jsonList.add(MiscUtil.stringify(event));
- } catch (Throwable t) {
- logger.error("Error converting to JSON. event=" + event);
- }
- }
- return logJSON(jsonList);
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ranger.audit.provider.AuditProvider#start()
- */
- @Override
- public void start() {
- // Nothing to do here. We will open the file when the first log request
- // comes
- }
-
- @Override
- synchronized public void stop() {
- try {
- if (logWriter != null) {
- logWriter.flush();
- logWriter.close();
- logWriter = null;
- isStopped = true;
- }
- } catch (Throwable t) {
- logger.error("Error closing HDFS file.", t);
- }
- }
-
- // Helper methods in this class
- synchronized private PrintWriter getLogFileStream() throws Throwable {
- closeFileIfNeeded();
-
- // Either there are no open log file or the previous one has been rolled
- // over
- if (logWriter == null) {
- Date currentTime = new Date();
- // Create a new file
- String fileName = MiscUtil.replaceTokens(logFileNameFormat,
- currentTime.getTime());
- String parentFolder = MiscUtil.replaceTokens(logFolder,
- currentTime.getTime());
- Configuration conf = new Configuration();
-
- String fullPath = parentFolder
- + org.apache.hadoop.fs.Path.SEPARATOR + fileName;
- String defaultPath = fullPath;
- URI uri = URI.create(fullPath);
- FileSystem fileSystem = FileSystem.get(uri, conf);
-
- Path hdfPath = new Path(fullPath);
- logger.info("Checking whether log file exists. hdfPath=" + fullPath);
- int i = 0;
- while (fileSystem.exists(hdfPath)) {
- i++;
- int lastDot = defaultPath.lastIndexOf('.');
- String baseName = defaultPath.substring(0, lastDot);
- String extension = defaultPath.substring(lastDot);
- fullPath = baseName + "." + i + extension;
- hdfPath = new Path(fullPath);
- logger.info("Checking whether log file exists. hdfPath=" + fullPath);
- }
- logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath);
- // Create parent folders
- createParents(hdfPath, fileSystem);
-
- // Create the file to write
- logger.info("Creating new log file. hdfPath=" + fullPath);
- FSDataOutputStream ostream = fileSystem.create(hdfPath);
- logWriter = new PrintWriter(ostream);
- fileCreateTime = new Date();
- currentFileName = fullPath;
- }
- return logWriter;
- }
-
- private void createParents(Path pathLogfile, FileSystem fileSystem)
- throws Throwable {
- logger.info("Creating parent folder for " + pathLogfile);
- Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;
-
- if (parentPath != null && fileSystem != null
- && !fileSystem.exists(parentPath)) {
- fileSystem.mkdirs(parentPath);
- }
- }
-
- private void closeFileIfNeeded() throws FileNotFoundException, IOException {
- if (logWriter == null) {
- return;
- }
- // TODO: Close the file on absolute time. Currently it is implemented as
- // relative time
- if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) {
- logger.info("Closing file. Rolling over. name=" + getName()
- + ", fileName=" + currentFileName);
- logWriter.flush();
- logWriter.close();
- logWriter = null;
- currentFileName = null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
index a5a52a0..040a045 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 57ac0a0..876fa5b 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -69,6 +69,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider {
public void addAuditProviders(List<AuditProvider> providers) {
if (providers != null) {
for (AuditProvider provider : providers) {
+ LOG.info("Adding " + provider.getName()
+ + " as consumer to MultiDestination " + getName());
addAuditProvider(provider);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
new file mode 100644
index 0000000..a6f291d
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+
+/**
+ * This is a non-blocking queue with no limit on capacity.
+ */
+public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
+ private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class);
+
+ LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+ Thread consumerThread = null;
+
+ static final int MAX_DRAIN = 1000;
+ static int threadCount = 0;
+ static final String DEFAULT_NAME = "async";
+
+ public AuditAsyncQueue() {
+ setName(DEFAULT_NAME);
+ }
+
+ public AuditAsyncQueue(AuditProvider consumer) {
+ super(consumer);
+ setName(DEFAULT_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ // Add to the queue and return ASAP
+ if (queue.size() >= getMaxQueueSize()) {
+ return false;
+ }
+ queue.add(event);
+ addLifeTimeInLogCount(1);
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ boolean ret = true;
+ for (AuditEventBase event : events) {
+ ret = log(event);
+ if (!ret) {
+ break;
+ }
+ }
+ return ret;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ if (consumer != null) {
+ consumer.start();
+ }
+
+ consumerThread = new Thread(this, this.getClass().getName()
+ + (threadCount++));
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ setDrain(true);
+ try {
+ if (consumerThread != null) {
+ consumerThread.interrupt();
+ }
+ consumerThread = null;
+ } catch (Throwable t) {
+ // ignore any exception
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ if (queue.isEmpty()) {
+ return consumer.isFlushPending();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ AuditEventBase event = null;
+ if (!isDrain()) {
+ // For Transfer queue take() is blocking
+ event = queue.take();
+ } else {
+ // For Transfer queue poll() is non blocking
+ event = queue.poll();
+ }
+ if (event != null) {
+ Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+ eventList.add(event);
+ queue.drainTo(eventList, MAX_DRAIN - 1);
+ consumer.log(eventList);
+ }
+ } catch (InterruptedException e) {
+ logger.info(
+ "Caught exception in consumer thread. Mostly to about loop",
+ e);
+ } catch (Throwable t) {
+ logger.error("Caught error during processing request.", t);
+ }
+ if (isDrain() && queue.isEmpty()) {
+ break;
+ }
+ }
+ try {
+ // Call stop on the consumer
+ consumer.stop();
+ } catch (Throwable t) {
+ logger.error("Error while calling stop on consumer.", t);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
new file mode 100644
index 0000000..5e21efc
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+
+public class AuditBatchQueue extends BaseAuditProvider implements Runnable {
+ private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
+
+ private BlockingQueue<AuditEventBase> queue = null;
+ private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>();
+
+ Thread consumerThread = null;
+ static int threadCount = 0;
+
+ public AuditBatchQueue() {
+ }
+
+ public AuditBatchQueue(AuditProvider consumer) {
+ super(consumer);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ // Add to batchQueue. Block if full
+ queue.add(event);
+ addLifeTimeInLogCount(1);
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ boolean ret = true;
+ for (AuditEventBase event : events) {
+ ret = log(event);
+ if (!ret) {
+ break;
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void init(Properties prop, String basePropertyName) {
+ String propPrefix = "xasecure.audit.batch";
+ if (basePropertyName != null) {
+ propPrefix = basePropertyName;
+ }
+
+ super.init(prop, propPrefix);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ synchronized public void start() {
+ if (consumerThread != null) {
+ logger.error("Provider is already started. name=" + getName());
+ return;
+ }
+ logger.info("Creating ArrayBlockingQueue with maxSize="
+ + getMaxQueueSize());
+ queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
+
+ // Start the consumer first
+ consumer.start();
+
+ // Then the FileSpooler
+ if (fileSpoolerEnabled) {
+ fileSpooler.start();
+ }
+
+ // Finally the queue listener
+ consumerThread = new Thread(this, this.getClass().getName()
+ + (threadCount++));
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ setDrain(true);
+ flush();
+ try {
+ if (consumerThread != null) {
+ consumerThread.interrupt();
+ }
+ consumerThread = null;
+ } catch (Throwable t) {
+ // ignore any exception
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+ */
+ @Override
+ public void waitToComplete() {
+ int defaultTimeOut = -1;
+ waitToComplete(defaultTimeOut);
+ consumer.waitToComplete(defaultTimeOut);
+ }
+
+ @Override
+ public void waitToComplete(long timeout) {
+ setDrain(true);
+ flush();
+ long sleepTime = 1000;
+ long startTime = System.currentTimeMillis();
+ int prevQueueSize = -1;
+ int staticLoopCount = 0;
+ while ((queue.size() > 0 || localBatchBuffer.size() > 0)) {
+ if (prevQueueSize == queue.size()) {
+ logger.error("Queue size is not changing. " + getName()
+ + ".size=" + queue.size());
+ staticLoopCount++;
+ if (staticLoopCount > 5) {
+ logger.error("Aborting writing to consumer. Some logs will be discarded."
+ + getName() + ".size=" + queue.size());
+ }
+ } else {
+ staticLoopCount = 0;
+ }
+ if (consumerThread != null) {
+ consumerThread.interrupt();
+ }
+ try {
+ Thread.sleep(sleepTime);
+ if (timeout > 0
+ && (System.currentTimeMillis() - startTime > timeout)) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ consumer.waitToComplete(timeout);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ if (queue.isEmpty()) {
+ return consumer.isFlushPending();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+ */
+ @Override
+ public void flush() {
+ if (fileSpoolerEnabled) {
+ fileSpooler.flush();
+ }
+ consumer.flush();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ long lastDispatchTime = System.currentTimeMillis();
+ boolean isDestActive = true;
+ while (true) {
+ // Time to next dispatch
+ long nextDispatchDuration = lastDispatchTime
+ - System.currentTimeMillis() + getMaxBatchInterval();
+
+ boolean isToSpool = false;
+ boolean fileSpoolDrain = false;
+ try {
+ if (fileSpoolerEnabled && fileSpooler.isPending()) {
+ int percentUsed = (getMaxQueueSize() - queue.size()) * 100
+ / getMaxQueueSize();
+ long lastAttemptDelta = fileSpooler
+ .getLastAttemptTimeDelta();
+
+ fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
+ // If we should even read from queue?
+ if (!isDrain() && !fileSpoolDrain
+ && percentUsed < fileSpoolDrainThresholdPercent) {
+ // Since some files are still under progress and it is
+ // not in drain mode, lets wait and retry
+ if (nextDispatchDuration > 0) {
+ Thread.sleep(nextDispatchDuration);
+ lastDispatchTime = System.currentTimeMillis();
+ }
+ continue;
+ }
+ isToSpool = true;
+ }
+
+ AuditEventBase event = null;
+
+ if (!isToSpool && !isDrain() && !fileSpoolDrain
+ && nextDispatchDuration > 0) {
+ event = queue.poll(nextDispatchDuration,
+ TimeUnit.MILLISECONDS);
+ } else {
+ // For poll() is non blocking
+ event = queue.poll();
+ }
+
+ if (event != null) {
+ localBatchBuffer.add(event);
+ if (getMaxBatchSize() >= localBatchBuffer.size()) {
+ queue.drainTo(localBatchBuffer, getMaxBatchSize()
+ - localBatchBuffer.size());
+ }
+ } else {
+ // poll returned due to timeout, so reseting clock
+ nextDispatchDuration = lastDispatchTime
+ - System.currentTimeMillis()
+ + getMaxBatchInterval();
+
+ lastDispatchTime = System.currentTimeMillis();
+ }
+ } catch (InterruptedException e) {
+ logger.info(
+ "Caught exception in consumer thread. Mostly to abort loop",
+ e);
+ setDrain(true);
+ } catch (Throwable t) {
+ logger.error("Caught error during processing request.", t);
+ }
+
+ if (localBatchBuffer.size() > 0 && isToSpool) {
+ // Let spool to the file directly
+ if (isDestActive) {
+ logger.info("Switching to file spool. Queue=" + getName()
+ + ", dest=" + consumer.getName());
+ }
+ isDestActive = false;
+ // Just before stashing
+ lastDispatchTime = System.currentTimeMillis();
+ fileSpooler.stashLogs(localBatchBuffer);
+ localBatchBuffer.clear();
+ } else if (localBatchBuffer.size() > 0
+ && (isDrain()
+ || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+ if (fileSpoolerEnabled && !isDestActive) {
+ logger.info("Switching to writing to destination. Queue="
+ + getName() + ", dest=" + consumer.getName());
+ }
+ // Reset time just before sending the logs
+ lastDispatchTime = System.currentTimeMillis();
+ boolean ret = consumer.log(localBatchBuffer);
+ if (!ret) {
+ if (fileSpoolerEnabled) {
+ logger.info("Switching to file spool. Queue="
+ + getName() + ", dest=" + consumer.getName());
+ // Transient error. Stash and move on
+ fileSpooler.stashLogs(localBatchBuffer);
+ isDestActive = false;
+ } else {
+ // We need to drop this event
+ logFailedEvent(localBatchBuffer, null);
+ }
+ } else {
+ isDestActive = true;
+ }
+ localBatchBuffer.clear();
+ }
+
+ if (isDrain()) {
+ if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
+ logger.info("Queue is not empty. Will retry. queue.size)="
+ + queue.size() + ", localBatchBuffer.size()="
+ + localBatchBuffer.size());
+ } else {
+ break;
+ }
+ }
+ }
+
+ logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+ + consumer.getName());
+ try {
+ // Call stop on the consumer
+ consumer.stop();
+ if (fileSpoolerEnabled) {
+ fileSpooler.stop();
+ }
+ } catch (Throwable t) {
+ logger.error("Error while calling stop on consumer.", t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
new file mode 100644
index 0000000..66d1573
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -0,0 +1,884 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * This class temporarily stores logs in file system if the destination is
+ * overloaded or down
+ */
+public class AuditFileSpool implements Runnable {
+ private static final Log logger = LogFactory.getLog(AuditFileSpool.class);
+
+ public enum SPOOL_FILE_STATUS {
+ pending, write_inprogress, read_inprogress, done
+ }
+
+ public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
+ public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format";
+ public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir";
+ public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files";
+ public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix";
+ public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec";
+ public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename";
+ // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
+ // "filespool.index.done_filename";
+ public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";
+
+ AuditProvider queueProvider = null;
+ AuditProvider consumerProvider = null;
+
+ BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();
+
+ // Folder and File attributes
+ File logFolder = null;
+ String logFileNameFormat = null;
+ File archiveFolder = null;
+ String fileNamePrefix = null;
+ String indexFileName = null;
+ File indexFile = null;
+ String indexDoneFileName = null;
+ File indexDoneFile = null;
+ int retryDestinationMS = 30 * 1000; // Default 30 seconds
+ int fileRolloverSec = 24 * 60 * 60; // In seconds
+ int maxArchiveFiles = 100;
+
+ int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
+ long lastErrorLogMS = 0;
+
+ List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
+
+ boolean isPending = false;
+ long lastAttemptTime = 0;
+ boolean initDone = false;
+
+ PrintWriter logWriter = null;
+ AuditIndexRecord currentWriterIndexRecord = null;
+ AuditIndexRecord currentConsumerIndexRecord = null;
+
+ BufferedReader logReader = null;
+
+ Thread destinationThread = null;
+
+ boolean isWriting = true;
+ boolean isDrain = false;
+ boolean isDestDown = true;
+
+ private static Gson gson = null;
+
+ public AuditFileSpool(AuditProvider queueProvider,
+ AuditProvider consumerProvider) {
+ this.queueProvider = queueProvider;
+ this.consumerProvider = consumerProvider;
+ }
+
+ public void init(Properties prop) {
+ init(prop, null);
+ }
+
+ public void init(Properties props, String basePropertyName) {
+ if (initDone) {
+ logger.error("init() called more than once. queueProvider="
+ + queueProvider.getName() + ", consumerProvider="
+ + consumerProvider.getName());
+ return;
+ }
+ String propPrefix = "xasecure.audit.filespool";
+ if (basePropertyName != null) {
+ propPrefix = basePropertyName;
+ }
+
+ try {
+ gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+ .create();
+
+ // Initial folder and file properties
+ String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_LOCAL_DIR);
+ logFileNameFormat = MiscUtil.getStringProperty(props,
+ basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
+ String archiveFolderProp = MiscUtil.getStringProperty(props,
+ propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
+ fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_FILENAME_PREFIX);
+ indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_INDEX_FILE);
+ retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
+ + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
+ fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
+ maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
+
+ logger.info("retryDestinationMS=" + retryDestinationMS
+ + ", queueName=" + queueProvider.getName());
+ logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
+ + queueProvider.getName());
+ logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
+ + queueProvider.getName());
+
+ if (logFolderProp == null || logFolderProp.isEmpty()) {
+ logger.error("Audit spool folder is not configured. Please set "
+ + propPrefix
+ + "."
+ + PROP_FILE_SPOOL_LOCAL_DIR
+ + ". queueName=" + queueProvider.getName());
+ return;
+ }
+ logFolder = new File(logFolderProp);
+ if (!logFolder.isDirectory()) {
+ logFolder.mkdirs();
+ if (!logFolder.isDirectory()) {
+ logger.error("File Spool folder not found and can't be created. folder="
+ + logFolder.getAbsolutePath()
+ + ", queueName="
+ + queueProvider.getName());
+ return;
+ }
+ }
+ logger.info("logFolder=" + logFolder + ", queueName="
+ + queueProvider.getName());
+
+ if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+ logFileNameFormat = "spool_" + "%app-type%" + "_"
+ + "%time:yyyyMMdd-HHmm.ss%.log";
+ }
+ logger.info("logFileNameFormat=" + logFileNameFormat
+ + ", queueName=" + queueProvider.getName());
+
+ if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
+ archiveFolder = new File(logFolder, "archive");
+ } else {
+ archiveFolder = new File(archiveFolderProp);
+ }
+ if (!archiveFolder.isDirectory()) {
+ archiveFolder.mkdirs();
+ if (!archiveFolder.isDirectory()) {
+ logger.error("File Spool archive folder not found and can't be created. folder="
+ + archiveFolder.getAbsolutePath()
+ + ", queueName="
+ + queueProvider.getName());
+ return;
+ }
+ }
+ logger.info("archiveFolder=" + archiveFolder + ", queueName="
+ + queueProvider.getName());
+
+ if (indexFileName == null || indexFileName.isEmpty()) {
+ if (fileNamePrefix == null || fileNamePrefix.isEmpty()) {
+ fileNamePrefix = queueProvider.getName() + "_"
+ + consumerProvider.getName();
+ }
+ indexFileName = "index_" + fileNamePrefix + ".json";
+ }
+
+ indexFile = new File(logFolder, indexFileName);
+ if (!indexFile.exists()) {
+ indexFile.createNewFile();
+ }
+ logger.info("indexFile=" + indexFile + ", queueName="
+ + queueProvider.getName());
+
+ int lastDot = indexFileName.lastIndexOf('.');
+ indexDoneFileName = indexFileName.substring(0, lastDot)
+ + "_closed.json";
+ indexDoneFile = new File(logFolder, indexDoneFileName);
+ if (!indexDoneFile.exists()) {
+ indexDoneFile.createNewFile();
+ }
+ logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
+ + queueProvider.getName());
+
+ // Load index file
+ loadIndexFile();
+ for (AuditIndexRecord auditIndexRecord : indexRecords) {
+ if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
+ isPending = true;
+ }
+ if (auditIndexRecord.status
+ .equals(SPOOL_FILE_STATUS.write_inprogress)) {
+ currentWriterIndexRecord = auditIndexRecord;
+ logger.info("currentWriterIndexRecord="
+ + currentWriterIndexRecord.filePath
+ + ", queueName=" + queueProvider.getName());
+ }
+ if (auditIndexRecord.status
+ .equals(SPOOL_FILE_STATUS.read_inprogress)) {
+ indexQueue.add(auditIndexRecord);
+ }
+ }
+ printIndex();
+ // One more loop to add the rest of the pending records in reverse
+ // order
+ for (int i = 0; i < indexRecords.size(); i++) {
+ AuditIndexRecord auditIndexRecord = indexRecords.get(i);
+ if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
+ File consumerFile = new File(auditIndexRecord.filePath);
+ if (!consumerFile.exists()) {
+ logger.error("INIT: Consumer file="
+ + consumerFile.getPath() + " not found.");
+ System.exit(1);
+ }
+ indexQueue.add(auditIndexRecord);
+ }
+ }
+
+ } catch (Throwable t) {
+ logger.fatal("Error initializing File Spooler. queue="
+ + queueProvider.getName(), t);
+ return;
+ }
+ initDone = true;
+ }
+
+ /**
+ * Start looking for outstanding logs and update status according.
+ */
+ public void start() {
+ if (!initDone) {
+ logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName="
+ + queueProvider.getName());
+ return;
+ }
+
+ logger.info("Starting writerThread, queueName="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+
+ // Let's start the thread to read
+ destinationThread = new Thread(this, queueProvider.getName()
+ + "_destWriter");
+ destinationThread.setDaemon(true);
+ destinationThread.start();
+ }
+
+ public void stop() {
+ if (!initDone) {
+ logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName="
+ + queueProvider.getName());
+ return;
+ }
+ logger.info("Stop called, queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+
+ isDrain = true;
+ flush();
+
+ PrintWriter out = getOpenLogFileStream();
+ if (out != null) {
+ // If write is still going on, then let's give it enough time to
+ // complete
+ for (int i = 0; i < 3; i++) {
+ if (isWriting) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ continue;
+ }
+ try {
+ logger.info("Closing open file, queueName="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+
+ out.flush();
+ out.close();
+ } catch (Throwable t) {
+ logger.debug("Error closing spool out file.", t);
+ }
+ }
+ }
+ try {
+ if (destinationThread != null) {
+ destinationThread.interrupt();
+ }
+ destinationThread = null;
+ } catch (Throwable e) {
+ // ignore
+ }
+ }
+
+ public void flush() {
+ if (!initDone) {
+ logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName="
+ + queueProvider.getName());
+ return;
+ }
+ PrintWriter out = getOpenLogFileStream();
+ if (out != null) {
+ out.flush();
+ }
+ }
+
+ /**
+ * If any files are still not processed. Also, if the destination is not
+ * reachable
+ *
+ * @return
+ */
+ public boolean isPending() {
+ if (!initDone) {
+ logError("isPending(): File Spooler not initialized. queueName="
+ + queueProvider.getName());
+ return false;
+ }
+
+ return isPending;
+ }
+
+ /**
+ * Milliseconds from last attempt time
+ *
+ * @return
+ */
+ public long getLastAttemptTimeDelta() {
+ if (lastAttemptTime == 0) {
+ return 0;
+ }
+ return System.currentTimeMillis() - lastAttemptTime;
+ }
+
+ synchronized public void stashLogs(AuditEventBase event) {
+ if (isDrain) {
+ // Stop has been called, so this method shouldn't be called
+ logger.error("stashLogs() is called after stop is called. event="
+ + event);
+ return;
+ }
+ try {
+ isWriting = true;
+ PrintWriter logOut = getLogFileStream();
+ // Convert event to json
+ String jsonStr = MiscUtil.stringify(event);
+ logOut.println(jsonStr);
+ isPending = true;
+ } catch (Exception ex) {
+ logger.error("Error writing to file. event=" + event, ex);
+ } finally {
+ isWriting = false;
+ }
+
+ }
+
+ synchronized public void stashLogs(Collection<AuditEventBase> events) {
+ for (AuditEventBase event : events) {
+ stashLogs(event);
+ }
+ flush();
+ }
+
+ synchronized public void stashLogsString(String event) {
+ if (isDrain) {
+ // Stop has been called, so this method shouldn't be called
+ logger.error("stashLogs() is called after stop is called. event="
+ + event);
+ return;
+ }
+ try {
+ isWriting = true;
+ PrintWriter logOut = getLogFileStream();
+ logOut.println(event);
+ } catch (Exception ex) {
+ logger.error("Error writing to file. event=" + event, ex);
+ } finally {
+ isWriting = false;
+ }
+
+ }
+
+ synchronized public void stashLogsString(Collection<String> events) {
+ for (String event : events) {
+ stashLogsString(event);
+ }
+ flush();
+ }
+
+ /**
+ * This return the current file. If there are not current open output file,
+ * then it will return null
+ *
+ * @return
+ * @throws Exception
+ */
+ synchronized private PrintWriter getOpenLogFileStream() {
+ return logWriter;
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ synchronized private PrintWriter getLogFileStream() throws Exception {
+ closeFileIfNeeded();
+
+ // Either there are no open log file or the previous one has been rolled
+ // over
+ if (currentWriterIndexRecord == null) {
+ Date currentTime = new Date();
+ // Create a new file
+ String fileName = MiscUtil.replaceTokens(logFileNameFormat,
+ currentTime.getTime());
+ String newFileName = fileName;
+ File outLogFile = null;
+ int i = 0;
+ while (true) {
+ outLogFile = new File(logFolder, newFileName);
+ File archiveLogFile = new File(archiveFolder, newFileName);
+ if (!outLogFile.exists() && !archiveLogFile.exists()) {
+ break;
+ }
+ i++;
+ int lastDot = fileName.lastIndexOf('.');
+ String baseName = fileName.substring(0, lastDot);
+ String extension = fileName.substring(lastDot);
+ newFileName = baseName + "." + i + extension;
+ }
+ fileName = newFileName;
+ logger.info("Creating new file. queueName="
+ + queueProvider.getName() + ", fileName=" + fileName);
+ // Open the file
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ outLogFile)));
+
+ AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();
+
+ tmpIndexRecord.id = MiscUtil.generateUniqueId();
+ tmpIndexRecord.filePath = outLogFile.getPath();
+ tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
+ tmpIndexRecord.fileCreateTime = currentTime;
+ tmpIndexRecord.lastAttempt = true;
+ currentWriterIndexRecord = tmpIndexRecord;
+ indexRecords.add(currentWriterIndexRecord);
+ saveIndexFile();
+
+ } else {
+ if (logWriter == null) {
+ // This means the process just started. We need to open the file
+ // in append mode.
+ logger.info("Opening existing file for append. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+ currentWriterIndexRecord.filePath, true)));
+ }
+ }
+ return logWriter;
+ }
+
+ synchronized private void closeFileIfNeeded() throws FileNotFoundException,
+ IOException {
+ // Is there file open to write or there are no pending file, then close
+ // the active file
+ if (currentWriterIndexRecord != null) {
+ // Check whether the file needs to rolled
+ boolean closeFile = false;
+ if (indexRecords.size() == 1) {
+ closeFile = true;
+ logger.info("Closing file. Only one open file. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ } else if (System.currentTimeMillis()
+ - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) {
+ closeFile = true;
+ logger.info("Closing file. Rolling over. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ }
+ if (closeFile) {
+ // Roll the file
+ if (logWriter != null) {
+ logWriter.flush();
+ logWriter.close();
+ logWriter = null;
+ }
+ currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
+ currentWriterIndexRecord.writeCompleteTime = new Date();
+ saveIndexFile();
+ logger.info("Adding file to queue. queueName="
+ + queueProvider.getName() + ", fileName="
+ + currentWriterIndexRecord.filePath);
+ indexQueue.add(currentWriterIndexRecord);
+ currentWriterIndexRecord = null;
+ }
+ }
+ }
+
+ /**
+ * Load the index file
+ *
+ * @throws IOException
+ */
+ void loadIndexFile() throws IOException {
+ logger.info("Loading index file. fileName=" + indexFile.getPath());
+ BufferedReader br = new BufferedReader(new FileReader(indexFile));
+ indexRecords.clear();
+ String line;
+ while ((line = br.readLine()) != null) {
+ if (!line.isEmpty() && !line.startsWith("#")) {
+ AuditIndexRecord record = gson.fromJson(line,
+ AuditIndexRecord.class);
+ indexRecords.add(record);
+ }
+ }
+ br.close();
+ }
+
+ synchronized void printIndex() {
+ logger.info("INDEX printIndex() ==== START");
+ Iterator<AuditIndexRecord> iter = indexRecords.iterator();
+ while (iter.hasNext()) {
+ AuditIndexRecord record = iter.next();
+ logger.info("INDEX=" + record + ", isFileExist="
+ + (new File(record.filePath).exists()));
+ }
+ logger.info("INDEX printIndex() ==== END");
+ }
+
+ synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
+ throws FileNotFoundException, IOException {
+ Iterator<AuditIndexRecord> iter = indexRecords.iterator();
+ while (iter.hasNext()) {
+ AuditIndexRecord record = iter.next();
+ if (record.id.equals(indexRecord.id)) {
+ logger.info("Removing file from index. file=" + record.filePath
+ + ", queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+
+ iter.remove();
+ appendToDoneFile(record);
+ }
+ }
+ saveIndexFile();
+ }
+
+ synchronized void saveIndexFile() throws FileNotFoundException, IOException {
+ PrintWriter out = new PrintWriter(indexFile);
+ for (AuditIndexRecord auditIndexRecord : indexRecords) {
+ out.println(gson.toJson(auditIndexRecord));
+ }
+ out.close();
+ // printIndex();
+
+ }
+
+ void appendToDoneFile(AuditIndexRecord indexRecord)
+ throws FileNotFoundException, IOException {
+ logger.info("Moving to done file. " + indexRecord.filePath
+ + ", queueName=" + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+ String line = gson.toJson(indexRecord);
+ PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
+ indexDoneFile, true)));
+ out.println(line);
+ out.flush();
+ out.close();
+
+ // Move to archive folder
+ File logFile = null;
+ File archiveFile = null;
+ try {
+ logFile = new File(indexRecord.filePath);
+ String fileName = logFile.getName();
+ archiveFile = new File(archiveFolder, fileName);
+ logger.info("Moving logFile " + logFile + " to " + archiveFile);
+ logFile.renameTo(archiveFile);
+ } catch (Throwable t) {
+ logger.error("Error moving log file to archive folder. logFile="
+ + logFile + ", archiveFile=" + archiveFile, t);
+ }
+
+ archiveFile = null;
+ try {
+ // Remove old files
+ File[] logFiles = archiveFolder.listFiles(new FileFilter() {
+ public boolean accept(File pathname) {
+ return pathname.getName().toLowerCase().endsWith(".log");
+ }
+ });
+
+ if (logFiles.length > maxArchiveFiles) {
+ int filesToDelete = logFiles.length - maxArchiveFiles;
+ BufferedReader br = new BufferedReader(new FileReader(
+ indexDoneFile));
+ try {
+ int filesDeletedCount = 0;
+ while ((line = br.readLine()) != null) {
+ if (!line.isEmpty() && !line.startsWith("#")) {
+ AuditIndexRecord record = gson.fromJson(line,
+ AuditIndexRecord.class);
+ logFile = new File(record.filePath);
+ String fileName = logFile.getName();
+ archiveFile = new File(archiveFolder, fileName);
+ if (archiveFile.exists()) {
+ logger.info("Deleting archive file "
+ + archiveFile);
+ boolean ret = archiveFile.delete();
+ if (!ret) {
+ logger.error("Error deleting archive file. archiveFile="
+ + archiveFile);
+ }
+ filesDeletedCount++;
+ if (filesDeletedCount >= filesToDelete) {
+ logger.info("Deleted " + filesDeletedCount
+ + " files");
+ break;
+ }
+ }
+ }
+ }
+ } finally {
+ br.close();
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error deleting older archive file. archiveFile="
+ + archiveFile, t);
+ }
+
+ }
+
+ void logError(String msg) {
+ long currTimeMS = System.currentTimeMillis();
+ if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
+ logger.error(msg);
+ lastErrorLogMS = currTimeMS;
+ }
+ }
+
+ class AuditIndexRecord {
+ String id;
+ String filePath;
+ int linePosition = 0;
+ SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
+ Date fileCreateTime;
+ Date writeCompleteTime;
+ Date doneCompleteTime;
+ Date lastSuccessTime;
+ Date lastFailedTime;
+ int failedAttemptCount = 0;
+ boolean lastAttempt = false;
+
+ @Override
+ public String toString() {
+ return "AuditIndexRecord [id=" + id + ", filePath=" + filePath
+ + ", linePosition=" + linePosition + ", status=" + status
+ + ", fileCreateTime=" + fileCreateTime
+ + ", writeCompleteTime=" + writeCompleteTime
+ + ", doneCompleteTime=" + doneCompleteTime
+ + ", lastSuccessTime=" + lastSuccessTime
+ + ", lastFailedTime=" + lastFailedTime
+ + ", failedAttemptCount=" + failedAttemptCount
+ + ", lastAttempt=" + lastAttempt + "]";
+ }
+
+ }
+
+ class AuditFileSpoolAttempt {
+ Date attemptTime;
+ String status;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ // Let's pause between each iteration
+ if (currentConsumerIndexRecord == null) {
+ currentConsumerIndexRecord = indexQueue.poll(
+ retryDestinationMS, TimeUnit.MILLISECONDS);
+ } else {
+ Thread.sleep(retryDestinationMS);
+ }
+
+ if (isDrain) {
+ // Need to exit
+ break;
+ }
+ if (currentConsumerIndexRecord == null) {
+ closeFileIfNeeded();
+ continue;
+ }
+
+ boolean isRemoveIndex = false;
+ File consumerFile = new File(
+ currentConsumerIndexRecord.filePath);
+ if (!consumerFile.exists()) {
+ logger.error("Consumer file=" + consumerFile.getPath()
+ + " not found.");
+ printIndex();
+ isRemoveIndex = true;
+ } else {
+ // Let's open the file to write
+ BufferedReader br = new BufferedReader(new FileReader(
+ currentConsumerIndexRecord.filePath));
+ try {
+ int startLine = currentConsumerIndexRecord.linePosition;
+ String line;
+ int currLine = 0;
+ boolean isResumed = false;
+ List<String> lines = new ArrayList<String>();
+ while ((line = br.readLine()) != null) {
+ currLine++;
+ if (currLine < startLine) {
+ continue;
+ }
+ lines.add(line);
+ if (lines.size() == queueProvider.getMaxBatchSize()) {
+ boolean ret = sendEvent(lines,
+ currentConsumerIndexRecord, currLine);
+ if (!ret) {
+ throw new Exception("Destination down");
+ } else {
+ if (!isResumed) {
+ logger.info("Started writing to destination. file="
+ + currentConsumerIndexRecord.filePath
+ + ", queueName="
+ + queueProvider.getName()
+ + ", consumer="
+ + consumerProvider.getName());
+ }
+ }
+ lines.clear();
+ }
+ }
+ if (lines.size() > 0) {
+ boolean ret = sendEvent(lines,
+ currentConsumerIndexRecord, currLine);
+ if (!ret) {
+ throw new Exception("Destination down");
+ } else {
+ if (!isResumed) {
+ logger.info("Started writing to destination. file="
+ + currentConsumerIndexRecord.filePath
+ + ", queueName="
+ + queueProvider.getName()
+ + ", consumer="
+ + consumerProvider.getName());
+ }
+ }
+ lines.clear();
+ }
+ logger.info("Done reading file. file="
+ + currentConsumerIndexRecord.filePath
+ + ", queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+ // The entire file is read
+ currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
+ currentConsumerIndexRecord.doneCompleteTime = new Date();
+ currentConsumerIndexRecord.lastAttempt = true;
+
+ isRemoveIndex = true;
+ } catch (Exception ex) {
+ isDestDown = true;
+ logError("Destination down. queueName="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+ lastAttemptTime = System.currentTimeMillis();
+ // Update the index file
+ currentConsumerIndexRecord.lastFailedTime = new Date();
+ currentConsumerIndexRecord.failedAttemptCount++;
+ currentConsumerIndexRecord.lastAttempt = false;
+ saveIndexFile();
+ } finally {
+ br.close();
+ }
+ }
+ if (isRemoveIndex) {
+ // Remove this entry from index
+ removeIndexRecord(currentConsumerIndexRecord);
+ currentConsumerIndexRecord = null;
+ closeFileIfNeeded();
+ }
+ } catch (Throwable t) {
+ logger.error("Exception in destination writing thread.", t);
+ }
+ }
+ logger.info("Exiting file spooler. provider=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+ }
+
+ private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
+ int currLine) {
+ boolean ret = true;
+ try {
+ ret = consumerProvider.logJSON(lines);
+ if (!ret) {
+ // Need to log error after fixed interval
+ logError("Error sending logs to consumer. provider="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName());
+ } else {
+ // Update index and save
+ indexRecord.linePosition = currLine;
+ indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
+ indexRecord.lastSuccessTime = new Date();
+ indexRecord.lastAttempt = true;
+ saveIndexFile();
+
+ if (isDestDown) {
+ isDestDown = false;
+ logger.info("Destination up now. " + indexRecord.filePath
+ + ", queueName=" + queueProvider.getName()
+ + ", consumer=" + consumerProvider.getName());
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error while sending logs to consumer. provider="
+ + queueProvider.getName() + ", consumer="
+ + consumerProvider.getName() + ", log=" + lines, t);
+ }
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
new file mode 100644
index 0000000..e102d8b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.queue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * This is a non-blocking queue with no limit on capacity.
+ */
+public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
+ private static final Log logger = LogFactory
+ .getLog(AuditSummaryQueue.class);
+
+ public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms";
+
+ LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
+ Thread consumerThread = null;
+
+ static int threadCount = 0;
+ static final String DEFAULT_NAME = "summary";
+
+ private static final int MAX_DRAIN = 100000;
+
+ private int maxSummaryInterval = 5000;
+
+ HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>();
+
+ public AuditSummaryQueue() {
+ setName(DEFAULT_NAME);
+ }
+
+ public AuditSummaryQueue(AuditProvider consumer) {
+ super(consumer);
+ setName(DEFAULT_NAME);
+ }
+
+ @Override
+ public void init(Properties props, String propPrefix) {
+ super.init(props, propPrefix);
+ maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "."
+ + PROP_SUMMARY_INTERVAL, maxSummaryInterval);
+ logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name="
+ + getName());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
+ * audit.model.AuditEventBase)
+ */
+ @Override
+ public boolean log(AuditEventBase event) {
+ // Add to the queue and return ASAP
+ if (queue.size() >= getMaxQueueSize()) {
+ return false;
+ }
+ queue.add(event);
+ addLifeTimeInLogCount(1);
+ return true;
+ }
+
+ @Override
+ public boolean log(Collection<AuditEventBase> events) {
+ boolean ret = true;
+ for (AuditEventBase event : events) {
+ ret = log(event);
+ if (!ret) {
+ break;
+ }
+ }
+ return ret;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#start()
+ */
+ @Override
+ public void start() {
+ if (consumer != null) {
+ consumer.start();
+ }
+
+ consumerThread = new Thread(this, this.getClass().getName()
+ + (threadCount++));
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+ */
+ @Override
+ public void stop() {
+ setDrain(true);
+ try {
+ if (consumerThread != null) {
+ consumerThread.interrupt();
+ }
+ consumerThread = null;
+ } catch (Throwable t) {
+ // ignore any exception
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+ */
+ @Override
+ public boolean isFlushPending() {
+ if (queue.isEmpty()) {
+ return consumer.isFlushPending();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ long lastDispatchTime = System.currentTimeMillis();
+
+ while (true) {
+ // Time to next dispatch
+ long nextDispatchDuration = lastDispatchTime
+ - System.currentTimeMillis() + maxSummaryInterval;
+
+ Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();
+
+ try {
+ AuditEventBase event = null;
+ if (!isDrain() && nextDispatchDuration > 0) {
+ event = queue.poll(nextDispatchDuration,
+ TimeUnit.MILLISECONDS);
+ } else {
+ // For poll() is non blocking
+ event = queue.poll();
+ }
+
+ if (event != null) {
+ eventList.add(event);
+ queue.drainTo(eventList, MAX_DRAIN - 1);
+ } else {
+ // poll returned due to timeout, so reseting clock
+ nextDispatchDuration = lastDispatchTime
+ - System.currentTimeMillis() + maxSummaryInterval;
+ lastDispatchTime = System.currentTimeMillis();
+ }
+ } catch (InterruptedException e) {
+ logger.info(
+ "Caught exception in consumer thread. Mostly to about loop",
+ e);
+ } catch (Throwable t) {
+ logger.error("Caught error during processing request.", t);
+ }
+
+ for (AuditEventBase event : eventList) {
+ // Add to hash map
+ String key = event.getEventKey();
+ AuditSummary auditSummary = summaryMap.get(key);
+ if (auditSummary == null) {
+ auditSummary = new AuditSummary();
+ auditSummary.event = event;
+ auditSummary.startTime = event.getEventTime();
+ auditSummary.endTime = event.getEventTime();
+ auditSummary.count = 1;
+ summaryMap.put(key, auditSummary);
+ } else {
+ auditSummary.endTime = event.getEventTime();
+ auditSummary.count++;
+ }
+ }
+
+ if (isDrain() || nextDispatchDuration <= 0) {
+ for (Map.Entry<String, AuditSummary> entry : summaryMap
+ .entrySet()) {
+ AuditSummary auditSummary = entry.getValue();
+ auditSummary.event.setEventCount(auditSummary.count);
+ long timeDiff = auditSummary.endTime.getTime()
+ - auditSummary.startTime.getTime();
+ timeDiff = timeDiff > 0 ? timeDiff : 1;
+ auditSummary.event.setEventDurationMS(timeDiff);
+
+ // Reset time just before sending the logs
+ lastDispatchTime = System.currentTimeMillis();
+ boolean ret = consumer.log(auditSummary.event);
+ if (!ret) {
+ // We need to drop this event
+ logFailedEvent(auditSummary.event, null);
+ }
+ }
+ summaryMap.clear();
+ }
+
+ if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
+ break;
+ }
+ }
+
+ try {
+ // Call stop on the consumer
+ consumer.stop();
+ } catch (Throwable t) {
+ logger.error("Error while calling stop on consumer.", t);
+ }
+ }
+
+ class AuditSummary {
+ Date startTime = null;
+ Date endTime = null;
+ int count = 0;
+ AuditEventBase event;
+ }
+}