You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2014/12/12 02:30:30 UTC
[47/51] [partial] incubator-ranger git commit: RANGER-194: Rename
packages from xasecure to apache ranger
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
new file mode 100644
index 0000000..7414a7a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
+
+
+/*
+ * NOTE:
+ * - Instances of this class are not thread-safe.
+ */
+public class DbAuditProvider extends BaseAuditProvider {
+
+ private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);
+
+ public static final String AUDIT_DB_IS_ASYNC_PROP = "xasecure.audit.db.is.async";
+ public static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP = "xasecure.audit.db.async.max.queue.size" ;
+ public static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";
+
+ private static final String AUDIT_DB_BATCH_SIZE_PROP = "xasecure.audit.db.batch.size" ;
+ private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP = "xasecure.audit.db.config.retry.min.interval.ms";
+ private static final String AUDIT_JPA_CONFIG_PROP_PREFIX = "xasecure.audit.jpa.";
+ private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE = "xasecure.audit.credential.provider.file";
+ private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS = "auditDBCred";
+ private static final String AUDIT_JPA_JDBC_PASSWORD = "javax.persistence.jdbc.password";
+
+ private EntityManagerFactory entityManagerFactory;
+ private DaoManager daoManager;
+
+ private int mCommitBatchSize = 1;
+ private int mDbRetryMinIntervalMs = 60 * 1000;
+ private long mLastCommitTime = System.currentTimeMillis();
+ private ArrayList<AuditEventBase> mUncommitted = new ArrayList<AuditEventBase>();
+ private Map<String, String> mDbProperties = null;
+ private long mLastDbFailedTime = 0;
+
+ public DbAuditProvider() {
+ LOG.info("DbAuditProvider: creating..");
+ }
+
+ @Override
+ public void init(Properties props) {
+ LOG.info("DbAuditProvider.init()");
+
+ super.init(props);
+
+ mDbProperties = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+ mCommitBatchSize = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+ mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
+
+ boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
+
+ if(! isAsync) {
+ mCommitBatchSize = 1; // Batching not supported in sync mode
+ }
+
+ String jdbcPassword = getCredentialString(BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+
+ if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
+ mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
+ }
+ }
+
+ @Override
+ public void log(AuditEventBase event) {
+ LOG.debug("DbAuditProvider.log()");
+
+ boolean isSuccess = false;
+
+ try {
+ if(preCreate(event)) {
+ DaoManager daoMgr = daoManager;
+
+ if(daoMgr != null) {
+ event.persist(daoMgr);
+
+ isSuccess = postCreate(event);
+ }
+ }
+ } catch(Exception excp) {
+ logDbError("DbAuditProvider.log(): failed", excp);
+ } finally {
+ if(! isSuccess) {
+ logFailedEvent(event);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ LOG.info("DbAuditProvider.start()");
+
+ init();
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("DbAuditProvider.stop()");
+
+ cleanUp();
+ }
+
+ @Override
+ public void waitToComplete() {
+ LOG.info("DbAuditProvider.waitToComplete()");
+ }
+
+ @Override
+ public boolean isFlushPending() {
+ return mUncommitted.size() > 0;
+ }
+
+ @Override
+ public long getLastFlushTime() {
+ return mLastCommitTime;
+ }
+
+ /**
+  * Commits whatever audit events are still sitting in the current batch.
+  * If the commit fails, every pending event is handed to
+  * logFailedEvent(); the pending list is emptied either way.
+  */
+ @Override
+ public void flush() {
+     if(mUncommitted.isEmpty()) {
+         return; // nothing is waiting to be committed
+     }
+
+     if(! commitTransaction()) {
+         for(AuditEventBase pending : mUncommitted) {
+             logFailedEvent(pending);
+         }
+     }
+
+     mUncommitted.clear();
+ }
+
+ private synchronized boolean init() {
+ long now = System.currentTimeMillis();
+
+ if((now - mLastDbFailedTime) < mDbRetryMinIntervalMs) {
+ return false;
+ }
+
+ LOG.info("DbAuditProvider: init()");
+
+ try {
+ entityManagerFactory = Persistence.createEntityManagerFactory("xa_server", mDbProperties);
+
+ daoManager = new DaoManager();
+ daoManager.setEntityManagerFactory(entityManagerFactory);
+
+ daoManager.getEntityManager(); // this forces the connection to be made to DB
+ } catch(Exception excp) {
+ logDbError("DbAuditProvider: DB initalization failed", excp);
+
+ cleanUp();
+
+ return false;
+ }
+
+ return true;
+ }
+
+ private synchronized void cleanUp() {
+ LOG.info("DbAuditProvider: cleanUp()");
+
+ try {
+ if(entityManagerFactory != null && entityManagerFactory.isOpen()) {
+ entityManagerFactory.close();
+ }
+ } catch(Exception excp) {
+ LOG.error("DbAuditProvider.cleanUp(): failed", excp);
+ } finally {
+ entityManagerFactory = null;
+ daoManager = null;
+ }
+ }
+
+ private boolean isDbConnected() {
+ EntityManager em = getEntityManager();
+
+ return em != null && em.isOpen();
+ }
+
+ private EntityManager getEntityManager() {
+ DaoManager daoMgr = daoManager;
+
+ if(daoMgr != null) {
+ try {
+ return daoMgr.getEntityManager();
+ } catch(Exception excp) {
+ logDbError("DbAuditProvider.getEntityManager(): failed", excp);
+
+ cleanUp();
+ }
+ }
+
+ return null;
+ }
+
+ private void clearEntityManager() {
+ try {
+ EntityManager em = getEntityManager();
+
+ if(em != null) {
+ em.clear();
+ }
+ } catch(Exception excp) {
+ LOG.warn("DbAuditProvider.clearEntityManager(): failed", excp);
+ }
+ }
+
+ private EntityTransaction getTransaction() {
+ EntityManager em = getEntityManager();
+
+ return em != null ? em.getTransaction() : null;
+ }
+
+ private boolean isInTransaction() {
+ EntityTransaction trx = getTransaction();
+
+ return trx != null && trx.isActive();
+ }
+
+ /**
+  * Makes sure a transaction is active on the current entity manager.
+  *
+  * @return false (after logging a warning) when no transaction object
+  *         could be obtained; true otherwise
+  */
+ private boolean beginTransaction() {
+     EntityTransaction transaction = getTransaction();
+
+     if(transaction == null) {
+         LOG.warn("DbAuditProvider.beginTransaction(): trx is null");
+
+         return false;
+     }
+
+     // an already-active transaction is simply reused
+     if(! transaction.isActive()) {
+         transaction.begin();
+     }
+
+     return true;
+ }
+
+ /**
+  * Commits the currently active transaction.
+  *
+  * A missing or inactive transaction is treated exactly like a commit
+  * error: it is reported through logDbError() and followed by cleanUp().
+  * Regardless of the outcome, mLastCommitTime is refreshed and the
+  * entity manager is cleared.
+  *
+  * @return true only when an active transaction was found and committed
+  */
+ private boolean commitTransaction() {
+ boolean ret = false;
+ EntityTransaction trx = null;
+
+ try {
+ trx = getTransaction();
+
+ if(trx != null && trx.isActive()) {
+ trx.commit();
+
+ ret =true;
+ } else {
+ // routed through the catch block below so that it is logged and
+ // triggers cleanUp() just like a failed commit
+ throw new Exception("trx is null or not active");
+ }
+ } catch(Exception excp) {
+ logDbError("DbAuditProvider.commitTransaction(): failed", excp);
+
+ cleanUp(); // so that next insert will try to init()
+ } finally {
+ // updated on failure as well, so getLastFlushTime() reports the last
+ // commit attempt rather than the last successful commit
+ mLastCommitTime = System.currentTimeMillis();
+
+ // after cleanUp() the daoManager is null, so this finds no entity
+ // manager and does nothing
+ clearEntityManager();
+ }
+
+ return ret;
+ }
+
+ private boolean preCreate(AuditEventBase event) {
+ boolean ret = true;
+
+ if(!isDbConnected()) {
+ ret = init();
+ }
+
+ if(ret) {
+ if(! isInTransaction()) {
+ ret = beginTransaction();
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+  * Called after an event has been persisted. Either commits at once
+  * (batch size of 1 or less) or queues the event and commits when the
+  * number of queued events reaches a multiple of the batch size.
+  *
+  * When a batch commit fails, all queued events except the given one
+  * are passed to logFailedEvent(); the given event is left out of that
+  * loop and the failure is signalled through the return value instead.
+  *
+  * @param event the event that was just persisted
+  * @return false when a commit was attempted and failed; true otherwise
+  */
+ private boolean postCreate(AuditEventBase event) {
+     if(mCommitBatchSize <= 1) {
+         return commitTransaction();
+     }
+
+     mUncommitted.add(event);
+
+     boolean isBatchFull = (mUncommitted.size() % mCommitBatchSize) == 0;
+
+     if(! isBatchFull) {
+         return true;
+     }
+
+     boolean isCommitted = commitTransaction();
+
+     if(! isCommitted) {
+         for(AuditEventBase pending : mUncommitted) {
+             if(pending != event) {
+                 logFailedEvent(pending);
+             }
+         }
+     }
+
+     mUncommitted.clear();
+
+     return isCommitted;
+ }
+
+ /**
+  * Logs a database failure at WARN level and records when it happened.
+  *
+  * The recorded time (mLastDbFailedTime) is what init() checks to refuse
+  * re-initialization until mDbRetryMinIntervalMs has elapsed.
+  *
+  * @param msg  message to log
+  * @param excp exception that describes the failure
+  */
+ private void logDbError(String msg, Exception excp) {
+ long now = System.currentTimeMillis();
+
+ // only move the timestamp forward once the previous retry window has
+ // passed; failures inside the window do not extend it
+ if((now - mLastDbFailedTime) > mDbRetryMinIntervalMs) {
+ mLastDbFailedTime = now;
+ }
+
+ LOG.warn(msg, excp);
+ }
+
+ /**
+  * Looks up a credential through RangerCredentialProvider.
+  *
+  * @param url   credential provider location; null yields null
+  * @param alias alias of the credential; null yields null
+  * @return the credential as a String, or null when either argument is
+  *         null or the provider returned nothing
+  */
+ private String getCredentialString(String url,String alias) {
+     if(url == null || alias == null) {
+         return null;
+     }
+
+     char[] credential = RangerCredentialProvider.getInstance().getCredentialString(url,alias);
+
+     return credential == null ? null : new String(credential);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
new file mode 100644
index 0000000..7396fd0
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+public interface DebugTracer {
+ void debug(String msg);
+ void debug(String msg, Throwable excp);
+ void info(String msg);
+ void info(String msg, Throwable excp);
+ void warn(String msg);
+ void warn(String msg, Throwable excp);
+ void error(String msg);
+ void error(String msg, Throwable excp);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
new file mode 100644
index 0000000..a6e3ef1
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.ranger.audit.model.AuditEventBase;
+
+
+public class DummyAuditProvider implements AuditProvider {
+ @Override
+ public void init(Properties prop) {
+ // intentionally left empty
+ }
+
+ @Override
+ public void log(AuditEventBase event) {
+ // intentionally left empty
+ }
+
+ @Override
+ public void start() {
+ // intentionally left empty
+ }
+
+ @Override
+ public void stop() {
+ // intentionally left empty
+ }
+
+ @Override
+ public void waitToComplete() {
+ // intentionally left empty
+ }
+
+ @Override
+ public boolean isFlushPending() {
+ return false;
+ }
+
+ @Override
+ public long getLastFlushTime() {
+ return 0;
+ }
+
+ @Override
+ public void flush() {
+ // intentionally left empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
new file mode 100644
index 0000000..cdc4d53
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public class LocalFileLogBuffer<T> implements LogBuffer<T> {
+ private String mDirectory = null;
+ private String mFile = null;
+ private int mFlushIntervalSeconds = 1 * 60;
+ private int mFileBufferSizeBytes = 8 * 1024;
+ private String mEncoding = null;
+ private boolean mIsAppend = true;
+ private int mRolloverIntervalSeconds = 10 * 60;
+ private String mArchiveDirectory = null;
+ private int mArchiveFileCount = 10;
+ private DebugTracer mLogger = null;
+
+ private Writer mWriter = null;
+ private String mBufferFilename = null;
+ private long mNextRolloverTime = 0;
+ private long mNextFlushTime = 0;
+ private int mFileOpenRetryIntervalInMs = 60 * 1000;
+ private long mNextFileOpenRetryTime = 0;
+
+ private DestinationDispatcherThread<T> mDispatcherThread = null;
+
+ public LocalFileLogBuffer(DebugTracer tracer) {
+ mLogger = tracer;
+ }
+
+ public String getDirectory() {
+ return mDirectory;
+ }
+
+ public void setDirectory(String directory) {
+ mDirectory = directory;
+ }
+
+ public String getFile() {
+ return mFile;
+ }
+
+ public void setFile(String file) {
+ mFile = file;
+ }
+
+ public int getFileBufferSizeBytes() {
+ return mFileBufferSizeBytes;
+ }
+
+ public void setFileBufferSizeBytes(int fileBufferSizeBytes) {
+ mFileBufferSizeBytes = fileBufferSizeBytes;
+ }
+
+ public int getFlushIntervalSeconds() {
+ return mFlushIntervalSeconds;
+ }
+
+ public void setFlushIntervalSeconds(int flushIntervalSeconds) {
+ mFlushIntervalSeconds = flushIntervalSeconds;
+ }
+
+ public String getEncoding() {
+ return mEncoding;
+ }
+
+ public void setEncoding(String encoding) {
+ mEncoding = encoding;
+ }
+
+ public boolean getIsAppend() {
+ return mIsAppend;
+ }
+
+ public void setIsAppend(boolean isAppend) {
+ mIsAppend = isAppend;
+ }
+
+ public int getRolloverIntervalSeconds() {
+ return mRolloverIntervalSeconds;
+ }
+
+ public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+ mRolloverIntervalSeconds = rolloverIntervalSeconds;
+ }
+
+ public String getArchiveDirectory() {
+ return mArchiveDirectory;
+ }
+
+ public void setArchiveDirectory(String archiveDirectory) {
+ mArchiveDirectory = archiveDirectory;
+ }
+
+ public int getArchiveFileCount() {
+ return mArchiveFileCount;
+ }
+
+ public void setArchiveFileCount(int archiveFileCount) {
+ mArchiveFileCount = archiveFileCount;
+ }
+
+
+ @Override
+ public void start(LogDestination<T> destination) {
+ mLogger.debug("==> LocalFileLogBuffer.start()");
+
+ mDispatcherThread = new DestinationDispatcherThread<T>(this, destination, mLogger);
+
+ mDispatcherThread.start();
+
+ mLogger.debug("<== LocalFileLogBuffer.start()");
+ }
+
+ /**
+  * Stops the dispatcher thread (waiting for it to finish) and closes the
+  * current buffer file.
+  *
+  * mDispatcherThread is cleared before closeFile() runs, so the file
+  * closed here is not handed to the dispatcher that is being stopped.
+  *
+  * If the calling thread is interrupted while waiting, the wait is
+  * abandoned, the interrupt status is restored for the caller, and the
+  * buffer file is still closed.
+  */
+ @Override
+ public void stop() {
+     mLogger.debug("==> LocalFileLogBuffer.stop()");
+
+     DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread;
+     mDispatcherThread = null;
+
+     if(dispatcherThread != null && dispatcherThread.isAlive()) {
+         dispatcherThread.stopThread();
+
+         try {
+             dispatcherThread.join();
+         } catch (InterruptedException e) {
+             mLogger.warn("LocalFileLogBuffer.stop(): failed in waiting for DispatcherThread", e);
+
+             // do not swallow the interrupt: let the caller observe it
+             Thread.currentThread().interrupt();
+         }
+     }
+
+     closeFile();
+
+     mLogger.debug("<== LocalFileLogBuffer.stop()");
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return mWriter != null;
+ }
+
+ @Override
+ public boolean add(T log) {
+ boolean ret = false;
+
+ String msg = MiscUtil.stringify(log);
+
+ if(msg.contains(MiscUtil.LINE_SEPARATOR)) {
+ msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR);
+ }
+
+ synchronized(this) {
+ checkFileStatus();
+
+ Writer writer = mWriter;
+
+ if(writer != null) {
+ try {
+ writer.write(msg + MiscUtil.LINE_SEPARATOR);
+
+ if(mFileBufferSizeBytes == 0) {
+ writer.flush();
+ }
+
+ ret = true;
+ } catch(IOException excp) {
+ mLogger.warn("LocalFileLogBuffer.add(): write failed", excp);
+
+ closeFile();
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+  * @return true when there is no dispatcher thread, or the dispatcher
+  *         reports that it is idle
+  */
+ @Override
+ public boolean isEmpty() {
+     // stop() may null the field from another thread between the null
+     // check and the call, so work on a snapshot
+     DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread;
+
+     return dispatcherThread == null || dispatcherThread.isIdle();
+ }
+
+ private synchronized void openFile() {
+ mLogger.debug("==> LocalFileLogBuffer.openFile()");
+
+ long now = System.currentTimeMillis();
+
+ closeFile();
+
+ if(mNextFileOpenRetryTime <= now) {
+ try {
+ mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+ long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+ mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile, startTime);
+
+ MiscUtil.createParents(new File(mBufferFilename));
+
+ FileOutputStream ostream = null;
+ try {
+ ostream = new FileOutputStream(mBufferFilename, mIsAppend);
+ } catch(Exception excp) {
+ mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file " + mBufferFilename, excp);
+ }
+
+ if(ostream != null) {
+ mWriter = createWriter(ostream);
+
+ if(mWriter != null) {
+ mLogger.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename);
+
+ mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+ } else {
+ mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename);
+
+ mBufferFilename = null;
+ }
+ }
+ } finally {
+ if(mWriter == null) {
+ mNextFileOpenRetryTime = now + mFileOpenRetryIntervalInMs;
+ }
+ }
+ }
+
+ mLogger.debug("<== LocalFileLogBuffer.openFile()");
+ }
+
+ /**
+  * Flushes and closes the current buffer file, if one is open, and hands
+  * its name to the dispatcher thread (when there is one) so its contents
+  * can be sent to the destination.
+  *
+  * The writer is closed even when the flush fails, so the underlying
+  * file handle is not leaked. A failure is logged, not propagated.
+  */
+ private synchronized void closeFile() {
+     mLogger.debug("==> LocalFileLogBuffer.closeFile()");
+
+     Writer writer = mWriter;
+
+     mWriter = null;
+
+     if(writer != null) {
+         try {
+             try {
+                 writer.flush();
+             } finally {
+                 // must run even if flush() threw
+                 writer.close();
+             }
+         } catch(IOException excp) {
+             mLogger.warn("LocalFileLogBuffer: failed to close file " + mBufferFilename, excp);
+         }
+
+         // snapshot: stop() clears the field without holding this lock
+         DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread;
+
+         if(dispatcherThread != null) {
+             dispatcherThread.addLogfile(mBufferFilename);
+         }
+     }
+
+     mLogger.debug("<== LocalFileLogBuffer.closeFile()");
+ }
+
+ private void rollover() {
+ mLogger.debug("==> LocalFileLogBuffer.rollover()");
+
+ closeFile();
+
+ openFile();
+
+ mLogger.debug("<== LocalFileLogBuffer.rollover()");
+ }
+
+ private void checkFileStatus() {
+ long now = System.currentTimeMillis();
+
+ if(now > mNextRolloverTime) {
+ rollover();
+ } else if(mWriter == null) {
+ openFile();
+ } else if(now > mNextFlushTime) {
+ try {
+ mNextFlushTime = now + (mFlushIntervalSeconds * 1000L);
+
+ mWriter.flush();
+ } catch (IOException excp) {
+ mLogger.warn("LocalFileLogBuffer: failed to flush to file " + mBufferFilename, excp);
+ }
+ }
+ }
+
+ /**
+  * Wraps the given stream in a character writer.
+  *
+  * The configured encoding is used when set and supported; otherwise
+  * (including after an unsupported-encoding warning) the writer falls
+  * back to the platform default. The writer is buffered when the
+  * configured buffer size is positive.
+  *
+  * @param os stream to wrap; null yields null
+  * @return the writer, or null when os is null
+  */
+ private Writer createWriter(OutputStream os ) {
+     if(os == null) {
+         return null;
+     }
+
+     Writer result = null;
+
+     if(mEncoding != null) {
+         try {
+             result = new OutputStreamWriter(os, mEncoding);
+         } catch(UnsupportedEncodingException excp) {
+             mLogger.warn("LocalFileLogBuffer: failed to create output writer for file " + mBufferFilename, excp);
+         }
+     }
+
+     if(result == null) {
+         result = new OutputStreamWriter(os);
+     }
+
+     if(mFileBufferSizeBytes > 0) {
+         result = new BufferedWriter(result, mFileBufferSizeBytes);
+     }
+
+     return result;
+ }
+
+ boolean isCurrentFilename(String filename) {
+ return mBufferFilename != null && filename != null && filename.equals(mBufferFilename);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("LocalFileLogBuffer {");
+ sb.append("Directory=").append(mDirectory).append("; ");
+ sb.append("File=").append(mFile).append("; ");
+ sb.append("RolloverIntervaSeconds=").append(mRolloverIntervalSeconds).append("; ");
+ sb.append("ArchiveDirectory=").append(mArchiveDirectory).append("; ");
+ sb.append("ArchiveFileCount=").append(mArchiveFileCount);
+ sb.append("}");
+
+ return sb.toString();
+ }
+
+}
+
+class DestinationDispatcherThread<T> extends Thread {
+ // names of closed buffer files waiting to be sent; also used as the
+ // monitor for wait/notify between producer and dispatcher
+ private TreeSet<String> mCompletedLogfiles = new TreeSet<String>();
+ // set by stopThread() from another thread and polled by the dispatcher
+ // loops without a lock, hence volatile
+ private volatile boolean mStopThread = false;
+ private LocalFileLogBuffer<T> mFileLogBuffer = null;
+ private LogDestination<T> mDestination = null;
+ private DebugTracer mLogger = null;
+
+ // file currently being sent; cleared by the dispatcher outside the
+ // mCompletedLogfiles lock while isIdle() reads it from other threads,
+ // hence volatile
+ private volatile String mCurrentLogfile = null;
+ private BufferedReader mReader = null;
+
+ public DestinationDispatcherThread(LocalFileLogBuffer<T> fileLogBuffer, LogDestination<T> destination, DebugTracer tracer) {
+ super(DestinationDispatcherThread.class.getSimpleName() + "-" + System.currentTimeMillis());
+
+ mLogger = tracer;
+
+ mFileLogBuffer = fileLogBuffer;
+ mDestination = destination;
+
+ setDaemon(true);
+ }
+
+ public void addLogfile(String filename) {
+ mLogger.debug("==> DestinationDispatcherThread.addLogfile(" + filename + ")");
+
+ if(filename != null) {
+ synchronized(mCompletedLogfiles) {
+ mCompletedLogfiles.add(filename);
+ mCompletedLogfiles.notify();
+ }
+ }
+
+ mLogger.debug("<== DestinationDispatcherThread.addLogfile(" + filename + ")");
+ }
+
+ public void stopThread() {
+ mStopThread = true;
+ }
+
+ public boolean isIdle() {
+ synchronized(mCompletedLogfiles) {
+ return mCompletedLogfiles.isEmpty() && mCurrentLogfile == null;
+ }
+ }
+
+ @Override
+ public void run() {
+ UserGroupInformation loginUser = null;
+
+ try {
+ loginUser = UserGroupInformation.getLoginUser();
+ } catch (IOException excp) {
+ mLogger.error("DestinationDispatcherThread.run(): failed to get login user details. Audit files will not be sent to HDFS destination", excp);
+ }
+
+ if(loginUser == null) {
+ mLogger.error("DestinationDispatcherThread.run(): failed to get login user. Audit files will not be sent to HDFS destination");
+
+ return;
+ }
+
+ loginUser.doAs(new PrivilegedAction<Integer>() {
+ @Override
+ public Integer run() {
+ doRun();
+
+ return 0;
+ }
+ });
+ }
+
+ /**
+  * Main loop of the dispatcher: queues the files already present in the
+  * buffer directory, starts the destination, then repeatedly takes the
+  * first completed log file and sends it until asked to stop.
+  *
+  * The destination is stopped on every exit path, including when the
+  * loop ends with an exception.
+  *
+  * @throws RuntimeException if the thread is interrupted while waiting
+  *         for a log file (the interrupt status is restored first)
+  */
+ private void doRun() {
+     init();
+
+     mDestination.start();
+
+     long pollIntervalInMs = 1000L;
+
+     try {
+         while(! mStopThread) {
+             synchronized(mCompletedLogfiles) {
+                 // timed wait so a stop request is noticed even though
+                 // stopThread() does not notify
+                 while(mCompletedLogfiles.isEmpty() && !mStopThread) {
+                     try {
+                         mCompletedLogfiles.wait(pollIntervalInMs);
+                     } catch(InterruptedException excp) {
+                         Thread.currentThread().interrupt();
+
+                         throw new RuntimeException("DestinationDispatcherThread.run(): failed to wait for log file", excp);
+                     }
+                 }
+
+                 mCurrentLogfile = mCompletedLogfiles.pollFirst();
+             }
+
+             if(mCurrentLogfile != null) {
+                 sendCurrentFile();
+             }
+         }
+     } finally {
+         mDestination.stop();
+     }
+ }
+
+ private void init() {
+ mLogger.debug("==> DestinationDispatcherThread.init()");
+
+ String dirName = MiscUtil.replaceTokens(mFileLogBuffer.getDirectory(), 0);
+
+ if(dirName != null) {
+ File directory = new File(dirName);
+
+ if(directory.exists() && directory.isDirectory()) {
+ File[] files = directory.listFiles();
+
+ if(files != null) {
+ for(File file : files) {
+ if(file.exists() && file.isFile() && file.canRead()) {
+ String filename = file.getAbsolutePath();
+ if(! mFileLogBuffer.isCurrentFilename(filename)) {
+ addLogfile(filename);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ mLogger.debug("<== DestinationDispatcherThread.init()");
+ }
+
+ /**
+  * Sends every record of the current log file to the destination,
+  * retrying each record until it is accepted or the thread is asked to
+  * stop. Unless a stop was requested, the destination is then flushed
+  * and the file archived.
+  *
+  * The file reader is closed on every exit path, including when the
+  * retry sleep is interrupted.
+  *
+  * @return true when the end of the file was reached; false when the
+  *         loop ended because of a stop request
+  * @throws RuntimeException if the thread is interrupted while waiting
+  *         for the destination (the interrupt status is restored first)
+  */
+ private boolean sendCurrentFile() {
+     mLogger.debug("==> DestinationDispatcherThread.sendCurrentFile()");
+
+     boolean ret = false;
+
+     long destinationPollIntervalInMs = 1000L;
+
+     openCurrentFile();
+
+     try {
+         while(!mStopThread) {
+             String log = getNextStringifiedLog();
+
+             if(log == null) { // reached end-of-file
+                 ret = true;
+
+                 break;
+             }
+
+             // loop until log is sent successfully
+             while(!mStopThread && !mDestination.sendStringified(log)) {
+                 try {
+                     Thread.sleep(destinationPollIntervalInMs);
+                 } catch(InterruptedException excp) {
+                     Thread.currentThread().interrupt();
+
+                     throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+                 }
+             }
+         }
+     } finally {
+         closeCurrentFile();
+     }
+
+     if(!mStopThread) {
+         mDestination.flush();
+         archiveCurrentFile();
+     }
+
+     mLogger.debug("<== DestinationDispatcherThread.sendCurrentFile()");
+
+     return ret;
+ }
+
+ /**
+  * Reads the next record from the current log file.
+  *
+  * A record may span several physical lines: LocalFileLogBuffer.add()
+  * writes MiscUtil.ESCAPE_STR before every line separator embedded in a
+  * record. A line ending in the escape string is therefore continued on
+  * the next line, and the removed separator is put back between the two
+  * parts, including before the last line of the record.
+  *
+  * NOTE(review): a record whose own text ends with the escape string
+  * cannot be told apart from a continued line; fixing that would need a
+  * change to the file format.
+  *
+  * @return the record; null when there is no open reader or the end of
+  *         the file has been reached. On a read error the failure is
+  *         logged and whatever was read so far (possibly null) is
+  *         returned.
+  */
+ private String getNextStringifiedLog() {
+     if(mReader == null) {
+         return null;
+     }
+
+     StringBuilder log = null;
+     boolean isContinuation = false; // previous line ended with the escape string
+
+     try {
+         while(true) {
+             String line = mReader.readLine();
+
+             if(line == null) { // reached end-of-file
+                 break;
+             }
+
+             if(log == null) {
+                 log = new StringBuilder();
+             }
+
+             if(isContinuation) {
+                 // restore the separator that readLine() stripped
+                 log.append(MiscUtil.LINE_SEPARATOR);
+             }
+
+             if(line.endsWith(MiscUtil.ESCAPE_STR)) {
+                 log.append(line, 0, line.length() - MiscUtil.ESCAPE_STR.length());
+
+                 isContinuation = true;
+
+                 continue;
+             }
+
+             log.append(line);
+
+             break;
+         }
+     } catch (IOException excp) {
+         mLogger.warn("getNextStringifiedLog.getNextLog(): failed to read from file " + mCurrentLogfile, excp);
+     }
+
+     return log == null ? null : log.toString();
+ }
+
+ private void openCurrentFile() {
+ mLogger.debug("==> openCurrentFile(" + mCurrentLogfile + ")");
+
+ if(mCurrentLogfile != null) {
+ try {
+ FileInputStream inStr = new FileInputStream(mCurrentLogfile);
+
+ InputStreamReader strReader = createReader(inStr);
+
+ if(strReader != null) {
+ mReader = new BufferedReader(strReader);
+ }
+ } catch(FileNotFoundException excp) {
+ mLogger.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp);
+ }
+ }
+
+ mLogger.debug("<== openCurrentFile(" + mCurrentLogfile + ")");
+ }
+
+ private void closeCurrentFile() {
+ mLogger.debug("==> closeCurrentFile(" + mCurrentLogfile + ")");
+
+ if(mReader != null) {
+ try {
+ mReader.close();
+ } catch(IOException excp) {
+ // ignore
+ }
+ }
+ mReader = null;
+
+ mLogger.debug("<== closeCurrentFile(" + mCurrentLogfile + ")");
+ }
+
+ private void archiveCurrentFile() {
+ if(mCurrentLogfile != null) {
+ File logFile = new File(mCurrentLogfile);
+ String archiveDirName = MiscUtil.replaceTokens(mFileLogBuffer.getArchiveDirectory(), 0);
+ String archiveFilename = archiveDirName + File.separator +logFile.getName();
+
+ try {
+ if(logFile.exists()) {
+ File archiveFile = new File(archiveFilename);
+
+ MiscUtil.createParents(archiveFile);
+
+ if(! logFile.renameTo(archiveFile)) {
+ // TODO: renameTo() does not work in all cases. in case of failure, copy the file contents to the destination and delete the file
+ mLogger.warn("archiving failed to move file: " + mCurrentLogfile + " ==> " + archiveFilename);
+ }
+
+ File archiveDir = new File(archiveDirName);
+ File[] files = archiveDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File f) {
+ return f.isFile();
+ }
+ });
+
+ int numOfFilesToDelete = files == null ? 0 : (files.length - mFileLogBuffer.getArchiveFileCount());
+
+ if(numOfFilesToDelete > 0) {
+ Arrays.sort(files, new Comparator<File>() {
+ @Override
+ public int compare(File f1, File f2) {
+ return (int)(f1.lastModified() - f2.lastModified());
+ }
+ });
+
+ for(int i = 0; i < numOfFilesToDelete; i++) {
+ if(! files[i].delete()) {
+ mLogger.warn("archiving failed to delete file: " + files[i].getAbsolutePath());
+ }
+ }
+ }
+ }
+ } catch(Exception excp) {
+ mLogger.warn("archiveCurrentFile(): faile to move " + mCurrentLogfile + " to archive location " + archiveFilename, excp);
+ }
+ }
+ mCurrentLogfile = null;
+ }
+
+ public InputStreamReader createReader(InputStream iStr) {
+ InputStreamReader reader = null;
+
+ if(iStr != null) {
+ String encoding = mFileLogBuffer.getEncoding();
+
+ if(encoding != null) {
+ try {
+ reader = new InputStreamReader(iStr, encoding);
+ } catch(UnsupportedEncodingException excp) {
+ mLogger.warn("createReader(): failed to create input reader.", excp);
+ }
+ }
+
+ if(reader == null) {
+ reader = new InputStreamReader(iStr);
+ }
+ }
+
+ return reader;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("DestinationDispatcherThread {");
+ sb.append("ThreadName=").append(this.getName()).append("; ");
+ sb.append("CompletedLogfiles.size()=").append(mCompletedLogfiles.size()).append("; ");
+ sb.append("StopThread=").append(mStopThread).append("; ");
+ sb.append("CurrentLogfile=").append(mCurrentLogfile);
+ sb.append("}");
+
+ return sb.toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
new file mode 100644
index 0000000..0d0809a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+
+ /**
+  * Audit provider that writes every audit event, at INFO level, to the logger
+  * named {@code "xaaudit." + Log4jAuditProvider.class.getName()}.
+  *
+  * Events are rendered as JSON when the Gson instance could be created in
+  * {@link #init(Properties)}; otherwise {@code toString()} of the event is used.
+  * The lifecycle and flush methods are intentionally no-ops in this class.
+  */
+ public class Log4jAuditProvider extends BaseAuditProvider {
+
+     private static final Log LOG      = LogFactory.getLog(Log4jAuditProvider.class);
+     private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
+
+     public static final String AUDIT_LOG4J_IS_ASYNC_PROP           = "xasecure.audit.log4j.is.async";
+     public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size" ;
+     public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+
+     /** JSON serializer for events; stays {@code null} if it could not be created. */
+     private Gson mGsonBuilder = null;
+
+     public Log4jAuditProvider() {
+         LOG.info("Log4jAuditProvider: creating..");
+     }
+
+     /**
+      * Initializes the base provider and then tries to create the JSON serializer.
+      * A failure to create it is only logged; events then fall back to {@code toString()}.
+      */
+     @Override
+     public void init(Properties props) {
+         LOG.info("Log4jAuditProvider.init()");
+
+         super.init(props);
+
+         try {
+             GsonBuilder builder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z");
+
+             mGsonBuilder = builder.create();
+         } catch(Throwable excp) {
+             LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json", excp);
+         }
+     }
+
+     /**
+      * Writes the event to the audit logger. Nothing is written when INFO is
+      * disabled on the audit logger or when {@code event} is {@code null}.
+      */
+     @Override
+     public void log(AuditEventBase event) {
+         if(! AUDITLOG.isInfoEnabled() || event == null) {
+             return;
+         }
+
+         String eventStr;
+
+         if(mGsonBuilder != null) {
+             eventStr = mGsonBuilder.toJson(event);
+         } else {
+             eventStr = event.toString();
+         }
+
+         AUDITLOG.info(eventStr);
+     }
+
+     @Override
+     public void start() {
+         // intentionally left empty
+     }
+
+     @Override
+     public void stop() {
+         // intentionally left empty
+     }
+
+     @Override
+     public void waitToComplete() {
+         // intentionally left empty
+     }
+
+     /** Always {@code false}: this class keeps no pending events of its own. */
+     @Override
+     public boolean isFlushPending() {
+         return false;
+     }
+
+     /** Always {@code 0}: this class does not track flush times. */
+     @Override
+     public long getLastFlushTime() {
+         return 0;
+     }
+
+     @Override
+     public void flush() {
+         // intentionally left empty
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
new file mode 100644
index 0000000..bdb1a47
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import org.apache.commons.logging.Log;
+
+public class Log4jTracer implements DebugTracer {
+ private Log mLogger = null;
+
+ public Log4jTracer(Log logger) {
+ mLogger = logger;
+ }
+
+ public void debug(String msg) {
+ mLogger.debug(msg);
+ }
+
+ public void debug(String msg, Throwable excp) {
+ mLogger.debug(msg, excp);
+ }
+
+ public void info(String msg) {
+ mLogger.info(msg);
+ }
+
+ public void info(String msg, Throwable excp) {
+ mLogger.info(msg, excp);
+ }
+
+ public void warn(String msg) {
+ mLogger.warn(msg);
+ }
+
+ public void warn(String msg, Throwable excp) {
+ mLogger.warn(msg, excp);
+ }
+
+ public void error(String msg) {
+ mLogger.error(msg);
+ }
+
+ public void error(String msg, Throwable excp) {
+ mLogger.error(msg, excp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
new file mode 100644
index 0000000..aebef1b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+
+ /**
+  * A buffer of log entries of type {@code T} that is started against a
+  * {@link LogDestination}.
+  *
+  * NOTE(review): the descriptions below are inferred from the method
+  * signatures only; confirm against the implementations.
+  *
+  * @param <T> type of the buffered log entries
+  */
+ public interface LogBuffer<T> {
+     /** Starts the buffer, handing it the destination it works with. */
+     void start(LogDestination<T> destination);
+
+     /** Stops the buffer. */
+     void stop();
+
+     /** @return whether the buffer can currently be used (presumably, accept entries) */
+     boolean isAvailable();
+
+     /** @return whether the buffer currently holds no entries (presumably) */
+     boolean isEmpty();
+
+     /**
+      * Adds one entry to the buffer.
+      *
+      * @return presumably {@code true} when the entry was accepted
+      */
+     boolean add(T log);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
new file mode 100644
index 0000000..44e94ad
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+
+ /**
+  * A sink that log entries of type {@code T} are sent to, either as objects
+  * or already converted to text.
+  *
+  * NOTE(review): the descriptions below are inferred from the method
+  * signatures only; confirm against the implementations.
+  *
+  * @param <T> type of the log entries
+  */
+ public interface LogDestination<T> {
+     /** Starts the destination. */
+     void start();
+
+     /** Stops the destination. */
+     void stop();
+
+     /** @return whether the destination can currently be used */
+     boolean isAvailable();
+
+     /**
+      * Sends one log entry.
+      *
+      * @return presumably {@code true} when the entry was written
+      */
+     boolean send(T log);
+
+     /**
+      * Sends one log entry that has already been converted to a string.
+      *
+      * @return presumably {@code true} when the entry was written
+      */
+     boolean sendStringified(String log);
+
+     /**
+      * Flushes the destination.
+      *
+      * @return presumably {@code true} when the flush succeeded
+      */
+     boolean flush();
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
new file mode 100644
index 0000000..17230b2
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.rmi.dgc.VMID;
+import java.text.SimpleDateFormat;
+import java.util.UUID;
+
+import org.apache.log4j.helpers.LogLog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class MiscUtil {
+ public static final String TOKEN_START = "%";
+ public static final String TOKEN_END = "%";
+ public static final String TOKEN_HOSTNAME = "hostname";
+ public static final String TOKEN_APP_TYPE = "app-type";
+ public static final String TOKEN_JVM_INSTANCE = "jvm-instance";
+ public static final String TOKEN_TIME = "time:";
+ public static final String TOKEN_PROPERTY = "property:";
+ public static final String TOKEN_ENV = "env:";
+ public static final String ESCAPE_STR = "\\";
+
+ static VMID sJvmID = new VMID();
+
+ public static String LINE_SEPARATOR = System.getProperty("line.separator");
+
+ private static Gson sGsonBuilder = null;
+ private static String sApplicationType = null;
+
+ static {
+ try {
+ sGsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS").create();
+ } catch(Throwable excp) {
+ LogLog.warn("failed to create GsonBuilder object. stringigy() will return obj.toString(), instead of Json", excp);
+ }
+ }
+
+ public static String replaceTokens(String str, long time) {
+ if(str == null) {
+ return str;
+ }
+
+ if(time <= 0) {
+ time = System.currentTimeMillis();
+ }
+
+ for(int startPos = 0; startPos < str.length(); ) {
+ int tagStartPos = str.indexOf(TOKEN_START, startPos);
+
+ if(tagStartPos == -1) {
+ break;
+ }
+
+ int tagEndPos = str.indexOf(TOKEN_END, tagStartPos + TOKEN_START.length());
+
+ if(tagEndPos == -1) {
+ break;
+ }
+
+ String tag = str.substring(tagStartPos, tagEndPos+TOKEN_END.length());
+ String token = tag.substring(TOKEN_START.length(), tag.lastIndexOf(TOKEN_END));
+ String val = "";
+
+ if(token != null) {
+ if(token.equals(TOKEN_HOSTNAME)) {
+ val = getHostname();
+ } else if(token.equals(TOKEN_APP_TYPE)) {
+ val = getApplicationType();
+ } else if(token.equals(TOKEN_JVM_INSTANCE)) {
+ val = getJvmInstanceId();
+ } else if(token.startsWith(TOKEN_PROPERTY)) {
+ String propertyName = token.substring(TOKEN_PROPERTY.length());
+
+ val = getSystemProperty(propertyName);
+ } else if(token.startsWith(TOKEN_ENV)) {
+ String envName = token.substring(TOKEN_ENV.length());
+
+ val = getEnv(envName);
+ } else if(token.startsWith(TOKEN_TIME)) {
+ String dtFormat = token.substring(TOKEN_TIME.length());
+
+ val = getFormattedTime(time, dtFormat);
+ }
+ }
+
+ if(val == null) {
+ val = "";
+ }
+
+ str = str.substring(0, tagStartPos) + val + str.substring(tagEndPos + TOKEN_END.length());
+ startPos = tagStartPos + val.length();
+ }
+
+ return str;
+ }
+
+ public static String getHostname() {
+ String ret = null;
+
+ try {
+ ret = InetAddress.getLocalHost().getHostName();
+ } catch (Exception excp) {
+ LogLog.warn("getHostname()", excp);
+ }
+
+ return ret;
+ }
+
+ public static void setApplicationType(String applicationType) {
+ sApplicationType = applicationType;
+ }
+
+ public static String getApplicationType() {
+ return sApplicationType;
+ }
+
+ public static String getJvmInstanceId() {
+ String ret = Integer.toString(Math.abs(sJvmID.toString().hashCode()));
+
+ return ret;
+ }
+
+ public static String getSystemProperty(String propertyName) {
+ String ret = null;
+
+ try {
+ ret = propertyName != null ? System.getProperty(propertyName) : null;
+ } catch (Exception excp) {
+ LogLog.warn("getSystemProperty(" + propertyName + ") failed", excp);
+ }
+
+ return ret;
+ }
+
+ public static String getEnv(String envName) {
+ String ret = null;
+
+ try {
+ ret = envName != null ? System.getenv(envName) : null;
+ } catch (Exception excp) {
+ LogLog.warn("getenv(" + envName + ") failed", excp);
+ }
+
+ return ret;
+ }
+
+ public static String getFormattedTime(long time, String format) {
+ String ret = null;
+
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+
+ ret = sdf.format(time);
+ } catch (Exception excp) {
+ LogLog.warn("SimpleDateFormat.format() failed: " + format, excp);
+ }
+
+ return ret;
+ }
+
+ public static void createParents(File file) {
+ if(file != null) {
+ String parentName = file.getParent();
+
+ if (parentName != null) {
+ File parentDir = new File(parentName);
+
+ if(!parentDir.exists()) {
+ if(! parentDir.mkdirs()) {
+ LogLog.warn("createParents(): failed to create " + parentDir.getAbsolutePath());
+ }
+ }
+ }
+ }
+ }
+
+ public static long getNextRolloverTime(long lastRolloverTime, long interval) {
+ long now = System.currentTimeMillis() / 1000 * 1000; // round to second
+
+ if(lastRolloverTime <= 0) {
+ // should this be set to the next multiple-of-the-interval from start of the day?
+ return now + interval;
+ } else if(lastRolloverTime <= now) {
+ long nextRolloverTime = now + interval;
+
+ // keep it at 'interval' boundary
+ long trimInterval = (nextRolloverTime - lastRolloverTime) % interval;
+
+ return nextRolloverTime - trimInterval;
+ } else {
+ return lastRolloverTime;
+ }
+ }
+
+ public static long getRolloverStartTime(long nextRolloverTime, long interval) {
+ return (nextRolloverTime <= interval) ? System.currentTimeMillis() : nextRolloverTime - interval;
+ }
+
+ public static int parseInteger(String str, int defValue) {
+ int ret = defValue;
+
+ if(str != null) {
+ try {
+ ret = Integer.parseInt(str);
+ } catch(Exception excp) {
+ // ignore
+ }
+ }
+
+ return ret;
+ }
+
+ public static String generateUniqueId() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static <T> String stringify(T log) {
+ String ret = null;
+
+ if(log != null) {
+ if(log instanceof String) {
+ ret = (String)log;
+ } else if(MiscUtil.sGsonBuilder != null) {
+ ret = MiscUtil.sGsonBuilder.toJson(log);
+ } else {
+ ret = log.toString();
+ }
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
new file mode 100644
index 0000000..0c2bca6
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.HBaseAuditEvent;
+import org.apache.ranger.audit.model.HdfsAuditEvent;
+import org.apache.ranger.audit.model.HiveAuditEvent;
+import org.apache.ranger.audit.model.KnoxAuditEvent;
+import org.apache.ranger.audit.model.StormAuditEvent;
+
+
+public class MultiDestAuditProvider extends BaseAuditProvider {
+
+ private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class);
+
+ protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
+
+
+ public MultiDestAuditProvider() {
+ LOG.info("MultiDestAuditProvider: creating..");
+ }
+
+ public MultiDestAuditProvider(AuditProvider provider) {
+ addAuditProvider(provider);
+ }
+
+ @Override
+ public void init(Properties props) {
+ LOG.info("MultiDestAuditProvider.init()");
+
+ super.init(props);
+
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.init(props);
+ } catch(Throwable excp) {
+ LOG.info("MultiDestAuditProvider.init(): failed" + provider.getClass().getCanonicalName() + ")");
+ }
+ }
+ }
+
+ public void addAuditProvider(AuditProvider provider) {
+ if(provider != null) {
+ LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")");
+
+ mProviders.add(provider);
+ }
+ }
+
+ public void addAuditProviders(List<AuditProvider> providers) {
+ if(providers != null) {
+ for(AuditProvider provider : providers) {
+ addAuditProvider(provider);
+ }
+ }
+ }
+
+ @Override
+ public void log(AuditEventBase event) {
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.log(event);
+ } catch(Throwable excp) {
+ logFailedEvent(event, excp);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.start();
+ } catch(Throwable excp) {
+ LOG.error("AsyncAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ }
+ }
+ }
+
+ @Override
+ public void stop() {
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.stop();
+ } catch(Throwable excp) {
+ LOG.error("AsyncAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ }
+ }
+ }
+
+ @Override
+ public void waitToComplete() {
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.waitToComplete();
+ } catch(Throwable excp) {
+ LOG.error("AsyncAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ }
+ }
+ }
+
+ @Override
+ public boolean isFlushPending() {
+ for(AuditProvider provider : mProviders) {
+ if(provider.isFlushPending()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public long getLastFlushTime() {
+ long lastFlushTime = 0;
+ for(AuditProvider provider : mProviders) {
+ long flushTime = provider.getLastFlushTime();
+
+ if(flushTime != 0) {
+ if(lastFlushTime == 0 || lastFlushTime > flushTime) {
+ lastFlushTime = flushTime;
+ }
+ }
+ }
+
+ return lastFlushTime;
+ }
+
+ @Override
+ public void flush() {
+ for(AuditProvider provider : mProviders) {
+ try {
+ provider.flush();
+ } catch(Throwable excp) {
+ LOG.error("AsyncAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
new file mode 100644
index 0000000..620951c
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider.hdfs;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BufferedAuditProvider;
+import org.apache.ranger.audit.provider.DebugTracer;
+import org.apache.ranger.audit.provider.LocalFileLogBuffer;
+import org.apache.ranger.audit.provider.Log4jTracer;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+ /**
+  * Buffered audit provider whose buffer is a {@link LocalFileLogBuffer} and
+  * whose destination is an {@link HdfsLogDestination}. Both are configured in
+  * {@link #init(Properties)} from the properties that start with
+  * {@code xasecure.audit.hdfs.config.}.
+  */
+ public class HdfsAuditProvider extends BufferedAuditProvider {
+     private static final Log LOG = LogFactory.getLog(HdfsAuditProvider.class);
+
+     public static final String AUDIT_HDFS_IS_ASYNC_PROP           = "xasecure.audit.hdfs.is.async";
+     public static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.hdfs.async.max.queue.size" ;
+     public static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.hdfs.async.max.flush.interval.ms";
+
+     public HdfsAuditProvider() {
+     }
+
+     /**
+      * Initializes the superclass, then builds the HDFS destination and the
+      * local file buffer from the configuration and registers the pair via
+      * {@code setBufferAndDestination}. Numeric settings that are missing or
+      * unparsable use the defaults given in the code below.
+      */
+     public void init(Properties props) {
+         LOG.info("HdfsAuditProvider.init()");
+
+         super.init(props);
+
+         Map<String, String> cfg      = BaseAuditProvider.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
+         String              encoding = cfg.get("encoding"); // shared by buffer and destination
+         DebugTracer         tracer   = new Log4jTracer(LOG);
+
+         // destination: where the audit logs finally go
+         HdfsLogDestination<AuditEventBase> destination = new HdfsLogDestination<AuditEventBase>(tracer);
+
+         destination.setDirectory(cfg.get("destination.directory"));
+         destination.setFile(cfg.get("destination.file"));
+         destination.setFlushIntervalSeconds(MiscUtil.parseInteger(cfg.get("destination.flush.interval.seconds"), 15 * 60));
+         destination.setEncoding(encoding);
+         destination.setRolloverIntervalSeconds(MiscUtil.parseInteger(cfg.get("destination.rollover.interval.seconds"), 24 * 60 * 60));
+         destination.setOpenRetryIntervalSeconds(MiscUtil.parseInteger(cfg.get("destination.open.retry.interval.seconds"), 60));
+
+         // buffer: local files that hold the logs before they reach the destination
+         LocalFileLogBuffer<AuditEventBase> buffer = new LocalFileLogBuffer<AuditEventBase>(tracer);
+
+         buffer.setDirectory(cfg.get("local.buffer.directory"));
+         buffer.setFile(cfg.get("local.buffer.file"));
+         buffer.setFlushIntervalSeconds(MiscUtil.parseInteger(cfg.get("local.buffer.flush.interval.seconds"), 1 * 60));
+         buffer.setFileBufferSizeBytes(MiscUtil.parseInteger(cfg.get("local.buffer.file.buffer.size.bytes"), 8 * 1024));
+         buffer.setEncoding(encoding);
+         buffer.setRolloverIntervalSeconds(MiscUtil.parseInteger(cfg.get("local.buffer.rollover.interval.seconds"), 10 * 60));
+         buffer.setArchiveDirectory(cfg.get("local.archive.directory"));
+         buffer.setArchiveFileCount(MiscUtil.parseInteger(cfg.get("local.archive.max.file.count"), 10));
+
+         setBufferAndDestination(buffer, destination);
+     }
+ }
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
new file mode 100644
index 0000000..6b5cb4b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider.hdfs;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.provider.DebugTracer;
+import org.apache.ranger.audit.provider.LogDestination;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class HdfsLogDestination<T> implements LogDestination<T> {
+ public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed";
+
+ private String mDirectory = null;
+ private String mFile = null;
+ private int mFlushIntervalSeconds = 1 * 60;
+ private String mEncoding = null;
+ private boolean mIsAppend = false;
+ private int mRolloverIntervalSeconds = 24 * 60 * 60;
+ private int mOpenRetryIntervalSeconds = 60;
+ private DebugTracer mLogger = null;
+
+ private FSDataOutputStream mFsDataOutStream = null;
+ private OutputStreamWriter mWriter = null;
+ private String mHdfsFilename = null;
+ private long mNextRolloverTime = 0;
+ private long mNextFlushTime = 0;
+ private long mLastOpenFailedTime = 0;
+ private boolean mIsStopInProgress = false;
+
+ /**
+  * @param tracer tracer that receives this destination's debug/info/warn messages
+  */
+ public HdfsLogDestination(DebugTracer tracer) {
+     this.mLogger = tracer;
+ }
+
+ /** @return directory part of the HDFS file name template */
+ public String getDirectory() {
+     return this.mDirectory;
+ }
+
+ public void setDirectory(String directory) {
+     this.mDirectory = directory;
+ }
+
+ /** @return file part of the HDFS file name template */
+ public String getFile() {
+     return this.mFile;
+ }
+
+ public void setFile(String file) {
+     this.mFile = file;
+ }
+
+ /** @return flush interval, in seconds */
+ public int getFlushIntervalSeconds() {
+     return this.mFlushIntervalSeconds;
+ }
+
+ public void setFlushIntervalSeconds(int flushIntervalSeconds) {
+     this.mFlushIntervalSeconds = flushIntervalSeconds;
+ }
+
+ /** @return configured encoding name; may be {@code null} */
+ public String getEncoding() {
+     return this.mEncoding;
+ }
+
+ public void setEncoding(String encoding) {
+     this.mEncoding = encoding;
+ }
+
+ /** @return rollover interval, in seconds */
+ public int getRolloverIntervalSeconds() {
+     return this.mRolloverIntervalSeconds;
+ }
+
+ public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+     this.mRolloverIntervalSeconds = rolloverIntervalSeconds;
+ }
+
+ /** @return open-retry interval, in seconds */
+ public int getOpenRetryIntervalSeconds() {
+     return this.mOpenRetryIntervalSeconds;
+ }
+
+ public void setOpenRetryIntervalSeconds(int minIntervalOpenRetrySeconds) {
+     this.mOpenRetryIntervalSeconds = minIntervalOpenRetrySeconds;
+ }
+
+ /** Starts the destination by opening the HDFS file; entry and exit are traced at debug level. */
+ @Override
+ public void start() {
+     final String method = "HdfsLogDestination.start()";
+
+     mLogger.debug("==> " + method);
+
+     openFile();
+
+     mLogger.debug("<== " + method);
+ }
+
+ /**
+  * Stops the destination by closing the current file. The stop-in-progress
+  * flag is set only for the duration of the close.
+  */
+ @Override
+ public void stop() {
+     mLogger.debug("==> HdfsLogDestination.stop()");
+
+     mIsStopInProgress = true;
+
+     try {
+         closeFile();
+     } finally {
+         // never leave the flag stuck at true, even if closeFile() throws
+         mIsStopInProgress = false;
+     }
+
+     mLogger.debug("<== HdfsLogDestination.stop()");
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return mWriter != null;
+ }
+
+ /**
+  * Sends one log entry, converted with {@code toString()}.
+  *
+  * @return the result of {@link #sendStringified(String)}, or {@code false}
+  *         when {@code log} is {@code null}
+  */
+ @Override
+ public boolean send(T log) {
+     if(log == null) {
+         return false;
+     }
+
+     return sendStringified(log.toString());
+ }
+
+ /**
+  * Writes one already-stringified log entry, followed by the line separator.
+  * {@code checkFileStatus()} is called first. When the write fails, the
+  * failure is logged and the file is closed.
+  *
+  * @return {@code true} when the entry was handed to the writer;
+  *         {@code false} when there is no writer or the write failed
+  */
+ @Override
+ public boolean sendStringified(String log) {
+     checkFileStatus();
+
+     // work on a local copy of the field for the rest of this call
+     OutputStreamWriter out = mWriter;
+
+     if(out == null) {
+         return false;
+     }
+
+     try {
+         out.write(log + MiscUtil.LINE_SEPARATOR);
+     } catch (IOException excp) {
+         mLogger.warn("HdfsLogDestination.sendStringified(): write failed", excp);
+
+         closeFile();
+
+         return false;
+     }
+
+     return true;
+ }
+
+ /**
+  * Flushes the writer and then calls {@code hflush()} on the HDFS output
+  * stream. Failures are reported through {@code logException}.
+  *
+  * @return {@code true} when at least one of the two flushes succeeded; in
+  *         that case the next flush time is moved one flush interval ahead
+  */
+ @Override
+ public boolean flush() {
+     mLogger.debug("==> HdfsLogDestination.flush()");
+
+     boolean writerFlushed = false;
+     boolean streamFlushed = false;
+
+     OutputStreamWriter writer = mWriter;
+
+     if(writer != null) {
+         try {
+             writer.flush();
+
+             writerFlushed = true;
+         } catch (IOException excp) {
+             logException("HdfsLogDestination: flush() failed", excp);
+         }
+     }
+
+     FSDataOutputStream ostream = mFsDataOutStream;
+
+     if(ostream != null) {
+         try {
+             ostream.hflush();
+
+             streamFlushed = true;
+         } catch (IOException excp) {
+             logException("HdfsLogDestination: hflush() failed", excp);
+         }
+     }
+
+     boolean ret = writerFlushed || streamFlushed;
+
+     if(ret) {
+         mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+     }
+
+     mLogger.debug("<== HdfsLogDestination.flush()");
+
+     return ret;
+ }
+
+ /**
+  * Closes any currently open file and opens the log file for the current
+  * rollover period.
+  *
+  * The target name is built from mDirectory, the path separator and mFile, with
+  * tokens replaced via MiscUtil.replaceTokens() using the rollover start time.
+  * If that file already exists it is either appended to (mIsAppend) or a new
+  * name is obtained from getNewFilename(). If the first open attempt throws an
+  * IOException, one more attempt is made with another name from getNewFilename().
+  *
+  * On success mWriter, mFsDataOutStream and mNextFlushTime are set and
+  * mLastOpenFailedTime is reset to 0. On failure mWriter is left null,
+  * mHdfsFilename is cleared and mLastOpenFailedTime records the current time.
+  */
+ private void openFile() {
+ mLogger.debug("==> HdfsLogDestination.openFile()");
+
+ closeFile();
+
+ // rollover bookkeeping is delegated to MiscUtil (defined outside this view)
+ mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+ long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+ mHdfsFilename = MiscUtil.replaceTokens(mDirectory + org.apache.hadoop.fs.Path.SEPARATOR + mFile, startTime);
+
+ FSDataOutputStream ostream = null;
+ FileSystem fileSystem = null;
+ Path pathLogfile = null;
+ Configuration conf = null;
+ // existing files are never overwritten; a different name is chosen instead
+ boolean bOverwrite = false;
+
+ try {
+ mLogger.debug("HdfsLogDestination.openFile(): opening file " + mHdfsFilename);
+
+ URI uri = URI.create(mHdfsFilename);
+
+ // TODO: mechanism to XA-HDFS plugin to disable auditing of access checks to the current HDFS file
+
+ conf = new Configuration();
+ pathLogfile = new Path(mHdfsFilename);
+ fileSystem = FileSystem.get(uri, conf);
+
+ try {
+ if(fileSystem.exists(pathLogfile)) { // file already exists. either append to the file or write to a new file
+ if(mIsAppend) {
+ mLogger.info("HdfsLogDestination.openFile(): opening file for append " + mHdfsFilename);
+
+ ostream = fileSystem.append(pathLogfile);
+ } else {
+ mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
+ pathLogfile = new Path(mHdfsFilename);
+ }
+ }
+
+ // if file does not exist or if mIsAppend==false, create the file
+ if(ostream == null) {
+ mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
+
+ createParents(pathLogfile, fileSystem);
+ ostream = fileSystem.create(pathLogfile, bOverwrite);
+ }
+ } catch(IOException excp) {
+ // append may not be supported by the filesystem; or the file might already be open by another application. Try a different filename
+ // NOTE(review): the caught exception itself is not included in the log message below
+ String failedFilename = mHdfsFilename;
+
+ mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
+ pathLogfile = new Path(mHdfsFilename);
+
+ mLogger.info("HdfsLogDestination.openFile(): failed in opening file " + failedFilename + ". Will try opening " + mHdfsFilename);
+ }
+
+ // second and last attempt, reached only when the first attempt above threw an IOException
+ if(ostream == null){
+ mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
+
+ createParents(pathLogfile, fileSystem);
+ ostream = fileSystem.create(pathLogfile, bOverwrite);
+ }
+ } catch(Throwable ex) {
+ // any failure leaves ostream null, which is handled as "open failed" below
+ mLogger.warn("HdfsLogDestination.openFile() failed", ex);
+ } finally {
+ // TODO: unset the property set above to exclude auditing of logfile opening
+ // System.setProperty(hdfsCurrentFilenameProperty, null);
+ }
+
+ // createWriter() returns null when ostream is null
+ mWriter = createWriter(ostream);
+
+ if(mWriter != null) {
+ mLogger.debug("HdfsLogDestination.openFile(): opened file " + mHdfsFilename);
+
+ mFsDataOutStream = ostream;
+ mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+ mLastOpenFailedTime = 0;
+ } else {
+ mLogger.warn("HdfsLogDestination.openFile(): failed to open file for write " + mHdfsFilename);
+
+ mHdfsFilename = null;
+ // remembered so that checkFileStatus() can wait for the retry interval before re-opening
+ mLastOpenFailedTime = System.currentTimeMillis();
+ }
+
+ mLogger.debug("<== HdfsLogDestination.openFile(" + mHdfsFilename + ")");
+ }
+
+ /**
+  * Flushes pending output, detaches the writer and HDFS stream from this
+  * destination and then closes the writer. A close failure is only logged.
+  */
+ private void closeFile() {
+     mLogger.debug("==> HdfsLogDestination.closeFile()");
+
+     flush();
+
+     // take a local reference first so the fields can be cleared before closing
+     OutputStreamWriter writerToClose = mWriter;
+
+     mWriter = null;
+     mFsDataOutStream = null;
+
+     if(writerToClose != null) {
+         mLogger.info("HdfsLogDestination.closeFile(): closing file " + mHdfsFilename);
+
+         try {
+             writerToClose.close();
+         } catch(IOException excp) {
+             logException("HdfsLogDestination: failed to close file " + mHdfsFilename, excp);
+         }
+     }
+
+     mLogger.debug("<== HdfsLogDestination.closeFile()");
+ }
+
+ /**
+  * Switches to a new log file: closes the current file and opens the next one.
+  */
+ private void rollover() {
+ mLogger.debug("==> HdfsLogDestination.rollover()");
+
+ // NOTE(review): openFile() itself begins with closeFile(), so the close runs twice here
+ closeFile();
+
+ openFile();
+
+ mLogger.debug("<== HdfsLogDestination.rollover()");
+ }
+
+ /**
+  * Performs at most one maintenance action based on the current time:
+  * re-open (when no file is open and the retry interval has elapsed),
+  * rollover, or flush, checked in that order.
+  */
+ private void checkFileStatus() {
+     long now = System.currentTimeMillis();
+
+     if(mWriter == null) {
+         // nothing is open: retry only after the configured back-off since the last failure
+         if(now > (mLastOpenFailedTime + (mOpenRetryIntervalSeconds * 1000L))) {
+             openFile();
+         }
+
+         return;
+     }
+
+     if(now > mNextRolloverTime) {
+         rollover();
+
+         return;
+     }
+
+     if(now > mNextFlushTime) {
+         flush();
+     }
+ }
+
+ /**
+  * Wraps the given stream in a character writer.
+  *
+  * @param os stream to wrap; may be null
+  * @return a writer using mEncoding when it is set and supported, otherwise a
+  *         writer created without an explicit encoding; null when {@code os} is null
+  */
+ private OutputStreamWriter createWriter(OutputStream os ) {
+     if(os == null) {
+         return null;
+     }
+
+     if(mEncoding != null) {
+         try {
+             return new OutputStreamWriter(os, mEncoding);
+         } catch(UnsupportedEncodingException excp) {
+             mLogger.warn("HdfsLogDestination.createWriter(): failed to create output writer.", excp);
+         }
+     }
+
+     // no encoding configured, or the configured one is unsupported
+     return new OutputStreamWriter(os);
+ }
+
+ /**
+  * Creates the parent directory of the given log file path if it does not
+  * exist yet. Failures are logged and never propagated to the caller.
+  *
+  * @param pathLogfile path of the log file; may be null
+  * @param fileSystem  file system the path belongs to; may be null
+  */
+ private void createParents(Path pathLogfile, FileSystem fileSystem) {
+     if(pathLogfile == null) {
+         return;
+     }
+
+     try {
+         Path parentDir = pathLogfile.getParent();
+
+         if(parentDir == null || fileSystem == null) {
+             return;
+         }
+
+         if(!fileSystem.exists(parentDir)) {
+             fileSystem.mkdirs(parentDir);
+         }
+     } catch (IOException e) {
+         logException("HdfsLogDestination.createParents() failed", e);
+     } catch (Throwable e) {
+         mLogger.warn("HdfsLogDestination.createParents() failed", e);
+     }
+ }
+
+ /**
+  * Derives an alternative file name by inserting "-1", "-2", ... before the
+  * extension (or at the end when there is no extension) until a name is found
+  * that does not exist on the given file system.
+  *
+  * Only a dot inside the last path segment is treated as the start of the
+  * extension. A dot that appears earlier in the name, for example in the host
+  * part of a URI or in a directory name, is ignored, so the counter is never
+  * inserted into the host or directory portion.
+  *
+  * @param fileName   current file name; may be null
+  * @param fileSystem file system used for the existence check; when null the
+  *                   first candidate ("-1") is returned unchecked
+  * @return the new file name, or an empty string when {@code fileName} is null
+  */
+ private String getNewFilename(String fileName, FileSystem fileSystem) {
+     if(fileName == null) {
+         return "";
+     }
+
+     // split once, outside the loop: only the counter changes between candidates
+     int sepPos  = fileName.lastIndexOf(org.apache.hadoop.fs.Path.SEPARATOR);
+     int extnPos = fileName.lastIndexOf(".");
+
+     String baseName;
+     String extn;
+
+     if(extnPos < 0 || extnPos < sepPos) {
+         // no dot at all, or the last dot belongs to the host/directory part
+         baseName = fileName;
+         extn     = "";
+     } else {
+         baseName = fileName.substring(0, extnPos);
+         extn     = fileName.substring(extnPos);
+     }
+
+     for(int i = 1; ; i++) {
+         String ret = baseName + "-" + Integer.toString(i) + extn;
+
+         if(fileSystem == null || !fileExists(ret, fileSystem)) {
+             return ret;
+         }
+     }
+ }
+
+ /**
+  * Tells whether the named file exists on the given file system.
+  *
+  * @return {@code true} only when the existence check succeeds and reports the
+  *         file as present; {@code false} for null arguments or when the check
+  *         throws an {@link IOException}
+  */
+ private boolean fileExists(String fileName, FileSystem fileSystem) {
+     if(fileName == null || fileSystem == null) {
+         return false;
+     }
+
+     Path candidate = new Path(fileName);
+
+     try {
+         return fileSystem.exists(candidate);
+     } catch(IOException excp) {
+         // ignore: a failed check is reported as "does not exist"
+         return false;
+     }
+ }
+
+ /**
+  * Logs an I/O failure as a warning, unless a stop is in progress or the
+  * exception message contains EXCP_MSG_FILESYSTEM_CLOSED.
+  *
+  * @param msg  text to log
+  * @param excp exception to log with the text; may be null
+  */
+ private void logException(String msg, IOException excp) {
+     // during shutdown, the underlying FileSystem might already be closed; so don't print error details
+     if(mIsStopInProgress) {
+         return;
+     }
+
+     String excpMsg = (excp == null) ? null : excp.getMessage();
+
+     if(excpMsg != null && excpMsg.contains(EXCP_MSG_FILESYSTEM_CLOSED)) {
+         return;
+     }
+
+     mLogger.warn(msg, excp);
+ }
+
+ /**
+  * Returns a short description listing the directory, the file name pattern
+  * and the rollover interval of this destination.
+  */
+ @Override
+ public String toString() {
+     return new StringBuilder()
+             .append("HdfsLogDestination {")
+             .append("Directory=").append(mDirectory).append("; ")
+             .append("File=").append(mFile).append("; ")
+             .append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds)
+             .append("}")
+             .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
new file mode 100644
index 0000000..34b8e4b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.ranger.audit.test;
+import org.apache.commons.logging.Log;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.HBaseAuditEvent;
+import org.apache.ranger.audit.model.HdfsAuditEvent;
+import org.apache.ranger.audit.model.HiveAuditEvent;
+import org.apache.ranger.audit.model.KnoxAuditEvent;
+import org.apache.ranger.audit.model.StormAuditEvent;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditProviderFactory;
+import org.apache.ranger.audit.provider.AuditProviderFactory.ApplicationType;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Command-line driver that sends generated audit events through the audit
+ * provider obtained from {@link AuditProviderFactory}.
+ *
+ * Optional arguments, each falling back to a property of the same meaning:
+ * args[0] number of events (default 1024), args[1] pause between events in
+ * milliseconds (default 0), args[2] seconds to wait before exiting (default 0).
+ */
+public class TestEvents {
+
+    private static final Log LOG = LogFactory.getLog(TestEvents.class);
+
+    /**
+     * Loads the audit configuration, initializes the provider, logs the
+     * requested number of test events and then stops the provider.
+     *
+     * @param args optional event count, pause time (ms) and sleep time before exit (s)
+     */
+    public static void main(String[] args) {
+        DOMConfigurator.configure("log4j.xml");
+
+        LOG.info("==> TestEvents.main()");
+
+        try {
+            Properties auditProperties = new Properties();
+
+            String AUDIT_PROPERTIES_FILE = "xasecure-audit.properties";
+
+            File propFile = new File(AUDIT_PROPERTIES_FILE);
+
+            if(propFile.exists()) {
+                LOG.info("Loading Audit properties file: " + AUDIT_PROPERTIES_FILE);
+
+                // Properties.load() leaves the stream open, so close it here
+                FileInputStream propStream = new FileInputStream(propFile);
+
+                try {
+                    auditProperties.load(propStream);
+                } finally {
+                    propStream.close();
+                }
+            } else {
+                LOG.info("Audit properties file missing: " + AUDIT_PROPERTIES_FILE);
+
+                // built-in defaults used when no properties file is present
+                auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.url", "jdbc:mysql://localhost:3306/xa_db");
+                auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.user", "xaaudit");
+                auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.password", "xaaudit");
+                auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.driver", "com.mysql.jdbc.Driver");
+
+                auditProperties.setProperty("xasecure.audit.is.enabled", "true");
+                auditProperties.setProperty("xasecure.audit.log4j.is.enabled", "false");
+                auditProperties.setProperty("xasecure.audit.log4j.is.async", "false");
+                auditProperties.setProperty("xasecure.audit.log4j.async.max.queue.size", "100000");
+                auditProperties.setProperty("xasecure.audit.log4j.async.max.flush.interval.ms", "30000");
+                auditProperties.setProperty("xasecure.audit.db.is.enabled", "true");
+                auditProperties.setProperty("xasecure.audit.db.is.async", "true");
+                auditProperties.setProperty("xasecure.audit.db.async.max.queue.size", "100000");
+                auditProperties.setProperty("xasecure.audit.db.async.max.flush.interval.ms", "30000");
+                auditProperties.setProperty("xasecure.audit.db.batch.size", "100");
+            }
+
+            AuditProviderFactory.getInstance().init(auditProperties, ApplicationType.Hdfs);
+
+            AuditProvider provider = AuditProviderFactory.getAuditProvider();
+
+            LOG.info("provider=" + provider.toString());
+
+            // command-line arguments take precedence over the corresponding properties
+            String strEventCount          = args.length > 0 ? args[0] : auditProperties.getProperty("xasecure.audit.test.event.count");
+            String strEventPauseTimeInMs  = args.length > 1 ? args[1] : auditProperties.getProperty("xasecure.audit.test.event.pause.time.ms");
+            String strSleepTimeBeforeExit = args.length > 2 ? args[2] : auditProperties.getProperty("xasecure.audit.test.sleep.time.before.exit.seconds");
+
+            int eventCount          = (strEventCount == null) ? 1024 : Integer.parseInt(strEventCount);
+            int eventPauseTime      = (strEventPauseTimeInMs == null) ? 0 : Integer.parseInt(strEventPauseTimeInMs);
+            int sleepTimeBeforeExit = ((strSleepTimeBeforeExit == null) ? 0 : Integer.parseInt(strSleepTimeBeforeExit)) * 1000;
+
+            for(int i = 0; i < eventCount; i++) {
+                AuditEventBase event = getTestEvent(i);
+
+                LOG.info("==> TestEvents.main(" + (i+1) + "): adding " + event.getClass().getName());
+                provider.log(event);
+
+                if(eventPauseTime > 0) {
+                    Thread.sleep(eventPauseTime);
+                }
+            }
+
+            provider.waitToComplete();
+
+            // In case of HdfsAuditProvider, logs are saved to the local file system and sent to HDFS asynchronously in a separate thread.
+            // So, at this point it is possible that a few local log files haven't made it to HDFS yet.
+            if(sleepTimeBeforeExit > 0) {
+                LOG.info("waiting for " + sleepTimeBeforeExit + "ms before exiting..");
+
+                try {
+                    Thread.sleep(sleepTimeBeforeExit);
+                } catch(Exception excp) {
+                    LOG.info("error while waiting before exiting..");
+                }
+            }
+
+            provider.stop();
+        } catch(Exception excp) {
+            // report through the logger, with the stack trace, instead of printStackTrace()
+            LOG.error("TestEvents.main() failed", excp);
+        }
+
+        LOG.info("<== TestEvents.main()");
+    }
+
+    /**
+     * Creates a test event whose concrete type cycles through the five audit
+     * event classes based on {@code idx}. The event time is set to the current
+     * time and the result reason to the index.
+     *
+     * @param idx running index of the event
+     * @return a new, never null, audit event
+     */
+    private static AuditEventBase getTestEvent(int idx) {
+        AuditEventBase event = null;
+
+        switch(idx % 5) {
+            case 0:
+                event = new HdfsAuditEvent();
+            break;
+            case 1:
+                event = new HBaseAuditEvent();
+            break;
+            case 2:
+                event = new HiveAuditEvent();
+            break;
+            case 3:
+                event = new KnoxAuditEvent();
+            break;
+            default:
+                // remainder 4; also covers negative remainders so that event is never null
+                event = new StormAuditEvent();
+            break;
+        }
+
+        event.setEventTime(new Date());
+        event.setResultReason(Integer.toString(idx));
+
+        return event;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/resources/META-INF/persistence.xml b/agents-audit/src/main/resources/META-INF/persistence.xml
index 0b87ab9..21b8f06 100644
--- a/agents-audit/src/main/resources/META-INF/persistence.xml
+++ b/agents-audit/src/main/resources/META-INF/persistence.xml
@@ -17,12 +17,12 @@
-->
<persistence version="2.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">
<persistence-unit name="xa_server">
- <class>com.xasecure.audit.entity.XXBaseAuditEvent</class>
- <class>com.xasecure.audit.entity.XXHBaseAuditEvent</class>
- <class>com.xasecure.audit.entity.XXHdfsAuditEvent</class>
- <class>com.xasecure.audit.entity.XXHiveAuditEvent</class>
- <class>com.xasecure.audit.entity.XXKnoxAuditEvent</class>
- <class>com.xasecure.audit.entity.XXStormAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXBaseAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXHBaseAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXHdfsAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXHiveAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXKnoxAuditEvent</class>
+ <class>org.apache.ranger.audit.entity.XXStormAuditEvent</class>
<properties>
<property name="eclipselink.logging.level" value="SEVERE"/>
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/agents-common/conf/log4j.properties b/agents-common/conf/log4j.properties
index ca599f2..dd22c6d 100644
--- a/agents-common/conf/log4j.properties
+++ b/agents-common/conf/log4j.properties
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.logger.xaaudit.com.xasecure.audit.provider.Log4jAuditProvider=INFO, hdfsAppender
+log4j.logger.xaaudit.org.apache.ranger.audit.provider.Log4jAuditProvider=INFO, hdfsAppender
log4j.appender.hdfsAppender=org.apache.log4j.HdfsRollingFileAppender
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/scripts/enable-agent.sh
----------------------------------------------------------------------
diff --git a/agents-common/scripts/enable-agent.sh b/agents-common/scripts/enable-agent.sh
index 8024b74..f8d90ad 100755
--- a/agents-common/scripts/enable-agent.sh
+++ b/agents-common/scripts/enable-agent.sh
@@ -185,7 +185,7 @@ create_jceks() {
tempFile=/tmp/jce.$$.out
- $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" com.hortonworks.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1
+ $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" org.apache.ranger.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1
if [ $? -ne 0 ]
then
@@ -359,7 +359,7 @@ then
cp ${fullpathorgfn} ${archivefn}
if [ $? -eq 0 ]
then
- ${JAVA} -cp "${INSTALL_CP}" com.xasecure.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p ${INSTALL_ARGS}
+ ${JAVA} -cp "${INSTALL_CP}" org.apache.ranger.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p ${INSTALL_ARGS}
if [ $? -eq 0 ]
then
diff -w ${newfn} ${fullpathorgfn} > /dev/null 2>&1
@@ -544,13 +544,13 @@ then
}
{
if ($1 == "nimbus.authorizer") {
- if ($2 ~ /^[ \t]*"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer"[ \t]*$/) {
+ if ($2 ~ /^[ \t]*"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer"[ \t]*$/) {
configured = 1 ;
printf("%s\n",$0) ;
}
else {
printf("#%s\n",$0);
- printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ;
+ printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ;
configured = 1 ;
}
}
@@ -560,7 +560,7 @@ then
}
END {
if (configured == 0) {
- printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ;
+ printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ;
}
}' ${CFG_FILE} > ${CFG_FILE}.new && cat ${CFG_FILE}.new > ${CFG_FILE} && rm -f ${CFG_FILE}.new
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
deleted file mode 100644
index 74d096a..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package com.xasecure.admin.client;
-
-
-import com.xasecure.admin.client.datatype.GrantRevokeData;
-
-
-public interface XaAdminClient {
- String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName);
-
- void grantPrivilege(GrantRevokeData grData) throws Exception;
-
- void revokePrivilege(GrantRevokeData grData) throws Exception;
-}