You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2014/09/22 20:29:21 UTC
[07/10] git commit: ARGUS-5: added configurable retry interval
between attempts to open HDFS file.
ARGUS-5: added configurable retry interval between attempts to open HDFS
file.
Project: http://git-wip-us.apache.org/repos/asf/incubator-argus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-argus/commit/c324e8df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-argus/tree/c324e8df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-argus/diff/c324e8df
Branch: refs/heads/master
Commit: c324e8dfa123091a49bff1b3d3f34675f232c7e2
Parents: 5ccf382
Author: mneethiraj <mn...@hortonworks.com>
Authored: Sun Sep 21 14:11:16 2014 -0700
Committer: mneethiraj <mn...@hortonworks.com>
Committed: Sun Sep 21 14:11:16 2014 -0700
----------------------------------------------------------------------
.../audit/provider/LocalFileLogBuffer.java | 83 ++++++++++----------
.../com/xasecure/audit/provider/MiscUtil.java | 2 +-
.../audit/provider/hdfs/HdfsAuditProvider.java | 8 +-
.../audit/provider/hdfs/HdfsLogDestination.java | 67 ++++++++++------
hbase-agent/conf/xasecure-audit.xml | 5 ++
hdfs-agent/conf/xasecure-audit.xml | 5 ++
hive-agent/conf/xasecure-audit.xml | 5 ++
knox-agent/conf/xasecure-audit.xml | 5 ++
storm-agent/conf/xasecure-audit.xml | 5 ++
9 files changed, 114 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
index 9acae11..280d02c 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/LocalFileLogBuffer.java
@@ -50,16 +50,16 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
private String mArchiveDirectory = null;
private int mArchiveFileCount = 10;
- private Writer mWriter = null;
- private String mCurrentFilename = null;
- private long mNextRolloverTime = 0;
+ private Writer mWriter = null;
+ private String mBufferFilename = null;
+ private long mNextRolloverTime = 0;
private Gson mGsonBuilder = null;
private DestinationDispatcherThread<T> mDispatcherThread = null;
public LocalFileLogBuffer() {
- mGsonBuilder = new GsonBuilder().setPrettyPrinting().create();
+ mGsonBuilder = new GsonBuilder().create();
}
public String getDirectory() {
@@ -161,11 +161,7 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
public synchronized boolean add(T log) {
boolean ret = false;
- long now = System.currentTimeMillis();
-
- if(now > mNextRolloverTime) {
- rollover();
- }
+ rolloverIfNeeded();
Writer writer = mWriter;
@@ -195,16 +191,16 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
closeFile();
- mCurrentFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
+ mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
FileOutputStream ostream = null;
try {
- ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+ ostream = new FileOutputStream(mBufferFilename, mIsAppend);
} catch(Exception excp) {
- MiscUtil.createParents(new File(mCurrentFilename));
+ MiscUtil.createParents(new File(mBufferFilename));
try {
- ostream = new FileOutputStream(mCurrentFilename, mIsAppend);
+ ostream = new FileOutputStream(mBufferFilename, mIsAppend);
} catch(Exception ex) {
// ignore; error printed down
}
@@ -213,13 +209,13 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
mWriter = createWriter(ostream);
if(mWriter != null) {
- LogLog.debug("LocalFileLogBuffer.openFile(): opened file " + mCurrentFilename);
+ LogLog.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename);
mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000));
} else {
- LogLog.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mCurrentFilename);
+ LogLog.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename);
- mCurrentFilename = null;
+ mBufferFilename = null;
}
LogLog.debug("<== LocalFileLogBuffer.openFile()");
@@ -237,11 +233,11 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
writer.flush();
writer.close();
} catch(IOException excp) {
- LogLog.warn("LocalFileLogBuffer: failed to close file " + mCurrentFilename, excp);
+ LogLog.warn("LocalFileLogBuffer: failed to close file " + mBufferFilename, excp);
}
if(mDispatcherThread != null) {
- mDispatcherThread.addLogfile(mCurrentFilename);
+ mDispatcherThread.addLogfile(mBufferFilename);
}
}
@@ -258,6 +254,14 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
LogLog.debug("<== LocalFileLogBuffer.rollover()");
}
+ private void rolloverIfNeeded() {
+ long now = System.currentTimeMillis();
+
+ if(now > mNextRolloverTime) {
+ rollover();
+ }
+ }
+
public OutputStreamWriter createWriter(OutputStream os ) {
OutputStreamWriter writer = null;
@@ -279,7 +283,7 @@ public class LocalFileLogBuffer<T> implements LogBuffer<T> {
}
boolean isCurrentFilename(String filename) {
- return mCurrentFilename != null && filename != null && filename.equals(mCurrentFilename);
+ return mBufferFilename != null && filename != null && filename.equals(mBufferFilename);
}
private String toJson(T log) {
@@ -395,6 +399,8 @@ class DestinationDispatcherThread<T> extends Thread {
}
private boolean sendCurrentFile() {
+ LogLog.debug("==> DestinationDispatcherThread.sendCurrentFile()");
+
boolean ret = false;
int destinationPollIntervalInMs = 1000;
@@ -418,6 +424,12 @@ class DestinationDispatcherThread<T> extends Thread {
closeCurrentFile();
+ if(!mStopThread) {
+ archiveCurrentFile();
+ }
+
+ LogLog.debug("<== DestinationDispatcherThread.sendCurrentFile()");
+
return ret;
}
@@ -452,12 +464,7 @@ class DestinationDispatcherThread<T> extends Thread {
} catch (IOException excp) {
LogLog.warn("getNextStringifiedLog.getNextLog(): failed to read from file " + mCurrentLogfile, excp);
}
-
- if(log == null) {
- closeCurrentFile();
- }
}
- LogLog.warn("READ: " + log);
return log;
}
@@ -465,21 +472,17 @@ class DestinationDispatcherThread<T> extends Thread {
private void openCurrentFile() {
LogLog.debug("==> openCurrentFile(" + mCurrentLogfile + ")");
- closeCurrentFile();
-
- while(mReader == null) {
- if(mCurrentLogfile != null) {
- try {
- FileInputStream inStr = new FileInputStream(mCurrentLogfile);
-
- InputStreamReader strReader = createReader(inStr);
-
- if(strReader != null) {
- mReader = new BufferedReader(strReader);
- }
- } catch(FileNotFoundException excp) {
- LogLog.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp);
+ if(mCurrentLogfile != null) {
+ try {
+ FileInputStream inStr = new FileInputStream(mCurrentLogfile);
+
+ InputStreamReader strReader = createReader(inStr);
+
+ if(strReader != null) {
+ mReader = new BufferedReader(strReader);
}
+ } catch(FileNotFoundException excp) {
+ LogLog.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp);
}
}
@@ -498,10 +501,6 @@ class DestinationDispatcherThread<T> extends Thread {
}
mReader = null;
- if(!mStopThread) {
- archiveCurrentFile();
- }
-
LogLog.debug("<== closeCurrentFile(" + mCurrentLogfile + ")");
}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java b/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
index 6610210..c84fdf6 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/MiscUtil.java
@@ -42,7 +42,7 @@ public class MiscUtil {
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException excp) {
- LogLog.warn("LocalFileLogBuffer", excp);
+ LogLog.warn("replaceHostname()", excp);
}
if(hostName == null) {
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
index db8489c..e8b3922 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsAuditProvider.java
@@ -15,9 +15,10 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
public void init(Map<String, String> properties) {
String encoding = properties.get("encoding");
- String hdfsDestinationDirectory = properties.get("destination.directroy");
- String hdfsDestinationFile = properties.get("destination.file");
- int hdfsDestinationRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("destination.rollover.interval.seconds"), 24 * 60 * 60);
+ String hdfsDestinationDirectory = properties.get("destination.directroy");
+ String hdfsDestinationFile = properties.get("destination.file");
+ int hdfsDestinationRolloverIntervalSeconds = MiscUtil.parseInteger(properties.get("destination.rollover.interval.seconds"), 24 * 60 * 60);
+ int hdfsDestinationOpenRetryIntervalSeconds = MiscUtil.parseInteger(properties.get("destination.open.retry.interval.seconds"), 60);
String localFileBufferDirectory = properties.get("local.buffer.directroy");
String localFileBufferFile = properties.get("local.buffer.file");
@@ -31,6 +32,7 @@ public class HdfsAuditProvider extends BufferedAuditProvider {
mHdfsDestination.setFile(hdfsDestinationFile);
mHdfsDestination.setEncoding(encoding);
mHdfsDestination.setRolloverIntervalSeconds(hdfsDestinationRolloverIntervalSeconds);
+ mHdfsDestination.setOpenRetryIntervalSeconds(hdfsDestinationOpenRetryIntervalSeconds);
LocalFileLogBuffer<AuditEventBase> mLocalFileBuffer = new LocalFileLogBuffer<AuditEventBase>();
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
index eeb7574..7567962 100644
--- a/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
+++ b/agents-audit/src/main/java/com/xasecure/audit/provider/hdfs/HdfsLogDestination.java
@@ -20,7 +20,6 @@ package com.xasecure.audit.provider.hdfs;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -37,16 +36,18 @@ import com.xasecure.audit.provider.LogDestination;
import com.xasecure.audit.provider.MiscUtil;
public class HdfsLogDestination<T> implements LogDestination<T> {
- private String mDirectory = null;
- private String mFile = null;
- private String mEncoding = null;
- private boolean mIsAppend = true;
- private int mRolloverIntervalSeconds = 24 * 60 * 60;
-
- private OutputStreamWriter mWriter = null;
- private String mCurrentFilename = null;
- private long mNextRolloverTime = 0;
- private boolean mIsStopInProgress = false;
+ private String mDirectory = null;
+ private String mFile = null;
+ private String mEncoding = null;
+ private boolean mIsAppend = true;
+ private int mRolloverIntervalSeconds = 24 * 60 * 60;
+ private int mOpenRetryIntervalSeconds = 60;
+
+ private OutputStreamWriter mWriter = null;
+ private String mHdfsFilename = null;
+ private long mNextRolloverTime = 0;
+ private long mLastOpenFailedTime = 0;
+ private boolean mIsStopInProgress = false;
public HdfsLogDestination() {
}
@@ -83,6 +84,14 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
this.mRolloverIntervalSeconds = rolloverIntervalSeconds;
}
+ public int getOpenRetryIntervalSeconds() {
+ return mOpenRetryIntervalSeconds;
+ }
+
+ public void setOpenRetryIntervalSeconds(int minIntervalOpenRetrySeconds) {
+ this.mOpenRetryIntervalSeconds = minIntervalOpenRetrySeconds;
+ }
+
@Override
public void start() {
LogLog.debug("==> HdfsLogDestination.start()");
@@ -127,7 +136,7 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
public boolean sendStringified(String log) {
boolean ret = false;
- rolloverIfNeeded();
+ checkDestinationFileStatus();
OutputStreamWriter writer = mWriter;
@@ -138,6 +147,8 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
ret = true;
} catch (IOException excp) {
LogLog.warn("HdfsLogDestination.sendStringified(): write failed", excp);
+
+ closeFile();
}
}
@@ -149,7 +160,7 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
closeFile();
- mCurrentFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
+ mHdfsFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile);
FSDataOutputStream ostream = null;
FileSystem fileSystem = null;
@@ -157,14 +168,14 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
Configuration conf = null;
try {
- LogLog.debug("HdfsLogDestination.openFile(): opening file " + mCurrentFilename);
+ LogLog.debug("HdfsLogDestination.openFile(): opening file " + mHdfsFilename);
- URI uri = URI.create(mCurrentFilename);
+ URI uri = URI.create(mHdfsFilename);
// TODO: mechanism to XA-HDFS plugin to disable auditing of access checks to the current HDFS file
conf = new Configuration();
- pathLogfile = new Path(mCurrentFilename);
+ pathLogfile = new Path(mHdfsFilename);
fileSystem = FileSystem.get(uri, conf);
if(fileSystem.exists(pathLogfile)) {
@@ -174,7 +185,7 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
} catch(IOException excp) {
// append may not be supported by the filesystem. rename existing file and create a new one
String fileSuffix = MiscUtil.replaceTokens("-" + MiscUtil.TOKEN_CREATE_TIME_START + "yyyyMMdd-HHmm.ss" + MiscUtil.TOKEN_CREATE_TIME_END);
- String movedFilename = appendToFilename(mCurrentFilename, fileSuffix);
+ String movedFilename = appendToFilename(mHdfsFilename, fileSuffix);
Path movedFilePath = new Path(movedFilename);
fileSystem.rename(pathLogfile, movedFilePath);
@@ -207,16 +218,18 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
mWriter = createWriter(ostream);
if(mWriter != null) {
- LogLog.debug("HdfsLogDestination.openFile(): opened file " + mCurrentFilename);
+ LogLog.debug("HdfsLogDestination.openFile(): opened file " + mHdfsFilename);
mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000));
+ mLastOpenFailedTime = 0;
} else {
- LogLog.warn("HdfsLogDestination.openFile(): failed to open file for write " + mCurrentFilename);
+ LogLog.warn("HdfsLogDestination.openFile(): failed to open file for write " + mHdfsFilename);
- mCurrentFilename = null;
+ mHdfsFilename = null;
+ mLastOpenFailedTime = System.currentTimeMillis();
}
- LogLog.debug("<== HdfsLogDestination.openFile(" + mCurrentFilename + ")");
+ LogLog.debug("<== HdfsLogDestination.openFile(" + mHdfsFilename + ")");
}
private void closeFile() {
@@ -232,7 +245,7 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
writer.close();
} catch(IOException excp) {
if(! mIsStopInProgress) { // during shutdown, the underlying FileSystem might already be closed; so don't print error details
- LogLog.warn("HdfsLogDestination: failed to close file " + mCurrentFilename, excp);
+ LogLog.warn("HdfsLogDestination: failed to close file " + mHdfsFilename, excp);
}
}
}
@@ -250,10 +263,14 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
LogLog.debug("<== HdfsLogDestination.rollover()");
}
- private void rolloverIfNeeded() {
+ private void checkDestinationFileStatus() {
long now = System.currentTimeMillis();
- if(now > mNextRolloverTime) {
+ if(mWriter == null) {
+ if(now > (mLastOpenFailedTime + (mOpenRetryIntervalSeconds * 1000))) {
+ openFile();
+ }
+ } else if(now > mNextRolloverTime) {
rollover();
}
}
@@ -266,7 +283,7 @@ public class HdfsLogDestination<T> implements LogDestination<T> {
try {
writer = new OutputStreamWriter(os, mEncoding);
} catch(UnsupportedEncodingException excp) {
- LogLog.warn("LocalFileLogBuffer: failed to create output writer.", excp);
+ LogLog.warn("HdfsLogDestination.createWriter(): failed to create output writer.", excp);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/hbase-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hbase-agent/conf/xasecure-audit.xml b/hbase-agent/conf/xasecure-audit.xml
index f33b8ba..11597b3 100644
--- a/hbase-agent/conf/xasecure-audit.xml
+++ b/hbase-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
</property>
<property>
+ <name>xasecure.audit.hdfs.config.destination.open.retry.interval.seconds</name>
+ <value>60</value>
+ </property>
+
+ <property>
<name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
<value>/tmp/logs/hbase</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/hdfs-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hdfs-agent/conf/xasecure-audit.xml b/hdfs-agent/conf/xasecure-audit.xml
index 1ae6f3b..6189cf2 100644
--- a/hdfs-agent/conf/xasecure-audit.xml
+++ b/hdfs-agent/conf/xasecure-audit.xml
@@ -117,6 +117,11 @@
</property>
<property>
+ <name>xasecure.audit.hdfs.config.destination.open.retry.interval.seconds</name>
+ <value>60</value>
+ </property>
+
+ <property>
<name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
<value>/tmp/logs/hdfs</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/hive-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/xasecure-audit.xml b/hive-agent/conf/xasecure-audit.xml
index eb951a4..73c3d74 100644
--- a/hive-agent/conf/xasecure-audit.xml
+++ b/hive-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
</property>
<property>
+ <name>xasecure.audit.hdfs.config.destination.open.retry.interval.seconds</name>
+ <value>60</value>
+ </property>
+
+ <property>
<name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
<value>/tmp/logs/hive</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/knox-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/knox-agent/conf/xasecure-audit.xml b/knox-agent/conf/xasecure-audit.xml
index 987a49d..d7d1a8d 100644
--- a/knox-agent/conf/xasecure-audit.xml
+++ b/knox-agent/conf/xasecure-audit.xml
@@ -131,6 +131,11 @@
</property>
<property>
+ <name>xasecure.audit.hdfs.config.destination.open.retry.interval.seconds</name>
+ <value>60</value>
+ </property>
+
+ <property>
<name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
<value>/tmp/logs/knox</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-argus/blob/c324e8df/storm-agent/conf/xasecure-audit.xml
----------------------------------------------------------------------
diff --git a/storm-agent/conf/xasecure-audit.xml b/storm-agent/conf/xasecure-audit.xml
index ef0b27a..b103aca 100644
--- a/storm-agent/conf/xasecure-audit.xml
+++ b/storm-agent/conf/xasecure-audit.xml
@@ -136,6 +136,11 @@
</property>
<property>
+ <name>xasecure.audit.hdfs.config.destination.open.retry.interval.seconds</name>
+ <value>60</value>
+ </property>
+
+ <property>
<name>xasecure.audit.hdfs.config.local.buffer.directroy</name>
<value>/tmp/logs/storm</value>
</property>