Posted to commits@ranger.apache.org by ma...@apache.org on 2014/12/12 02:30:30 UTC

[47/51] [partial] incubator-ranger git commit: RANGER-194: Rename packages from xasecure to apache ranger

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
new file mode 100644
index 0000000..7414a7a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
+
+
+/*
+ * NOTE:
+ * - Instances of this class are not thread-safe.
+ */
+public class DbAuditProvider extends BaseAuditProvider {
+
+	private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);
+
+	public static final String AUDIT_DB_IS_ASYNC_PROP           = "xasecure.audit.db.is.async";
+	public static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.db.async.max.queue.size" ;
+	public static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";
+
+	private static final String AUDIT_DB_BATCH_SIZE_PROP            = "xasecure.audit.db.batch.size" ;
+	private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP    = "xasecure.audit.db.config.retry.min.interval.ms";
+	private static final String AUDIT_JPA_CONFIG_PROP_PREFIX        = "xasecure.audit.jpa.";
+	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = "xasecure.audit.credential.provider.file";
+	private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS	= "auditDBCred";
+	private static final String AUDIT_JPA_JDBC_PASSWORD  			= "javax.persistence.jdbc.password";
+
+	private EntityManagerFactory entityManagerFactory;
+	private DaoManager          daoManager;
+	
+	private int                 mCommitBatchSize      = 1;
+	private int                 mDbRetryMinIntervalMs = 60 * 1000;
+	private long                mLastCommitTime       = System.currentTimeMillis();
+	private ArrayList<AuditEventBase> mUncommitted    = new ArrayList<AuditEventBase>();
+	private Map<String, String> mDbProperties         = null;
+	private long                mLastDbFailedTime     = 0;
+
+	public DbAuditProvider() {
+		LOG.info("DbAuditProvider: creating..");
+	}
+
+	@Override
+	public void init(Properties props) {
+		LOG.info("DbAuditProvider.init()");
+
+		super.init(props);
+
+		mDbProperties         = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
+		mCommitBatchSize      = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
+		mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);
+
+		boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);
+
+		if(! isAsync) {
+			mCommitBatchSize = 1; // Batching not supported in sync mode
+		}
+
+		String jdbcPassword = getCredentialString(BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE), AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);
+
+		if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
+			mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
+		}
+	}
+
+	@Override
+	public void log(AuditEventBase event) {
+		LOG.debug("DbAuditProvider.log()");
+
+		boolean isSuccess = false;
+
+		try {
+			if(preCreate(event)) {
+				DaoManager daoMgr = daoManager;
+	
+				if(daoMgr != null) {
+					event.persist(daoMgr);
+	
+					isSuccess = postCreate(event);
+				}
+			}
+		} catch(Exception excp) {
+			logDbError("DbAuditProvider.log(): failed", excp);
+		} finally {
+			if(! isSuccess) {
+				logFailedEvent(event);
+			}
+		}
+	}
+
+	@Override
+	public void start() {
+		LOG.info("DbAuditProvider.start()");
+
+		init();
+	}
+
+	@Override
+	public void stop() {
+		LOG.info("DbAuditProvider.stop()");
+
+		cleanUp();
+	}
+	
+	@Override
+    public void waitToComplete() {
+		LOG.info("DbAuditProvider.waitToComplete()");
+	}
+
+	@Override
+	public boolean isFlushPending() {
+		return mUncommitted.size() > 0;
+	}
+	
+	@Override
+	public long getLastFlushTime() {
+		return mLastCommitTime;
+	}
+
+	@Override
+	public void flush() {
+		if(mUncommitted.size() > 0) {
+			boolean isSuccess = commitTransaction();
+
+			if(! isSuccess) {
+				for(AuditEventBase evt : mUncommitted) {
+					logFailedEvent(evt);
+				}
+			}
+
+			mUncommitted.clear();
+		}
+	}
+
+	private synchronized boolean init() {
+		long now = System.currentTimeMillis();
+
+		if((now - mLastDbFailedTime) < mDbRetryMinIntervalMs) {
+			return false;
+		}
+
+		LOG.info("DbAuditProvider: init()");
+
+		try {
+			entityManagerFactory = Persistence.createEntityManagerFactory("xa_server", mDbProperties);
+
+	   	    daoManager = new DaoManager();
+	   	    daoManager.setEntityManagerFactory(entityManagerFactory);
+
+	   	    daoManager.getEntityManager(); // this forces the connection to be made to DB
+		} catch(Exception excp) {
+			logDbError("DbAuditProvider: DB initialization failed", excp);
+
+			cleanUp();
+
+			return false;
+		}
+
+		return true;
+	}
+	
+	private synchronized void cleanUp() {
+		LOG.info("DbAuditProvider: cleanUp()");
+
+		try {
+			if(entityManagerFactory != null && entityManagerFactory.isOpen()) {
+				entityManagerFactory.close();
+			}
+		} catch(Exception excp) {
+			LOG.error("DbAuditProvider.cleanUp(): failed", excp);
+		} finally {
+			entityManagerFactory = null;
+			daoManager    = null;
+		}
+	}
+	
+	private boolean isDbConnected() {
+		EntityManager em = getEntityManager();
+		
+		return em != null && em.isOpen();
+	}
+	
+	private EntityManager getEntityManager() {
+		DaoManager daoMgr = daoManager;
+
+		if(daoMgr != null) {
+			try {
+				return daoMgr.getEntityManager();
+			} catch(Exception excp) {
+				logDbError("DbAuditProvider.getEntityManager(): failed", excp);
+
+				cleanUp();
+			}
+		}
+
+		return null;
+	}
+	
+	private void clearEntityManager() {
+		try {
+			EntityManager em = getEntityManager();
+			
+			if(em != null) {
+				em.clear();
+			}
+		} catch(Exception excp) {
+			LOG.warn("DbAuditProvider.clearEntityManager(): failed", excp);
+		}
+	}
+	
+	private EntityTransaction getTransaction() {
+		EntityManager em = getEntityManager();
+
+		return em != null ? em.getTransaction() : null;
+	}
+	
+	private boolean isInTransaction() {
+		EntityTransaction trx = getTransaction();
+
+		return trx != null && trx.isActive();
+	}
+
+	private boolean beginTransaction() {
+		EntityTransaction trx = getTransaction();
+		
+		if(trx != null && !trx.isActive()) {
+			trx.begin();
+		}
+
+		if(trx == null) {
+			LOG.warn("DbAuditProvider.beginTransaction(): trx is null");
+		}
+		
+		return trx != null;
+	}
+
+	private boolean commitTransaction() {
+		boolean           ret = false;
+		EntityTransaction trx = null;
+
+		try {
+			trx = getTransaction();
+
+			if(trx != null && trx.isActive()) {
+				trx.commit();
+
+				ret = true;
+			} else {
+				throw new Exception("trx is null or not active");
+			}
+		} catch(Exception excp) {
+			logDbError("DbAuditProvider.commitTransaction(): failed", excp);
+
+			cleanUp(); // so that next insert will try to init()
+		} finally {
+			mLastCommitTime = System.currentTimeMillis();
+
+			clearEntityManager();
+		}
+
+		return ret;
+	}
+	
+	private boolean preCreate(AuditEventBase event) {
+		boolean ret = true;
+
+		if(!isDbConnected()) {
+			ret = init();
+		}
+
+		if(ret) {
+			if(! isInTransaction()) {
+				ret = beginTransaction();
+			}
+		}
+		
+		return ret;
+	}
+	
+	private boolean postCreate(AuditEventBase event) {
+		boolean ret = true;
+
+		if(mCommitBatchSize <= 1) {
+			ret = commitTransaction();
+		} else {
+			mUncommitted.add(event);
+
+			if((mUncommitted.size() % mCommitBatchSize) == 0) {
+				ret = commitTransaction();
+
+				if(! ret) {
+					for(AuditEventBase evt : mUncommitted) {
+						if(evt != event) {
+							logFailedEvent(evt);
+						}
+					}
+				}
+
+				mUncommitted.clear();
+			}
+ 		}
+ 		return ret;
+	}
+
+	private void logDbError(String msg, Exception excp) {
+		long now = System.currentTimeMillis();
+
+		if((now - mLastDbFailedTime) > mDbRetryMinIntervalMs) {
+			mLastDbFailedTime = now;
+ 		}
+
+		LOG.warn(msg, excp);
+	}
+
+	private String getCredentialString(String url,String alias) {
+		String ret = null;
+
+		if(url != null && alias != null) {
+			char[] cred = RangerCredentialProvider.getInstance().getCredentialString(url,alias);
+
+			if ( cred != null ) {
+				ret = new String(cred);	
+			}
+		}
+		
+		return ret;
+	}
+}
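
A minimal wiring sketch for the provider above (not part of this commit): the
property keys come from the constants defined in the class, while the JDBC
values and the commented-out event are assumptions for illustration only.

    Properties props = new Properties();
    props.setProperty("xasecure.audit.db.is.async", "true");
    props.setProperty("xasecure.audit.db.batch.size", "100");
    props.setProperty("xasecure.audit.db.config.retry.min.interval.ms", "30000");
    // keys under the "xasecure.audit.jpa." prefix are handed to JPA with the prefix stripped
    props.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.url", "jdbc:mysql://db-host/ranger_audit"); // assumed value
    props.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.user", "rangerlogger");                     // assumed value

    DbAuditProvider provider = new DbAuditProvider();
    provider.init(props);
    provider.start();        // creates the "xa_server" JPA EntityManagerFactory; failures are retried on later log() calls
    // provider.log(event); // event is any AuditEventBase subclass
    provider.flush();        // commits any batched, uncommitted events
    provider.stop();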

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
new file mode 100644
index 0000000..7396fd0
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+public interface DebugTracer {
+	void debug(String msg);
+	void debug(String msg, Throwable excp);
+	void info(String msg);
+	void info(String msg, Throwable excp);
+	void warn(String msg);
+	void warn(String msg, Throwable excp);
+	void error(String msg);
+	void error(String msg, Throwable excp);
+}
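
DebugTracer is the small logging SPI used by the buffer/destination classes; any
backend will do. A console-backed implementation, useful for tests, might look
like the sketch below (same package assumed; not part of this commit).

    public class ConsoleTracer implements DebugTracer {
        public void debug(String msg)                 { System.out.println("DEBUG: " + msg); }
        public void debug(String msg, Throwable excp) { System.out.println("DEBUG: " + msg); excp.printStackTrace(System.out); }
        public void info(String msg)                  { System.out.println("INFO:  " + msg); }
        public void info(String msg, Throwable excp)  { System.out.println("INFO:  " + msg); excp.printStackTrace(System.out); }
        public void warn(String msg)                  { System.err.println("WARN:  " + msg); }
        public void warn(String msg, Throwable excp)  { System.err.println("WARN:  " + msg); excp.printStackTrace(System.err); }
        public void error(String msg)                 { System.err.println("ERROR: " + msg); }
        public void error(String msg, Throwable excp) { System.err.println("ERROR: " + msg); excp.printStackTrace(System.err); }
    }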

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
new file mode 100644
index 0000000..a6e3ef1
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.ranger.audit.model.AuditEventBase;
+
+
+public class DummyAuditProvider implements AuditProvider {
+	@Override
+	public void init(Properties prop) {
+		// intentionally left empty
+	}
+
+	@Override
+	public void log(AuditEventBase event) {
+		// intentionally left empty
+	}
+
+	@Override
+	public void start() {
+		// intentionally left empty
+	}
+
+	@Override
+	public void stop() {
+		// intentionally left empty
+	}
+
+	@Override
+	public void waitToComplete() {
+		// intentionally left empty
+	}
+
+	@Override
+	public boolean isFlushPending() {
+		return false;
+	}
+	
+	@Override
+	public long getLastFlushTime() {
+		return 0;
+	}
+
+	@Override
+	public void flush() {
+		// intentionally left empty
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
new file mode 100644
index 0000000..cdc4d53
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.TreeSet;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public class LocalFileLogBuffer<T> implements LogBuffer<T> {
+	private String  mDirectory               = null;
+	private String  mFile                    = null;
+	private int     mFlushIntervalSeconds    = 1 * 60;
+	private int     mFileBufferSizeBytes     = 8 * 1024;
+	private String  mEncoding                = null;
+	private boolean mIsAppend                = true;
+	private int     mRolloverIntervalSeconds = 10 * 60;
+	private String  mArchiveDirectory        = null;
+	private int     mArchiveFileCount        = 10;
+	private DebugTracer mLogger              = null;
+
+	private Writer mWriter           = null;
+	private String mBufferFilename   = null;
+	private long   mNextRolloverTime = 0;
+	private long   mNextFlushTime    = 0;
+	private int    mFileOpenRetryIntervalInMs = 60 * 1000;
+	private long   mNextFileOpenRetryTime     = 0;
+
+	private DestinationDispatcherThread<T> mDispatcherThread = null;
+	
+	public LocalFileLogBuffer(DebugTracer tracer) {
+		mLogger = tracer;
+	}
+
+	public String getDirectory() {
+		return mDirectory;
+	}
+
+	public void setDirectory(String directory) {
+		mDirectory = directory;
+	}
+
+	public String getFile() {
+		return mFile;
+	}
+
+	public void setFile(String file) {
+		mFile = file;
+	}
+
+	public int getFileBufferSizeBytes() {
+		return mFileBufferSizeBytes;
+	}
+
+	public void setFileBufferSizeBytes(int fileBufferSizeBytes) {
+		mFileBufferSizeBytes = fileBufferSizeBytes;
+	}
+
+	public int getFlushIntervalSeconds() {
+		return mFlushIntervalSeconds;
+	}
+
+	public void setFlushIntervalSeconds(int flushIntervalSeconds) {
+		mFlushIntervalSeconds = flushIntervalSeconds;
+	}
+
+	public String getEncoding() {
+		return mEncoding;
+	}
+
+	public void setEncoding(String encoding) {
+		mEncoding = encoding;
+	}
+
+	public boolean getIsAppend() {
+		return mIsAppend;
+	}
+
+	public void setIsAppend(boolean isAppend) {
+		mIsAppend = isAppend;
+	}
+
+	public int getRolloverIntervalSeconds() {
+		return mRolloverIntervalSeconds;
+	}
+
+	public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+		mRolloverIntervalSeconds = rolloverIntervalSeconds;
+	}
+
+	public String getArchiveDirectory() {
+		return mArchiveDirectory;
+	}
+
+	public void setArchiveDirectory(String archiveDirectory) {
+		mArchiveDirectory = archiveDirectory;
+	}
+
+	public int getArchiveFileCount() {
+		return mArchiveFileCount;
+	}
+
+	public void setArchiveFileCount(int archiveFileCount) {
+		mArchiveFileCount = archiveFileCount;
+	}
+
+
+	@Override
+	public void start(LogDestination<T> destination) {
+		mLogger.debug("==> LocalFileLogBuffer.start()");
+
+		mDispatcherThread = new DestinationDispatcherThread<T>(this, destination, mLogger);
+
+		mDispatcherThread.start();
+
+		mLogger.debug("<== LocalFileLogBuffer.start()");
+	}
+
+	@Override
+	public void stop() {
+		mLogger.debug("==> LocalFileLogBuffer.stop()");
+		
+		DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread;
+		mDispatcherThread = null;
+
+		if(dispatcherThread != null && dispatcherThread.isAlive()) {
+			dispatcherThread.stopThread();
+
+			try {
+				dispatcherThread.join();
+			} catch (InterruptedException e) {
+				mLogger.warn("LocalFileLogBuffer.stop(): failed in waiting for DispatcherThread", e);
+			}
+		}
+
+		closeFile();
+
+		mLogger.debug("<== LocalFileLogBuffer.stop()");
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return mWriter != null;
+	}
+
+	@Override
+	public boolean add(T log) {
+		boolean ret = false;
+
+		String msg = MiscUtil.stringify(log);
+
+		if(msg.contains(MiscUtil.LINE_SEPARATOR)) {
+			msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR);
+		}
+
+		synchronized(this) {
+			checkFileStatus();
+	
+			Writer writer = mWriter;
+	
+			if(writer != null) {
+				try {
+					writer.write(msg + MiscUtil.LINE_SEPARATOR);
+					
+					if(mFileBufferSizeBytes == 0) {
+						writer.flush();
+					}
+	
+					ret = true;
+				} catch(IOException excp) {
+					mLogger.warn("LocalFileLogBuffer.add(): write failed", excp);
+
+					closeFile();
+				}
+			}
+		}
+
+		return ret;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return mDispatcherThread == null || mDispatcherThread.isIdle();
+	}
+
+	private synchronized void openFile() {
+		mLogger.debug("==> LocalFileLogBuffer.openFile()");
+
+		long now = System.currentTimeMillis();
+
+		closeFile();
+
+		if(mNextFileOpenRetryTime <= now) {
+			try {
+				mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+				long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+				mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile, startTime);
+
+				MiscUtil.createParents(new File(mBufferFilename));
+
+				FileOutputStream ostream = null;
+				try {
+					ostream = new FileOutputStream(mBufferFilename, mIsAppend);
+				} catch(Exception excp) {
+					mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file " + mBufferFilename, excp);
+				}
+
+				if(ostream != null) {
+					mWriter = createWriter(ostream);
+
+					if(mWriter != null) {
+						mLogger.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename);
+
+						mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+					} else {
+						mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename);
+
+						mBufferFilename = null;
+					}
+				}
+			} finally {
+				if(mWriter == null) {
+					mNextFileOpenRetryTime = now + mFileOpenRetryIntervalInMs;
+				}
+			}
+		}
+
+		mLogger.debug("<== LocalFileLogBuffer.openFile()");
+	}
+
+	private synchronized void closeFile() {
+		mLogger.debug("==> LocalFileLogBuffer.closeFile()");
+
+		Writer writer = mWriter;
+
+		mWriter = null;
+
+		if(writer != null) {
+			try {
+				writer.flush();
+				writer.close();
+			} catch(IOException excp) {
+				mLogger.warn("LocalFileLogBuffer: failed to close file " + mBufferFilename, excp);
+			}
+
+			if(mDispatcherThread != null) {
+				mDispatcherThread.addLogfile(mBufferFilename);
+			}
+		}
+
+		mLogger.debug("<== LocalFileLogBuffer.closeFile()");
+	}
+
+	private void rollover() {
+		mLogger.debug("==> LocalFileLogBuffer.rollover()");
+
+		closeFile();
+
+		openFile();
+
+		mLogger.debug("<== LocalFileLogBuffer.rollover()");
+	}
+
+	private void checkFileStatus() {
+		long now = System.currentTimeMillis();
+
+		if(now > mNextRolloverTime) {
+			rollover();
+		} else if(mWriter == null) {
+			openFile();
+		} else if(now > mNextFlushTime) {
+			try {
+				mNextFlushTime = now + (mFlushIntervalSeconds * 1000L);
+
+				mWriter.flush();
+			} catch (IOException excp) {
+				mLogger.warn("LocalFileLogBuffer: failed to flush to file " + mBufferFilename, excp);
+			}
+		}
+	}
+
+	private Writer createWriter(OutputStream os ) {
+	    Writer writer = null;
+
+	    if(os != null) {
+			if(mEncoding != null) {
+				try {
+					writer = new OutputStreamWriter(os, mEncoding);
+				} catch(UnsupportedEncodingException excp) {
+					mLogger.warn("LocalFileLogBuffer: failed to create output writer for file " + mBufferFilename, excp);
+				}
+			}
+	
+			if(writer == null) {
+				writer = new OutputStreamWriter(os);
+			}
+
+			if(mFileBufferSizeBytes > 0 && writer != null) {
+	    		writer = new BufferedWriter(writer, mFileBufferSizeBytes);
+	    	}
+	    }
+
+	    return writer;
+	}
+
+	boolean isCurrentFilename(String filename) {
+		return mBufferFilename != null && filename != null && filename.equals(mBufferFilename);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("LocalFileLogBuffer {");
+		sb.append("Directory=").append(mDirectory).append("; ");
+		sb.append("File=").append(mFile).append("; ");
+		sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds).append("; ");
+		sb.append("ArchiveDirectory=").append(mArchiveDirectory).append("; ");
+		sb.append("ArchiveFileCount=").append(mArchiveFileCount);
+		sb.append("}");
+
+		return sb.toString();
+	}
+	
+}
+
+class DestinationDispatcherThread<T> extends Thread {
+	private TreeSet<String>        mCompletedLogfiles = new TreeSet<String>();
+	private boolean                mStopThread        = false;
+	private LocalFileLogBuffer<T>  mFileLogBuffer     = null;
+	private LogDestination<T>      mDestination       = null;
+	private DebugTracer            mLogger            = null;
+
+	private String         mCurrentLogfile = null;
+	private BufferedReader mReader         = null;
+
+	public DestinationDispatcherThread(LocalFileLogBuffer<T> fileLogBuffer, LogDestination<T> destination, DebugTracer tracer) {
+		super(DestinationDispatcherThread.class.getSimpleName() + "-" + System.currentTimeMillis());
+
+		mLogger = tracer;
+
+		mFileLogBuffer = fileLogBuffer;
+		mDestination   = destination;
+
+		setDaemon(true);
+	}
+
+	public void addLogfile(String filename) {
+		mLogger.debug("==> DestinationDispatcherThread.addLogfile(" + filename + ")");
+
+		if(filename != null) {
+			synchronized(mCompletedLogfiles) {
+				mCompletedLogfiles.add(filename);
+				mCompletedLogfiles.notify();
+			}
+		}
+
+		mLogger.debug("<== DestinationDispatcherThread.addLogfile(" + filename + ")");
+	}
+
+	public void stopThread() {
+		mStopThread = true;
+	}
+
+	public boolean isIdle() {
+		synchronized(mCompletedLogfiles) {
+			return mCompletedLogfiles.isEmpty() && mCurrentLogfile == null;
+		}
+	}
+
+	@Override
+	public void run() {
+		UserGroupInformation loginUser = null;
+
+		try {
+			loginUser = UserGroupInformation.getLoginUser();
+		} catch (IOException excp) {
+			mLogger.error("DestinationDispatcherThread.run(): failed to get login user details. Audit files will not be sent to HDFS destination", excp);
+		}
+
+		if(loginUser == null) {
+			mLogger.error("DestinationDispatcherThread.run(): failed to get login user. Audit files will not be sent to HDFS destination");
+
+			return;
+		}
+
+		loginUser.doAs(new PrivilegedAction<Integer>() {
+			@Override
+			public Integer run() {
+				doRun();
+
+				return 0;
+			}
+		});
+	}
+
+	private void doRun() {
+		init();
+
+		mDestination.start();
+
+		long pollIntervalInMs = 1000L;
+
+		while(! mStopThread) {
+			synchronized(mCompletedLogfiles) {
+				while(mCompletedLogfiles.isEmpty() && !mStopThread) {
+					try {
+						mCompletedLogfiles.wait(pollIntervalInMs);
+					} catch(InterruptedException excp) {
+						throw new RuntimeException("DestinationDispatcherThread.run(): failed to wait for log file", excp);
+					}
+				}
+				
+				mCurrentLogfile = mCompletedLogfiles.pollFirst();
+			}
+			
+			if(mCurrentLogfile != null) {
+				sendCurrentFile();
+			}
+		}
+
+		mDestination.stop();
+	}
+
+	private void init() {
+		mLogger.debug("==> DestinationDispatcherThread.init()");
+
+		String dirName = MiscUtil.replaceTokens(mFileLogBuffer.getDirectory(), 0);
+		
+		if(dirName != null) {
+			File directory = new File(dirName);
+		
+			if(directory.exists() && directory.isDirectory()) {
+				File[] files = directory.listFiles();
+		
+				if(files != null) {
+					for(File file : files) {
+						if(file.exists() && file.isFile() && file.canRead()) {
+							String filename = file.getAbsolutePath();
+							if(! mFileLogBuffer.isCurrentFilename(filename)) {
+								addLogfile(filename);
+							}
+						}
+					}
+				}
+			}
+		}
+
+		mLogger.debug("<== DestinationDispatcherThread.init()");
+	}
+	
+	private boolean sendCurrentFile() {
+		mLogger.debug("==> DestinationDispatcherThread.sendCurrentFile()");
+
+		boolean ret = false;
+
+		long destinationPollIntervalInMs = 1000L;
+
+		openCurrentFile();
+
+		 while(!mStopThread) {
+			String log = getNextStringifiedLog();
+
+			if(log == null) { // reached end-of-file
+				ret = true;
+
+				break;
+			}
+
+			// loop until log is sent successfully
+			while(!mStopThread && !mDestination.sendStringified(log)) {
+				try {
+					Thread.sleep(destinationPollIntervalInMs);
+				} catch(InterruptedException excp) {
+					throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp);
+				}
+			}
+		}
+
+		closeCurrentFile();
+
+		if(!mStopThread) {
+			mDestination.flush();
+			archiveCurrentFile();
+		}
+
+		mLogger.debug("<== DestinationDispatcherThread.sendCurrentFile()");
+
+		return ret;
+	}
+
+	private String getNextStringifiedLog() {
+		String log = null;
+
+		if(mReader != null) {
+			try {
+				while(true) {
+					String line = mReader.readLine();
+
+					if(line == null) { // reached end-of-file
+						break;
+					}
+
+					if(line.endsWith(MiscUtil.ESCAPE_STR)) {
+						line = line.substring(0, line.length() - MiscUtil.ESCAPE_STR.length());
+
+						if(log == null) {
+							log = line;
+						} else {
+							log += MiscUtil.LINE_SEPARATOR;
+							log += line;
+						}
+
+						continue;
+					} else {
+						if(log == null) {
+							log = line;
+						} else {
+							log += line;
+						}
+						break;
+					}
+				}
+			} catch (IOException excp) {
+				mLogger.warn("DestinationDispatcherThread.getNextStringifiedLog(): failed to read from file " + mCurrentLogfile, excp);
+			}
+		}
+
+		return log;
+	}
+
+	private void openCurrentFile() {
+		mLogger.debug("==> openCurrentFile(" + mCurrentLogfile + ")");
+
+		if(mCurrentLogfile != null) {
+			try {
+				FileInputStream inStr = new FileInputStream(mCurrentLogfile);
+				
+				InputStreamReader strReader = createReader(inStr);
+				
+				if(strReader != null) {
+					mReader = new BufferedReader(strReader);
+				}
+			} catch(FileNotFoundException excp) {
+				mLogger.warn("openCurrentFile(): error while opening file " + mCurrentLogfile, excp);
+			}
+		}
+
+		mLogger.debug("<== openCurrentFile(" + mCurrentLogfile + ")");
+	}
+	
+	private void closeCurrentFile() {
+		mLogger.debug("==> closeCurrentFile(" + mCurrentLogfile + ")");
+
+		if(mReader != null) {
+			try {
+				mReader.close();
+			} catch(IOException excp) {
+				// ignore
+			}
+		}
+		mReader = null;
+
+		mLogger.debug("<== closeCurrentFile(" + mCurrentLogfile + ")");
+	}
+
+	private void archiveCurrentFile() {
+		if(mCurrentLogfile != null) {
+			File   logFile         = new File(mCurrentLogfile);
+			String archiveDirName  = MiscUtil.replaceTokens(mFileLogBuffer.getArchiveDirectory(), 0);
+			String archiveFilename = archiveDirName + File.separator +logFile.getName();
+
+			try {
+				if(logFile.exists()) {
+					File archiveFile = new File(archiveFilename);
+
+					MiscUtil.createParents(archiveFile);
+
+					if(! logFile.renameTo(archiveFile)) {
+						// TODO: renameTo() does not work in all cases. in case of failure, copy the file contents to the destination and delete the file
+						mLogger.warn("archiving failed to move file: " + mCurrentLogfile + " ==> " + archiveFilename);
+					}
+
+					File   archiveDir = new File(archiveDirName);
+					File[] files      = archiveDir.listFiles(new FileFilter() {
+											@Override
+											public boolean accept(File f) {
+												return f.isFile();
+											}
+										});
+
+					int numOfFilesToDelete = files == null ? 0 : (files.length - mFileLogBuffer.getArchiveFileCount());
+
+					if(numOfFilesToDelete > 0) {
+						Arrays.sort(files, new Comparator<File>() {
+												@Override
+												public int compare(File f1, File f2) {
+													long timeDiff = f1.lastModified() - f2.lastModified(); // avoid int overflow on large gaps
+													return timeDiff < 0 ? -1 : (timeDiff > 0 ? 1 : 0);
+												}
+											});
+
+						for(int i = 0; i < numOfFilesToDelete; i++) {
+							if(! files[i].delete()) {
+								mLogger.warn("archiving failed to delete file: " + files[i].getAbsolutePath());
+							}
+						}
+					}
+				}
+			} catch(Exception excp) {
+				mLogger.warn("archiveCurrentFile(): failed to move " + mCurrentLogfile + " to archive location " + archiveFilename, excp);
+			}
+		}
+		mCurrentLogfile = null;
+	}
+
+	public InputStreamReader createReader(InputStream iStr) {
+		InputStreamReader reader = null;
+
+	    if(iStr != null) {
+			String encoding = mFileLogBuffer.getEncoding();
+
+			if(encoding != null) {
+				try {
+					reader = new InputStreamReader(iStr, encoding);
+				} catch(UnsupportedEncodingException excp) {
+					mLogger.warn("createReader(): failed to create input reader.", excp);
+				}
+			}
+
+			if(reader == null) {
+				reader = new InputStreamReader(iStr);
+			}
+	    }
+
+	    return reader;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("DestinationDispatcherThread {");
+		sb.append("ThreadName=").append(this.getName()).append("; ");
+		sb.append("CompletedLogfiles.size()=").append(mCompletedLogfiles.size()).append("; ");
+		sb.append("StopThread=").append(mStopThread).append("; ");
+		sb.append("CurrentLogfile=").append(mCurrentLogfile);
+		sb.append("}");
+
+		return sb.toString();
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
new file mode 100644
index 0000000..0d0809a
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.provider;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+
+public class Log4jAuditProvider extends BaseAuditProvider {
+
+	private static final Log LOG      = LogFactory.getLog(Log4jAuditProvider.class);
+	private static final Log AUDITLOG = LogFactory.getLog("xaaudit." + Log4jAuditProvider.class.getName());
+
+	public static final String AUDIT_LOG4J_IS_ASYNC_PROP           = "xasecure.audit.log4j.is.async";
+	public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.log4j.async.max.queue.size" ;
+	public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms";
+
+	private Gson mGsonBuilder = null;
+
+	public Log4jAuditProvider() {
+		LOG.info("Log4jAuditProvider: creating..");
+	}
+
+	@Override
+	public void init(Properties props) {
+		LOG.info("Log4jAuditProvider.init()");
+
+		super.init(props);
+
+		try {
+			mGsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create();
+		} catch(Throwable excp) {
+			LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object; events will be formatted using toString() instead of JSON", excp);
+		}
+	}
+
+	@Override
+	public void log(AuditEventBase event) {
+		if(! AUDITLOG.isInfoEnabled())
+			return;
+		
+		if(event != null) {
+			String eventStr = mGsonBuilder != null ? mGsonBuilder.toJson(event) : event.toString();
+
+			AUDITLOG.info(eventStr);
+		}
+	}
+
+	@Override
+	public void start() {
+		// intentionally left empty
+	}
+
+	@Override
+	public void stop() {
+		// intentionally left empty
+	}
+
+	@Override
+    public void waitToComplete() {
+		// intentionally left empty
+	}
+
+	@Override
+	public boolean isFlushPending() {
+		return false;
+	}
+	
+	@Override
+	public long getLastFlushTime() {
+		return 0;
+	}
+
+	@Override
+	public void flush() {
+		// intentionally left empty
+	}
+}
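
Log4jAuditProvider writes each event, serialized as JSON when Gson is available,
to the Commons Logging category "xaaudit." + Log4jAuditProvider.class.getName()
at INFO level. In practice that category is bound to its own appender via log4j
configuration; a programmatic sketch of such a binding follows (file path and
layout are assumptions, not part of this commit).

    try {
        org.apache.log4j.Logger auditLogger =
                org.apache.log4j.Logger.getLogger("xaaudit." + Log4jAuditProvider.class.getName());
        auditLogger.setAdditivity(false); // keep audit records out of the root logger's appenders
        auditLogger.addAppender(new org.apache.log4j.FileAppender(
                new org.apache.log4j.PatternLayout("%m%n"), "/var/log/ranger/log4j_audit.log")); // assumed path
    } catch (java.io.IOException excp) {
        // FileAppender(Layout, String) throws IOException if the file cannot be opened
    }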

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
new file mode 100644
index 0000000..bdb1a47
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import org.apache.commons.logging.Log;
+
+public class Log4jTracer implements DebugTracer {
+	private Log mLogger = null;
+
+	public Log4jTracer(Log logger) {
+		mLogger = logger;
+	}
+
+	public void debug(String msg) {
+		mLogger.debug(msg);
+	}
+
+	public void debug(String msg, Throwable excp) {
+		mLogger.debug(msg, excp);
+	}
+
+	public void info(String msg) {
+		mLogger.info(msg);
+	}
+
+	public void info(String msg, Throwable excp) {
+		mLogger.info(msg, excp);
+	}
+
+	public void warn(String msg) {
+		mLogger.warn(msg);
+	}
+
+	public void warn(String msg, Throwable excp) {
+		mLogger.warn(msg, excp);
+	}
+
+	public void error(String msg) {
+		mLogger.error(msg);
+	}
+
+	public void error(String msg, Throwable excp) {
+		mLogger.error(msg, excp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
new file mode 100644
index 0000000..aebef1b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+
+public interface LogBuffer<T> {
+	public void start(LogDestination<T> destination);
+
+	public void stop();
+
+	boolean isAvailable();
+
+	public boolean isEmpty();
+
+	public boolean add(T log);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
new file mode 100644
index 0000000..44e94ad
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+
+public interface LogDestination<T> {
+	public void start();
+
+	public void stop();
+
+	boolean isAvailable();
+
+	public boolean send(T log);
+
+	public boolean sendStringified(String log);
+
+	public boolean flush();
+}
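
LogBuffer and LogDestination form a small producer/consumer SPI: the buffer
persists records locally and the dispatcher thread replays them to a destination
once it reports itself available. A minimal destination that accepts every
record and prints it, useful for experimentation, might look like the sketch
below (same package assumed; not part of this commit). Wiring it up then amounts
to new LocalFileLogBuffer<AuditEventBase>(tracer).start(new ConsoleLogDestination<AuditEventBase>()).

    public class ConsoleLogDestination<T> implements LogDestination<T> {
        public void start() { }
        public void stop()  { }

        public boolean isAvailable() { return true; }

        public boolean send(T log) {
            return sendStringified(MiscUtil.stringify(log));
        }

        public boolean sendStringified(String log) {
            System.out.println(log);
            return true; // returning false would make the dispatcher retry the record later
        }

        public boolean flush() { return true; }
    }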

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
new file mode 100644
index 0000000..17230b2
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.rmi.dgc.VMID;
+import java.text.SimpleDateFormat;
+import java.util.UUID;
+
+import org.apache.log4j.helpers.LogLog;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class MiscUtil {
+	public static final String TOKEN_START        = "%";
+	public static final String TOKEN_END          = "%";
+	public static final String TOKEN_HOSTNAME     = "hostname";
+	public static final String TOKEN_APP_TYPE     = "app-type";
+	public static final String TOKEN_JVM_INSTANCE = "jvm-instance";
+	public static final String TOKEN_TIME         = "time:";
+	public static final String TOKEN_PROPERTY     = "property:";
+	public static final String TOKEN_ENV          = "env:";
+	public static final String ESCAPE_STR           = "\\";
+
+	static VMID sJvmID = new VMID();
+
+	public static String LINE_SEPARATOR = System.getProperty("line.separator");
+
+	private static Gson   sGsonBuilder = null;
+	private static String sApplicationType = null;
+
+	static {
+		try {
+			sGsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS").create();
+		} catch(Throwable excp) {
+			LogLog.warn("failed to create GsonBuilder object; stringify() will return obj.toString() instead of JSON", excp);
+		}
+	}
+
+	public static String replaceTokens(String str, long time) {
+		if(str == null) {
+			return str;
+		}
+
+		if(time <= 0) {
+			time = System.currentTimeMillis();
+		}
+
+        for(int startPos = 0; startPos < str.length(); ) {
+            int tagStartPos = str.indexOf(TOKEN_START, startPos);
+            
+            if(tagStartPos == -1) {
+            	break;
+            }
+
+            int tagEndPos = str.indexOf(TOKEN_END, tagStartPos + TOKEN_START.length());
+
+            if(tagEndPos == -1) {
+            	break;
+            }
+
+            String tag   = str.substring(tagStartPos, tagEndPos+TOKEN_END.length());
+            String token = tag.substring(TOKEN_START.length(), tag.lastIndexOf(TOKEN_END));
+            String val   = "";
+
+            if(token != null) {
+	            if(token.equals(TOKEN_HOSTNAME)) {
+	            	val = getHostname();
+	            } else if(token.equals(TOKEN_APP_TYPE)) {
+	            	val = getApplicationType();
+	            } else if(token.equals(TOKEN_JVM_INSTANCE)) {
+	            	val = getJvmInstanceId();
+	            } else if(token.startsWith(TOKEN_PROPERTY)) {
+	            	String propertyName = token.substring(TOKEN_PROPERTY.length());
+	
+	                val = getSystemProperty(propertyName);
+	            } else if(token.startsWith(TOKEN_ENV)) {
+	            	String envName = token.substring(TOKEN_ENV.length());
+	
+	                val = getEnv(envName);
+	            } else if(token.startsWith(TOKEN_TIME)) {
+	                String dtFormat = token.substring(TOKEN_TIME.length());
+	                
+	                val = getFormattedTime(time, dtFormat);
+	            }
+            }
+
+            if(val == null) {
+            	val = "";
+            }
+
+            str = str.substring(0, tagStartPos) + val + str.substring(tagEndPos + TOKEN_END.length());
+            startPos = tagStartPos + val.length();
+        }
+
+        return str;
+	}
+
+	public static String getHostname() {
+		String ret = null;
+
+		try {
+			ret = InetAddress.getLocalHost().getHostName();
+		} catch (Exception excp) {
+			LogLog.warn("getHostname()", excp);
+		}
+
+		return ret;
+	}
+
+	public static void setApplicationType(String applicationType) {
+		sApplicationType = applicationType;
+	}
+
+	public static String getApplicationType() {
+		return sApplicationType;
+	}
+
+	public static String getJvmInstanceId() {
+		String ret = Integer.toString(Math.abs(sJvmID.toString().hashCode()));
+
+		return ret;
+	}
+
+	public static String getSystemProperty(String propertyName) {
+		String ret = null;
+
+		try {
+			ret = propertyName != null ? System.getProperty(propertyName) : null;
+		} catch (Exception excp) {
+			LogLog.warn("getSystemProperty(" + propertyName + ") failed", excp);
+		}
+
+		return ret;
+	}
+
+	public static String getEnv(String envName) {
+		String ret = null;
+
+		try {
+			ret = envName != null ? System.getenv(envName) : null;
+		} catch (Exception excp) {
+			LogLog.warn("getenv(" + envName + ") failed", excp);
+		}
+
+		return ret;
+	}
+
+	public static String getFormattedTime(long time, String format) {
+		String ret = null;
+
+		try {
+            SimpleDateFormat sdf = new SimpleDateFormat(format);
+
+            ret = sdf.format(time);
+		} catch (Exception excp) {
+			LogLog.warn("SimpleDateFormat.format() failed: " + format, excp);
+		}
+
+		return ret;
+	}
+
+	public static void createParents(File file) {
+		if(file != null) {
+			String parentName = file.getParent();
+
+			if (parentName != null) {
+				File parentDir = new File(parentName);
+
+				if(!parentDir.exists()) {
+					if(! parentDir.mkdirs()) {
+						LogLog.warn("createParents(): failed to create " + parentDir.getAbsolutePath());
+					}
+				}
+			}
+		}
+	}
+
+	public static long getNextRolloverTime(long lastRolloverTime, long interval) {
+		long now = System.currentTimeMillis() / 1000 * 1000; // round to second
+
+		if(lastRolloverTime <= 0) {
+			// should this be set to the next multiple-of-the-interval from start of the day?
+			return now + interval;
+		} else if(lastRolloverTime <= now) {
+			long nextRolloverTime = now + interval;
+
+			// keep it at 'interval' boundary
+			long trimInterval = (nextRolloverTime - lastRolloverTime) % interval;
+
+			return nextRolloverTime - trimInterval;
+		} else {
+			return lastRolloverTime;
+		}
+	}
+
+	public static long getRolloverStartTime(long nextRolloverTime, long interval) {
+		return (nextRolloverTime <= interval) ? System.currentTimeMillis() : nextRolloverTime - interval;
+	}
+
+	public static int parseInteger(String str, int defValue) {
+		int ret = defValue;
+
+		if(str != null) {
+			try {
+				ret = Integer.parseInt(str);
+			} catch(Exception excp) {
+				// ignore
+			}
+		}
+
+		return ret;
+	}
+	
+	public static String generateUniqueId() {
+		return UUID.randomUUID().toString();
+	}
+
+	public static <T> String stringify(T log) {
+		String ret = null;
+
+		if(log != null) {
+			if(log instanceof String) {
+				ret = (String)log;
+			} else if(MiscUtil.sGsonBuilder != null) {
+				ret = MiscUtil.sGsonBuilder.toJson(log);
+			} else {
+				ret = log.toString();
+			}
+		}
+
+		return ret;
+	}
+}
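
MiscUtil.replaceTokens() drives the file-name patterns used by the buffer and
destination classes: tokens are delimited by '%' and resolved against the host
name, application type, JVM instance, system properties, environment variables,
or a time format. A couple of illustrative calls (the patterns and resulting
paths are assumptions):

    MiscUtil.setApplicationType("hdfs");

    String spoolFile = MiscUtil.replaceTokens(
            "/tmp/audit/%app-type%/%hostname%/spool_%time:yyyyMMdd-HHmm.ss%.log", // assumed pattern
            System.currentTimeMillis());
    // e.g. "/tmp/audit/hdfs/nn1/spool_20141212-0230.30.log" - host and timestamp will differ

    String tmpFile = MiscUtil.replaceTokens("%property:java.io.tmpdir%/audit.log", 0);
    // passing 0 makes any %time:...% token use the current time; none is present here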

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
new file mode 100644
index 0000000..0c2bca6
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.HBaseAuditEvent;
+import org.apache.ranger.audit.model.HdfsAuditEvent;
+import org.apache.ranger.audit.model.HiveAuditEvent;
+import org.apache.ranger.audit.model.KnoxAuditEvent;
+import org.apache.ranger.audit.model.StormAuditEvent;
+
+
+public class MultiDestAuditProvider extends BaseAuditProvider {
+
+	private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class);
+
+	protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>();
+
+
+	public MultiDestAuditProvider() {
+		LOG.info("MultiDestAuditProvider: creating..");
+	}
+
+	public MultiDestAuditProvider(AuditProvider provider) {
+		addAuditProvider(provider);
+	}
+
+	@Override
+	public void init(Properties props) {
+		LOG.info("MultiDestAuditProvider.init()");
+
+		super.init(props);
+
+		for(AuditProvider provider : mProviders) {
+    		try {
+                provider.init(props);
+    		} catch(Throwable excp) {
+    			LOG.info("MultiDestAuditProvider.init(): failed for provider (" + provider.getClass().getCanonicalName() + ")", excp);
+    		}
+        }
+	}
+
+	public void addAuditProvider(AuditProvider provider) {
+		if(provider != null) {
+			LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")");
+
+			mProviders.add(provider);
+		}
+	}
+
+	public void addAuditProviders(List<AuditProvider> providers) {
+		if(providers != null) {
+			for(AuditProvider provider : providers) {
+				addAuditProvider(provider);
+			}
+		}
+	}
+
+	@Override
+	public void log(AuditEventBase event) {
+        for(AuditProvider provider : mProviders) {
+    		try {
+                provider.log(event);
+    		} catch(Throwable excp) {
+    			logFailedEvent(event, excp);
+    		}
+        }
+	}
+
+	@Override
+	public void start() {
+		for(AuditProvider provider : mProviders) {
+    		try {
+				provider.start();
+    		} catch(Throwable excp) {
+    			LOG.error("MultiDestAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp);
+    		}
+		}
+	}
+
+	@Override
+	public void stop() {
+		for(AuditProvider provider : mProviders) {
+			try {
+				provider.stop();
+			} catch(Throwable excp) {
+    			LOG.error("MultiDestAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			}
+		}
+	}
+
+	@Override
+    public void waitToComplete() {
+		for(AuditProvider provider : mProviders) {
+			try {
+				provider.waitToComplete();
+			} catch(Throwable excp) {
+    			LOG.error("MultiDestAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			}
+		}
+	}
+	
+	@Override
+	public boolean isFlushPending() {
+		for(AuditProvider provider : mProviders) {
+			if(provider.isFlushPending()) {
+				return true;
+			}
+		}
+		
+		return false;
+	}
+	
+	@Override
+	public long getLastFlushTime() {
+		long lastFlushTime = 0;
+		for(AuditProvider provider : mProviders) {
+			long flushTime = provider.getLastFlushTime();
+			
+			if(flushTime != 0) {
+				if(lastFlushTime == 0 || lastFlushTime > flushTime) {
+					lastFlushTime = flushTime;
+				}
+			}
+		}
+		
+		return lastFlushTime;
+	}
+	
+	@Override
+	public void flush() {
+		for(AuditProvider provider : mProviders) {
+			try {
+				provider.flush();
+			} catch(Throwable excp) {
+    			LOG.error("MultiDestAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp);
+			}
+		}
+	}
+}
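
MultiDestAuditProvider fans every call out to the providers it wraps, so it is
the natural composition point when audit records must reach more than one sink.
A composition sketch using providers from this commit (the commented-out event
is an assumption):

    Properties props = new Properties(); // would normally carry xasecure.audit.* settings

    MultiDestAuditProvider multi = new MultiDestAuditProvider();
    multi.addAuditProvider(new Log4jAuditProvider());
    multi.addAuditProvider(new DummyAuditProvider());

    multi.init(props);    // forwarded to every registered provider
    multi.start();
    // multi.log(event);  // failures in one provider are logged and do not affect the others
    multi.waitToComplete();
    multi.stop();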

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
new file mode 100644
index 0000000..620951c
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ranger.audit.provider.hdfs;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.BufferedAuditProvider;
+import org.apache.ranger.audit.provider.DebugTracer;
+import org.apache.ranger.audit.provider.LocalFileLogBuffer;
+import org.apache.ranger.audit.provider.Log4jTracer;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class HdfsAuditProvider extends BufferedAuditProvider {
+	private static final Log LOG = LogFactory.getLog(HdfsAuditProvider.class);
+
+	public static final String AUDIT_HDFS_IS_ASYNC_PROP           = "xasecure.audit.hdfs.is.async";
+	public static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.hdfs.async.max.queue.size" ;
+	public static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.hdfs.async.max.flush.interval.ms";
+
+	public HdfsAuditProvider() {
+	}
+
+	public void init(Properties props) {
+		LOG.info("HdfsAuditProvider.init()");
+
+		super.init(props);
+
+		Map<String, String> hdfsProps = BaseAuditProvider.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config.");
+
+		String encoding                                = hdfsProps.get("encoding");
+
+		String hdfsDestinationDirectory                = hdfsProps.get("destination.directory");
+		String hdfsDestinationFile                     = hdfsProps.get("destination.file");
+		int    hdfsDestinationFlushIntervalSeconds     = MiscUtil.parseInteger(hdfsProps.get("destination.flush.interval.seconds"), 15 * 60);
+		int    hdfsDestinationRolloverIntervalSeconds  = MiscUtil.parseInteger(hdfsProps.get("destination.rollover.interval.seconds"), 24 * 60 * 60);
+		int    hdfsDestinationOpenRetryIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("destination.open.retry.interval.seconds"), 60);
+
+		String localFileBufferDirectory               = hdfsProps.get("local.buffer.directory");
+		String localFileBufferFile                    = hdfsProps.get("local.buffer.file");
+		int    localFileBufferFlushIntervalSeconds    = MiscUtil.parseInteger(hdfsProps.get("local.buffer.flush.interval.seconds"), 1 * 60);
+		int    localFileBufferFileBufferSizeBytes     = MiscUtil.parseInteger(hdfsProps.get("local.buffer.file.buffer.size.bytes"), 8 * 1024);
+		int    localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("local.buffer.rollover.interval.seconds"), 10 * 60);
+		String localFileBufferArchiveDirectory        = hdfsProps.get("local.archive.directory");
+		int    localFileBufferArchiveFileCount        = MiscUtil.parseInteger(hdfsProps.get("local.archive.max.file.count"), 10);
+
+		DebugTracer tracer = new Log4jTracer(LOG);
+
+		HdfsLogDestination<AuditEventBase> mHdfsDestination = new HdfsLogDestination<AuditEventBase>(tracer);
+
+		mHdfsDestination.setDirectory(hdfsDestinationDirectory);
+		mHdfsDestination.setFile(hdfsDestinationFile);
+		mHdfsDestination.setFlushIntervalSeconds(hdfsDestinationFlushIntervalSeconds);
+		mHdfsDestination.setEncoding(encoding);
+		mHdfsDestination.setRolloverIntervalSeconds(hdfsDestinationRolloverIntervalSeconds);
+		mHdfsDestination.setOpenRetryIntervalSeconds(hdfsDestinationOpenRetryIntervalSeconds);
+
+		LocalFileLogBuffer<AuditEventBase> mLocalFileBuffer = new LocalFileLogBuffer<AuditEventBase>(tracer);
+
+		mLocalFileBuffer.setDirectory(localFileBufferDirectory);
+		mLocalFileBuffer.setFile(localFileBufferFile);
+		mLocalFileBuffer.setFlushIntervalSeconds(localFileBufferFlushIntervalSeconds);
+		mLocalFileBuffer.setFileBufferSizeBytes(localFileBufferFileBufferSizeBytes);
+		mLocalFileBuffer.setEncoding(encoding);
+		mLocalFileBuffer.setRolloverIntervalSeconds(localFileBufferRolloverIntervalSeconds);
+		mLocalFileBuffer.setArchiveDirectory(localFileBufferArchiveDirectory);
+		mLocalFileBuffer.setArchiveFileCount(localFileBufferArchiveFileCount);
+		
+		setBufferAndDestination(mLocalFileBuffer, mHdfsDestination);
+	}
+}
+
+
+

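HdfsAuditProvider.init() above pulls its settings from properties prefixed with xasecure.audit.hdfs.config. and wires a LocalFileLogBuffer (local spool) to an HdfsLogDestination. Below is a hedged configuration sketch: the keys are the ones parsed by init(), while the class name and the directory/file values are placeholders rather than shipped defaults.

    import java.util.Properties;

    import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;

    public class HdfsAuditProviderConfigSketch {            // hypothetical class, for illustration only
        public static void main(String[] args) {
            Properties props = new Properties();

            // HDFS destination (keys parsed in HdfsAuditProvider.init(); values are placeholders)
            props.setProperty("xasecure.audit.hdfs.config.destination.directory", "hdfs://namenode:8020/ranger/audit");
            props.setProperty("xasecure.audit.hdfs.config.destination.file", "audit.log");
            props.setProperty("xasecure.audit.hdfs.config.destination.flush.interval.seconds", "900");
            props.setProperty("xasecure.audit.hdfs.config.destination.rollover.interval.seconds", "86400");

            // Local buffer that spools events before they are shipped to HDFS
            props.setProperty("xasecure.audit.hdfs.config.local.buffer.directory", "/var/log/ranger/audit/buffer");
            props.setProperty("xasecure.audit.hdfs.config.local.buffer.file", "audit-buffer.log");
            props.setProperty("xasecure.audit.hdfs.config.local.archive.directory", "/var/log/ranger/audit/archive");
            props.setProperty("xasecure.audit.hdfs.config.local.archive.max.file.count", "10");

            HdfsAuditProvider provider = new HdfsAuditProvider();
            provider.init(props);   // keys left unset fall back to the defaults parsed in init()
            provider.start();
        }
    }
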
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
new file mode 100644
index 0000000..6b5cb4b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ranger.audit.provider.hdfs;
+
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.provider.DebugTracer;
+import org.apache.ranger.audit.provider.LogDestination;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class HdfsLogDestination<T> implements LogDestination<T> {
+	public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed";
+
+	private String  mDirectory                = null;
+	private String  mFile                     = null;
+	private int     mFlushIntervalSeconds     = 1 * 60;
+	private String  mEncoding                 = null;
+	private boolean mIsAppend                 = false;
+	private int     mRolloverIntervalSeconds  = 24 * 60 * 60;
+	private int     mOpenRetryIntervalSeconds = 60;
+	private DebugTracer mLogger               = null;
+
+	private FSDataOutputStream mFsDataOutStream    = null;
+	private OutputStreamWriter mWriter             = null; 
+	private String             mHdfsFilename       = null;
+	private long               mNextRolloverTime   = 0;
+	private long               mNextFlushTime      = 0;
+	private long               mLastOpenFailedTime = 0;
+	private boolean            mIsStopInProgress   = false;
+
+	public HdfsLogDestination(DebugTracer tracer) {
+		mLogger = tracer;
+	}
+
+	public String getDirectory() {
+		return mDirectory;
+	}
+
+	public void setDirectory(String directory) {
+		this.mDirectory = directory;
+	}
+
+	public String getFile() {
+		return mFile;
+	}
+
+	public void setFile(String file) {
+		this.mFile = file;
+	}
+
+	public int getFlushIntervalSeconds() {
+		return mFlushIntervalSeconds;
+	}
+
+	public void setFlushIntervalSeconds(int flushIntervalSeconds) {
+		mFlushIntervalSeconds = flushIntervalSeconds;
+	}
+
+	public String getEncoding() {
+		return mEncoding;
+	}
+
+	public void setEncoding(String encoding) {
+		mEncoding = encoding;
+	}
+
+	public int getRolloverIntervalSeconds() {
+		return mRolloverIntervalSeconds;
+	}
+
+	public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) {
+		this.mRolloverIntervalSeconds = rolloverIntervalSeconds;
+	}
+
+	public int getOpenRetryIntervalSeconds() {
+		return mOpenRetryIntervalSeconds;
+	}
+
+	public void setOpenRetryIntervalSeconds(int minIntervalOpenRetrySeconds) {
+		this.mOpenRetryIntervalSeconds = minIntervalOpenRetrySeconds;
+	}
+
+	@Override
+	public void start() {
+		mLogger.debug("==> HdfsLogDestination.start()");
+
+		openFile();
+
+		mLogger.debug("<== HdfsLogDestination.start()");
+	}
+
+	@Override
+	public void stop() {
+		mLogger.debug("==> HdfsLogDestination.stop()");
+
+		mIsStopInProgress = true;
+
+		closeFile();
+
+		mIsStopInProgress = false;
+
+		mLogger.debug("<== HdfsLogDestination.stop()");
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return mWriter != null;
+	}
+
+	@Override
+	public boolean send(T log) {
+		boolean ret = false;
+		
+		if(log != null) {
+			String msg = log.toString();
+
+			ret = sendStringified(msg);
+		}
+
+		return ret;
+	}
+
+	@Override
+	public boolean sendStringified(String log) {
+		boolean ret = false;
+
+		checkFileStatus();
+
+		OutputStreamWriter writer = mWriter;
+
+		if(writer != null) {
+			try {
+				writer.write(log + MiscUtil.LINE_SEPARATOR);
+
+				ret = true;
+			} catch (IOException excp) {
+				mLogger.warn("HdfsLogDestination.sendStringified(): write failed", excp);
+
+				closeFile();
+			}
+		}
+
+		return ret;
+	}
+
+	@Override
+	public boolean flush() {
+		mLogger.debug("==> HdfsLogDestination.flush()");
+
+		boolean ret = false;
+
+		OutputStreamWriter writer  = mWriter;
+
+		if(writer != null) {
+			try {
+				writer.flush();
+				
+				ret = true;
+			} catch (IOException excp) {
+				logException("HdfsLogDestination: flush() failed", excp);
+			}
+		}
+
+		FSDataOutputStream ostream = mFsDataOutStream;
+
+		if(ostream != null) {
+			try {
+				ostream.hflush();
+
+				ret = true;
+			} catch (IOException excp) {
+				logException("HdfsLogDestination: hflush() failed", excp);
+			}
+		}
+
+		if(ret) {
+			mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+		}
+
+		mLogger.debug("<== HdfsLogDestination.flush()");
+
+		return ret;
+	}
+
+	private void openFile() {
+		mLogger.debug("==> HdfsLogDestination.openFile()");
+
+		closeFile();
+
+		mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+		long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L));
+
+		mHdfsFilename = MiscUtil.replaceTokens(mDirectory + org.apache.hadoop.fs.Path.SEPARATOR + mFile, startTime);
+
+		FSDataOutputStream ostream     = null;
+		FileSystem         fileSystem  = null;
+		Path               pathLogfile = null;
+		Configuration      conf        = null;
+		boolean            bOverwrite  = false;
+
+		try {
+			mLogger.debug("HdfsLogDestination.openFile(): opening file " + mHdfsFilename);
+
+			URI uri = URI.create(mHdfsFilename);
+
+			// TODO: add a mechanism for the XA-HDFS plugin to disable auditing of access checks to the current HDFS file
+
+			conf        = new Configuration();
+			pathLogfile = new Path(mHdfsFilename);
+			fileSystem  = FileSystem.get(uri, conf);
+
+			try {
+				if(fileSystem.exists(pathLogfile)) { // file already exists. either append to the file or write to a new file
+					if(mIsAppend) {
+						mLogger.info("HdfsLogDestination.openFile(): opening file for append " + mHdfsFilename);
+
+						ostream = fileSystem.append(pathLogfile);
+					} else {
+						mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
+						pathLogfile   = new Path(mHdfsFilename);
+					}
+				}
+
+				// if file does not exist or if mIsAppend==false, create the file
+				if(ostream == null) {
+					mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
+
+					createParents(pathLogfile, fileSystem);
+					ostream = fileSystem.create(pathLogfile, bOverwrite);
+				}
+			} catch(IOException excp) {
+				// append may not be supported by the filesystem; or the file might already be open by another application. Try a different filename
+				String failedFilename = mHdfsFilename;
+
+				mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem);
+				pathLogfile   = new Path(mHdfsFilename);
+
+				mLogger.info("HdfsLogDestination.openFile(): failed in opening file " + failedFilename + ". Will try opening " + mHdfsFilename);
+			}
+
+			if(ostream == null){
+				mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename);
+
+				createParents(pathLogfile, fileSystem);
+				ostream = fileSystem.create(pathLogfile, bOverwrite);
+			}
+		} catch(Throwable ex) {
+			mLogger.warn("HdfsLogDestination.openFile() failed", ex);
+		} finally {
+			// TODO: unset the property set above to exclude auditing of logfile opening
+			//        System.setProperty(hdfsCurrentFilenameProperty, null);
+		}
+
+		mWriter = createWriter(ostream);
+
+		if(mWriter != null) {
+			mLogger.debug("HdfsLogDestination.openFile(): opened file " + mHdfsFilename);
+
+			mFsDataOutStream    = ostream;
+			mNextFlushTime      = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L);
+			mLastOpenFailedTime = 0;
+		} else {
+			mLogger.warn("HdfsLogDestination.openFile(): failed to open file for write " + mHdfsFilename);
+
+			mHdfsFilename = null;
+			mLastOpenFailedTime = System.currentTimeMillis();
+		}
+
+		mLogger.debug("<== HdfsLogDestination.openFile(" + mHdfsFilename + ")");
+	}
+
+	private void closeFile() {
+		mLogger.debug("==> HdfsLogDestination.closeFile()");
+		
+		flush();
+
+		OutputStreamWriter writer = mWriter;
+
+		mWriter          = null;
+		mFsDataOutStream = null;
+
+		if(writer != null) {
+			try {
+				mLogger.info("HdfsLogDestination.closeFile(): closing file " + mHdfsFilename);
+
+				writer.close();
+			} catch(IOException excp) {
+				logException("HdfsLogDestination: failed to close file " + mHdfsFilename, excp);
+			}
+		}
+
+		mLogger.debug("<== HdfsLogDestination.closeFile()");
+	}
+
+	private void rollover() {
+		mLogger.debug("==> HdfsLogDestination.rollover()");
+
+		closeFile();
+
+		openFile();
+
+		mLogger.debug("<== HdfsLogDestination.rollover()");
+	}
+
+	private void checkFileStatus() {
+		long now = System.currentTimeMillis();
+
+		if(mWriter == null) {
+			if(now > (mLastOpenFailedTime + (mOpenRetryIntervalSeconds * 1000L))) {
+				openFile();
+			}
+		} else  if(now > mNextRolloverTime) {
+			rollover();
+		} else if(now > mNextFlushTime) {
+			flush();
+		}
+	}
+
+	private OutputStreamWriter createWriter(OutputStream os ) {
+	    OutputStreamWriter writer = null;
+
+	    if(os != null) {
+			if(mEncoding != null) {
+				try {
+					writer = new OutputStreamWriter(os, mEncoding);
+				} catch(UnsupportedEncodingException excp) {
+					mLogger.warn("HdfsLogDestination.createWriter(): failed to create output writer.", excp);
+				}
+			}
+	
+			if(writer == null) {
+				writer = new OutputStreamWriter(os);
+			}
+	    }
+
+	    return writer;
+	}
+	
+	private void createParents(Path pathLogfile, FileSystem fileSystem) {
+		try {
+			Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;
+
+			if(parentPath != null && fileSystem != null && !fileSystem.exists(parentPath)) {
+				 fileSystem.mkdirs(parentPath);
+			}
+		} catch (IOException e) {
+			logException("HdfsLogDestination.createParents() failed", e);
+		} catch (Throwable e) {
+			mLogger.warn("HdfsLogDestination.createParents() failed", e);
+		}
+	}
+
+    private String getNewFilename(String fileName, FileSystem fileSystem) {
+    	if(fileName == null) {
+    		return "";
+    	}
+
+        for(int i = 1; ; i++) {
+        	String ret = fileName;
+
+	        String strToAppend = "-" + Integer.toString(i);
+	
+	        int extnPos = ret.lastIndexOf(".");
+	
+	        if(extnPos < 0) {
+	            ret += strToAppend;
+	        } else {
+	            String extn = ret.substring(extnPos);
+	
+	            ret = ret.substring(0, extnPos) + strToAppend + extn;
+	        }
+	        
+	        if(fileSystem != null && fileExists(ret, fileSystem)) {
+        		continue;
+	        } else {
+	        	return ret;
+	        }
+    	}
+    }
+    
+    private boolean fileExists(String fileName, FileSystem fileSystem) {
+    	boolean ret = false;
+
+    	if(fileName != null && fileSystem != null) {
+    		Path path = new Path(fileName);
+
+    		try {
+    			ret = fileSystem.exists(path);
+    		} catch(IOException excp) {
+    			// ignore
+    		}
+    	}
+ 
+    	return ret;
+    }
+
+    private void logException(String msg, IOException excp) {
+		// during shutdown, the underlying FileSystem might already be closed; so don't print error details
+
+		if(mIsStopInProgress) {
+			return;
+		}
+
+		String  excpMsgToExclude   = EXCP_MSG_FILESYSTEM_CLOSED;
+		String  excpMsg            = excp != null ? excp.getMessage() : null;
+		boolean excpExcludeLogging = (excpMsg != null && excpMsg.contains(excpMsgToExclude));
+		
+		if(! excpExcludeLogging) {
+			mLogger.warn(msg, excp);
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+
+		sb.append("HdfsLogDestination {");
+		sb.append("Directory=").append(mDirectory).append("; ");
+		sb.append("File=").append(mFile).append("; ");
+		sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds);
+		sb.append("}");
+		
+		return sb.toString();
+	}
+}

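HdfsLogDestination above keeps a single writer open at a time: sendStringified() appends one line per call, checkFileStatus() drives flush and rollover on the configured intervals, and failed opens are retried after the open-retry interval. A minimal standalone usage sketch follows, using only the setters and lifecycle methods defined above; the URI, payload, and class name are placeholders.

    import org.apache.commons.logging.LogFactory;
    import org.apache.ranger.audit.provider.DebugTracer;
    import org.apache.ranger.audit.provider.Log4jTracer;
    import org.apache.ranger.audit.provider.hdfs.HdfsLogDestination;

    public class HdfsLogDestinationSketch {                 // hypothetical class, for illustration only
        public static void main(String[] args) {
            DebugTracer tracer = new Log4jTracer(LogFactory.getLog(HdfsLogDestinationSketch.class));

            HdfsLogDestination<String> destination = new HdfsLogDestination<String>(tracer);

            destination.setDirectory("hdfs://namenode:8020/ranger/audit");   // placeholder URI
            destination.setFile("audit.log");
            destination.setFlushIntervalSeconds(60);
            destination.setRolloverIntervalSeconds(24 * 60 * 60);
            destination.setOpenRetryIntervalSeconds(60);

            destination.start();                                    // opens (or schedules a retry for) the HDFS file
            destination.sendStringified("{\"event\":\"sketch\"}");  // one line per call; returns false if the file is not open
            destination.flush();
            destination.stop();                                     // flushes and closes the file
        }
    }
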
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
new file mode 100644
index 0000000..34b8e4b
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.test;
+import org.apache.commons.logging.Log;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.HBaseAuditEvent;
+import org.apache.ranger.audit.model.HdfsAuditEvent;
+import org.apache.ranger.audit.model.HiveAuditEvent;
+import org.apache.ranger.audit.model.KnoxAuditEvent;
+import org.apache.ranger.audit.model.StormAuditEvent;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditProviderFactory;
+import org.apache.ranger.audit.provider.AuditProviderFactory.ApplicationType;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Date;
+import java.util.Properties;
+
+public class TestEvents {
+
+	private static final Log LOG = LogFactory.getLog(TestEvents.class);
+
+    public static void main(String[] args) {
+    	DOMConfigurator.configure("log4j.xml");
+
+        LOG.info("==> TestEvents.main()");
+        
+        try {
+        	Properties auditProperties = new Properties();
+        	
+        	String AUDIT_PROPERTIES_FILE = "xasecure-audit.properties";
+        	
+        	File propFile = new File(AUDIT_PROPERTIES_FILE);
+        	
+        	if(propFile.exists()) {
+            	LOG.info("Loading Audit properties file: " + AUDIT_PROPERTIES_FILE);
+
+            	auditProperties.load(new FileInputStream(propFile));
+        	} else {
+            	LOG.info("Audit properties file missing: " + AUDIT_PROPERTIES_FILE);
+
+            	auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.url", "jdbc:mysql://localhost:3306/xa_db");
+	        	auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.user", "xaaudit");
+	        	auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.password", "xaaudit");
+	        	auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.driver", "com.mysql.jdbc.Driver");
+	
+	        	auditProperties.setProperty("xasecure.audit.is.enabled", "true");
+	        	auditProperties.setProperty("xasecure.audit.log4j.is.enabled", "false");
+	        	auditProperties.setProperty("xasecure.audit.log4j.is.async", "false");
+	        	auditProperties.setProperty("xasecure.audit.log4j.async.max.queue.size", "100000");
+	        	auditProperties.setProperty("xasecure.audit.log4j.async.max.flush.interval.ms", "30000");
+	        	auditProperties.setProperty("xasecure.audit.db.is.enabled", "true");
+	        	auditProperties.setProperty("xasecure.audit.db.is.async", "true");
+	        	auditProperties.setProperty("xasecure.audit.db.async.max.queue.size", "100000");
+	        	auditProperties.setProperty("xasecure.audit.db.async.max.flush.interval.ms", "30000");
+	        	auditProperties.setProperty("xasecure.audit.db.batch.size", "100");
+        	}
+        	
+        	AuditProviderFactory.getInstance().init(auditProperties, ApplicationType.Hdfs);
+
+        	AuditProvider provider = AuditProviderFactory.getAuditProvider();
+
+        	LOG.info("provider=" + provider.toString());
+
+        	String strEventCount          = args.length > 0 ? args[0] : auditProperties.getProperty("xasecure.audit.test.event.count");
+        	String strEventPauseTimeInMs  = args.length > 1 ? args[1] : auditProperties.getProperty("xasecure.audit.test.event.pause.time.ms");
+        	String strSleepTimeBeforeExit = args.length > 2 ? args[2] : auditProperties.getProperty("xasecure.audit.test.sleep.time.before.exit.seconds");
+
+        	int eventCount          = (strEventCount == null) ? 1024 : Integer.parseInt(strEventCount);
+        	int eventPauseTime      = (strEventPauseTimeInMs == null) ? 0 : Integer.parseInt(strEventPauseTimeInMs);
+        	int sleepTimeBeforeExit = ((strSleepTimeBeforeExit == null) ? 0 : Integer.parseInt(strSleepTimeBeforeExit)) * 1000;
+
+        	for(int i = 0; i < eventCount; i++) {
+        		AuditEventBase event = getTestEvent(i);
+
+	            LOG.info("==> TestEvents.main(" + (i+1) + "): adding " + event.getClass().getName());
+        		provider.log(event);
+
+        		if(eventPauseTime > 0) {
+        			Thread.sleep(eventPauseTime);
+        		}
+        	}
+
+            provider.waitToComplete();
+            
+            // In case of HdfsAuditProvider, logs are saved to the local file system and sent to HDFS asynchronously in a separate thread.
+            // So, at this point it is possible that a few local log files haven't made it to HDFS yet.
+            if(sleepTimeBeforeExit > 0) {
+            	LOG.info("waiting for " + sleepTimeBeforeExit + "ms before exiting..");
+
+            	try {
+	                Thread.sleep(sleepTimeBeforeExit);
+            	} catch(Exception excp) {
+                	LOG.info("error while waiting before exiting..");
+            	}
+            }
+
+            provider.stop();
+        } catch(Exception excp) {
+            LOG.info(excp.getLocalizedMessage());
+        	excp.printStackTrace();
+        }
+
+        LOG.info("<== TestEvents.main()");
+    }
+    
+    private static AuditEventBase getTestEvent(int idx) {
+    	AuditEventBase event = null;
+ 
+		switch(idx % 5) {
+			case 0:
+				event = new HdfsAuditEvent();
+			break;
+			case 1:
+				event = new HBaseAuditEvent();
+			break;
+			case 2:
+				event = new HiveAuditEvent();
+			break;
+			case 3:
+				event = new KnoxAuditEvent();
+			break;
+			case 4:
+				event = new StormAuditEvent();
+			break;
+		}
+		event.setEventTime(new Date());
+		event.setResultReason(Integer.toString(idx));
+
+		return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/resources/META-INF/persistence.xml b/agents-audit/src/main/resources/META-INF/persistence.xml
index 0b87ab9..21b8f06 100644
--- a/agents-audit/src/main/resources/META-INF/persistence.xml
+++ b/agents-audit/src/main/resources/META-INF/persistence.xml
@@ -17,12 +17,12 @@
 -->
 <persistence version="2.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">
 	<persistence-unit name="xa_server">
-		<class>com.xasecure.audit.entity.XXBaseAuditEvent</class>
-		<class>com.xasecure.audit.entity.XXHBaseAuditEvent</class>
-		<class>com.xasecure.audit.entity.XXHdfsAuditEvent</class>
-		<class>com.xasecure.audit.entity.XXHiveAuditEvent</class>
-		<class>com.xasecure.audit.entity.XXKnoxAuditEvent</class>
-		<class>com.xasecure.audit.entity.XXStormAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXBaseAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXHBaseAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXHdfsAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXHiveAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXKnoxAuditEvent</class>
+		<class>org.apache.ranger.audit.entity.XXStormAuditEvent</class>
 
 		<properties>
 			<property name="eclipselink.logging.level" value="SEVERE"/>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/agents-common/conf/log4j.properties b/agents-common/conf/log4j.properties
index ca599f2..dd22c6d 100644
--- a/agents-common/conf/log4j.properties
+++ b/agents-common/conf/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.logger.xaaudit.com.xasecure.audit.provider.Log4jAuditProvider=INFO, hdfsAppender
+log4j.logger.xaaudit.org.apache.ranger.audit.provider.Log4jAuditProvider=INFO, hdfsAppender
 
 
 log4j.appender.hdfsAppender=org.apache.log4j.HdfsRollingFileAppender

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/scripts/enable-agent.sh
----------------------------------------------------------------------
diff --git a/agents-common/scripts/enable-agent.sh b/agents-common/scripts/enable-agent.sh
index 8024b74..f8d90ad 100755
--- a/agents-common/scripts/enable-agent.sh
+++ b/agents-common/scripts/enable-agent.sh
@@ -185,7 +185,7 @@ create_jceks() {
 
 	tempFile=/tmp/jce.$$.out
 
-    $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" com.hortonworks.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1
+    $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" org.apache.ranger.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1
 
 	if [ $? -ne 0 ]
 	then
@@ -359,7 +359,7 @@ then
             cp ${fullpathorgfn} ${archivefn}
 			if [ $? -eq 0 ]
 			then
-				${JAVA} -cp "${INSTALL_CP}" com.xasecure.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p  ${INSTALL_ARGS}
+				${JAVA} -cp "${INSTALL_CP}" org.apache.ranger.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p  ${INSTALL_ARGS}
 				if [ $? -eq 0 ]
                 then
                 	diff -w ${newfn} ${fullpathorgfn} > /dev/null 2>&1
@@ -544,13 +544,13 @@ then
 			}
 			{
     			if ($1 == "nimbus.authorizer") {
-        			if ($2 ~ /^[ \t]*"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer"[ \t]*$/) {
+        			if ($2 ~ /^[ \t]*"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer"[ \t]*$/) {
             			configured = 1 ;
             			printf("%s\n",$0) ;
         			}
         			else {
             			printf("#%s\n",$0);
-            			printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ;
+            			printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ;
             			configured = 1 ;
         			}
     			}
@@ -560,7 +560,7 @@ then
 			}
 			END {
     			if (configured == 0) {
-        			printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ;
+        			printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ;
     			}
 			}' ${CFG_FILE} > ${CFG_FILE}.new &&  cat ${CFG_FILE}.new > ${CFG_FILE} && rm -f ${CFG_FILE}.new
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
deleted file mode 100644
index 74d096a..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package com.xasecure.admin.client;
-
-
-import com.xasecure.admin.client.datatype.GrantRevokeData;
-
-
-public interface XaAdminClient {
-	String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName);
-
-	void grantPrivilege(GrantRevokeData grData) throws Exception;
-
-	void revokePrivilege(GrantRevokeData grData) throws Exception;
-}