Posted to commits@ranger.apache.org by ma...@apache.org on 2014/12/12 02:30:29 UTC

[46/51] [partial] incubator-ranger git commit: RANGER-194: Rename packages from xasecure to apache ranger

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
deleted file mode 100644
index c64b02d..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.xasecure.admin.client;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HTTPSProperties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.xasecure.admin.client.datatype.GrantRevokeData;
-import com.xasecure.admin.client.datatype.RESTResponse;
-import com.xasecure.authorization.utils.StringUtil;
-import com.xasecure.authorization.hadoop.config.XaSecureConfiguration;
-import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider;
-
-
-public class XaAdminRESTClient implements XaAdminClient {
-	private static final Log LOG = LogFactory.getLog(XaAdminRESTClient.class);
-
-	public static final String XASECURE_PROP_POLICYMGR_URL                         = "xasecure.policymgr.url";
-	public static final String XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME          = "xasecure.policymgr.sslconfig.filename";
-
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE                  = "xasecure.policymgr.clientssl.keystore";	
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_PASSWORD         = "xasecure.policymgr.clientssl.keystore.password";	
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE             = "xasecure.policymgr.clientssl.keystore.type";
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL       = "xasecure.policymgr.clientssl.keystore.credential.file";
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS = "sslKeyStore";
-	public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT     = "jks";	
-
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE                  = "xasecure.policymgr.clientssl.truststore";	
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_PASSWORD         = "xasecure.policymgr.clientssl.truststore.password";	
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE             = "xasecure.policymgr.clientssl.truststore.type";	
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL       = "xasecure.policymgr.clientssl.truststore.credential.file";
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS = "sslTrustStore";
-	public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT     = "jks";	
-
-	public static final String XASECURE_SSL_KEYMANAGER_ALGO_TYPE						  = "SunX509" ;
-	public static final String XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE						  = "SunX509" ;
-	public static final String XASECURE_SSL_CONTEXT_ALGO_TYPE						      = "SSL" ;
-	
-	public static final String REST_EXPECTED_MIME_TYPE = "application/json" ;
-
-	private static final String REST_URL_PATH_POLICYLIST        = "/service/assets/policyList/";
-	private static final String REST_URL_PATH_GRANT             = "/service/assets/resources/grant";
-	private static final String REST_URL_PATH_REVOKE            = "/service/assets/resources/revoke";
-	private static final String REST_URL_PARAM_LASTUPDATED_TIME = "epoch";
-	private static final String REST_URL_PARAM_POLICY_COUNT     = "policyCount";
-	private static final String REST_URL_PARAM_AGENT_NAME       = "agentId";
-
-	private String  mUrl               = null;
-	private String  mSslConfigFileName = null;
-	private boolean mIsSSL             = false;
-
-	private String mKeyStoreURL     = null;
-	private String mKeyStoreAlias   = null;
-	private String mKeyStoreFile    = null;
-	private String mKeyStoreType    = null;
-	private String mTrustStoreURL   = null;
-	private String mTrustStoreAlias = null;
-	private String mTrustStoreFile  = null;
-	private String mTrustStoreType  = null;
-
-
-	public XaAdminRESTClient() {
-		mUrl               = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_URL);
-		mSslConfigFileName = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME);
-
-		init();
-	}
-
-	public XaAdminRESTClient(String url, String sslConfigFileName) {
-		mUrl               = url;
-		mSslConfigFileName = sslConfigFileName;
-
-		init();
-	}
-
-	@Override
-	public String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName) {
-		String ret    = null;
-		Client client = null;
-
-		try {
-			client = buildClient();
-
-			WebResource webResource = client.resource(mUrl + REST_URL_PATH_POLICYLIST + repositoryName)
-						.queryParam(REST_URL_PARAM_LASTUPDATED_TIME, String.valueOf(lastModifiedTime))
-						.queryParam(REST_URL_PARAM_POLICY_COUNT, String.valueOf(policyCount))
-						.queryParam(REST_URL_PARAM_AGENT_NAME, agentName);
-
-			ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).get(ClientResponse.class);
-
-			if(response != null && response.getStatus() == 200) {
-				ret = response.getEntity(String.class);
-			}
-		} finally {
-			destroy(client);
-		}
-
-		return ret;
-	}
-
-	@Override
-	public void grantPrivilege(GrantRevokeData grData) throws Exception {
-		Client client = null;
-
-		try {
-			client = buildClient();
-
-			WebResource webResource = client.resource(mUrl + REST_URL_PATH_GRANT);
-
-			ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson());
-
-			if(response == null || response.getStatus() != 200) {
-				RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-				throw new Exception(resp.getMessage());
-			}
-		} finally {
-			destroy(client);
-		}
-	}
-
-	@Override
-	public void revokePrivilege(GrantRevokeData grData) throws Exception {
-		Client client = null;
-		
-		try {
-			client = buildClient();
-
-			WebResource webResource = client.resource(mUrl + REST_URL_PATH_REVOKE);
-
-			ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson());
-
-			if(response == null || response.getStatus() != 200) {
-				RESTResponse resp = RESTResponse.fromClientResponse(response);
-
-				throw new Exception(resp.getMessage());
-			}
-		} finally {
-			destroy(client);
-		}
-	}
-	private void init() {
-		mIsSSL = StringUtil.containsIgnoreCase(mUrl, "https");
-
-		InputStream in =  null ;
-
-		try {
-			Configuration conf = new Configuration() ;
-
-			in = getFileInputStream(mSslConfigFileName) ;
-
-			if (in != null) {
-				conf.addResource(in);
-			}
-
-			mKeyStoreURL   = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL);
-			mKeyStoreAlias = XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS;
-			mKeyStoreType  = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE, XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT);
-			mKeyStoreFile  = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE);
-
-			mTrustStoreURL   = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL);
-			mTrustStoreAlias = XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS;
-			mTrustStoreType  = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE, XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT);
-			mTrustStoreFile  = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE);
-		}
-		catch(IOException ioe) {
-			LOG.error("Unable to load SSL Config FileName: [" + mSslConfigFileName + "]", ioe);
-		}
-		finally {
-			close(in, mSslConfigFileName);
-		}
-	}
-
-	private synchronized Client buildClient() {
-		Client client = null;
-
-		if (mIsSSL) {
-			KeyManager[]   kmList     = getKeyManagers();
-			TrustManager[] tmList     = getTrustManagers();
-			SSLContext     sslContext = getSSLContext(kmList, tmList);
-			ClientConfig   config     = new DefaultClientConfig();
-
-			HostnameVerifier hv = new HostnameVerifier() {
-				public boolean verify(String urlHostName, SSLSession session) {
-					return session.getPeerHost().equals(urlHostName);
-				}
-			};
-
-			config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new HTTPSProperties(hv, sslContext));
-
-			client = Client.create(config);
-		}
-
-		if(client == null) {
-			client = Client.create();
-		}
-
-		return client;
-	}
-
-	private KeyManager[] getKeyManagers() {
-		KeyManager[] kmList = null;
-
-		String keyStoreFilepwd = getCredential(mKeyStoreURL, mKeyStoreAlias);
-
-		if (!StringUtil.isEmpty(mKeyStoreFile) && !StringUtil.isEmpty(keyStoreFilepwd)) {
-			InputStream in =  null ;
-
-			try {
-				in = getFileInputStream(mKeyStoreFile) ;
-
-				if (in != null) {
-					KeyStore keyStore = KeyStore.getInstance(mKeyStoreType);
-
-					keyStore.load(in, keyStoreFilepwd.toCharArray());
-
-					KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(XASECURE_SSL_KEYMANAGER_ALGO_TYPE);
-
-					keyManagerFactory.init(keyStore, keyStoreFilepwd.toCharArray());
-
-					kmList = keyManagerFactory.getKeyManagers();
-				} else {
-					LOG.error("Unable to obtain keystore from file [" + mKeyStoreFile + "]");
-				}
-			} catch (KeyStoreException e) {
-				LOG.error("Unable to obtain keystore instance of type [" + mKeyStoreType + "]", e);
-			} catch (NoSuchAlgorithmException e) {
-				LOG.error("SSL algorithm is not available in the environment", e);
-			} catch (CertificateException e) {
-				LOG.error("Unable to obtain the requested certificate", e);
-			} catch (FileNotFoundException e) {
-				LOG.error("Unable to find the necessary SSL keystore file [" + mKeyStoreFile + "]", e);
-			} catch (IOException e) {
-				LOG.error("Unable to read the necessary SSL keystore file [" + mKeyStoreFile + "]", e);
-			} catch (UnrecoverableKeyException e) {
-				LOG.error("Unable to recover the key from keystore", e);
-			} finally {
-				close(in, mKeyStoreFile);
-			}
-		}
-
-		return kmList;
-	}
-
-	private TrustManager[] getTrustManagers() {
-		TrustManager[] tmList = null;
-
-		String trustStoreFilepwd = getCredential(mTrustStoreURL, mTrustStoreAlias);
-
-		if (!StringUtil.isEmpty(mTrustStoreFile) && !StringUtil.isEmpty(trustStoreFilepwd)) {
-			InputStream in =  null ;
-
-			try {
-				in = getFileInputStream(mTrustStoreFile) ;
-
-				if (in != null) {
-					KeyStore trustStore = KeyStore.getInstance(mTrustStoreType);
-
-					trustStore.load(in, trustStoreFilepwd.toCharArray());
-
-					TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE);
-
-					trustManagerFactory.init(trustStore);
-
-					tmList = trustManagerFactory.getTrustManagers();
-				} else {
-					LOG.error("Unable to obtain truststore from file [" + mTrustStoreFile + "]");
-				}
-			} catch (KeyStoreException e) {
-				LOG.error("Unable to obtain truststore instance of type [" + mTrustStoreType + "]", e);
-			} catch (NoSuchAlgorithmException e) {
-				LOG.error("SSL algorithm is not available in the environment", e);
-			} catch (CertificateException e) {
-				LOG.error("Unable to obtain the requested certificate", e);
-			} catch (FileNotFoundException e) {
-				LOG.error("Unable to find the necessary SSL truststore file [" + mTrustStoreFile + "]", e);
-			} catch (IOException e) {
-				LOG.error("Unable to read the necessary SSL truststore file [" + mTrustStoreFile + "]", e);
-			} finally {
-				close(in, mTrustStoreFile);
-			}
-		}
-		
-		return tmList;
-	}
-	
-	private SSLContext getSSLContext(KeyManager[] kmList, TrustManager[] tmList) {
-		try {
-			if(kmList != null && tmList != null) {
-				SSLContext sslContext = SSLContext.getInstance(XASECURE_SSL_CONTEXT_ALGO_TYPE);
-	
-				sslContext.init(kmList, tmList, new SecureRandom());
-				
-				return sslContext;
-			}
-		} catch (NoSuchAlgorithmException e) {
-			LOG.error("SSL algorithm is not available in the environment", e);
-		} catch (KeyManagementException e) {
-			LOG.error("Unable to initialize the SSLContext", e);
-		}
-		
-		return null;
-	}
-
-	private String getCredential(String url, String alias) {
-		char[] credStr = XaSecureCredentialProvider.getInstance().getCredentialString(url, alias);
-
-		return credStr == null ? null : new String(credStr);
-	}
-
-	private InputStream getFileInputStream(String fileName)  throws IOException {
-		InputStream in = null ;
-
-		if(! StringUtil.isEmpty(fileName)) {
-			File f = new File(fileName) ;
-
-			if (f.exists()) {
-				in = new FileInputStream(f) ;
-			}
-			else {
-				in = ClassLoader.getSystemResourceAsStream(fileName) ;
-			}
-		}
-
-		return in ;
-	}
-
-	private void close(InputStream str, String filename) {
-		if (str != null) {
-			try {
-				str.close() ;
-			} catch (IOException excp) {
-				LOG.error("Error while closing file: [" + filename + "]", excp) ;
-			}
-		}
-	}
-
-	private void destroy(Client client) {
-		if(client != null) {
-			client.destroy();
-		}
-	}
-}

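For readers tracking the rename: a minimal sketch of how this now-deleted client was typically driven. The URL, SSL-config path, repository name, and agent name below are placeholders for illustration, not values taken from this commit.

    import com.xasecure.admin.client.XaAdminRESTClient;

    public class PolicyPollSketch {
        public static void main(String[] args) {
            // These values normally come from xasecure.policymgr.url and
            // xasecure.policymgr.sslconfig.filename; both are placeholders here.
            XaAdminRESTClient client = new XaAdminRESTClient(
                    "https://policymgr.example.com:6080",
                    "/etc/xasecure/conf/policymgr-ssl.xml");

            // Ask for policies changed since epoch 0 for a hypothetical repository;
            // getPolicies() returns the JSON payload, or null on a non-200 response.
            String policiesJson = client.getPolicies("hadoopdev", 0L, 0, "hdfs-agent");

            System.out.println(policiesJson == null ? "no policies returned" : policiesJson);
        }
    }
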
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
deleted file mode 100644
index b5228db..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.xasecure.admin.client.datatype;
-
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.xasecure.authorization.utils.StringUtil;
-
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class GrantRevokeData implements java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private String        grantor;
-	private String        repositoryName;
-	private String        repositoryType;
-	private String        databases;
-	private String        tables;
-	private String        columns;
-	private String        columnFamilies;
-	private boolean       isEnabled;
-	private boolean       isAuditEnabled;
-	private boolean       replacePerm;
-	private List<PermMap> permMapList = new ArrayList<PermMap>();
-	
-	private static String WILDCARD_ASTERISK = "*";
-
-
-	public GrantRevokeData() {
-	}
-
-	public String getGrantor() {
-		return grantor;
-	}
-
-	public void setGrantor(String grantor) {
-		this.grantor = grantor;
-	}
-
-	public String getRepositoryName() {
-		return repositoryName;
-	}
-
-	public void setRepositoryName(String repositoryName) {
-		this.repositoryName = repositoryName;
-	}
-
-	public String getRepositoryType() {
-		return repositoryType;
-	}
-
-	public void setRepositoryType(String repositoryType) {
-		this.repositoryType = repositoryType;
-	}
-
-	public String getDatabases() {
-		return databases;
-	}
-
-	public void setDatabases(String databases) {
-		this.databases = databases;
-	}
-
-	public String getTables() {
-		return tables;
-	}
-
-	public void setTables(String tables) {
-		this.tables = tables;
-	}
-
-	public String getColumns() {
-		return columns;
-	}
-
-	public void setColumns(String columns) {
-		this.columns = columns;
-	}
-
-	public String getColumnFamilies() {
-		return columnFamilies;
-	}
-
-	public void setColumnFamilies(String columnFamilies) {
-		this.columnFamilies = columnFamilies;
-	}
-
-	public List<PermMap> getPermMapList() {
-		return permMapList;
-	}
-
-	public void setPermMapList(List<PermMap> permMapList) {
-		this.permMapList = permMapList;
-	}
-
-
-	public void setHiveData(String  grantor,
-							String  repositoryName,
-							String  databases,
-							String  tables,
-							String  columns,
-							PermMap permMap) {
-		this.grantor         = grantor;
-		this.repositoryName = repositoryName;
-		this.repositoryType = "hive";
-		this.databases      = StringUtil.isEmpty(databases) ? WILDCARD_ASTERISK : databases;
-		this.tables         = StringUtil.isEmpty(tables)    ? WILDCARD_ASTERISK : tables;
-		this.columns        = StringUtil.isEmpty(columns)   ? WILDCARD_ASTERISK : columns;
-		this.isAuditEnabled = true;
-		this.isEnabled      = true;
-		this.replacePerm    = false;
-		this.permMapList.add(permMap);
-	}
-
-	public void setHBaseData(String  grantor,
-							 String  repositoryName,
-							 String  tables,
-							 String  columns,
-							 String  columnFamilies,
-							 PermMap permMap) {
-		this.grantor         = grantor;
-		this.repositoryName = repositoryName;
-		this.repositoryType = "hbase";
-		this.tables         = StringUtil.isEmpty(tables)         ? WILDCARD_ASTERISK : tables;
-		this.columns        = StringUtil.isEmpty(columns)        ? WILDCARD_ASTERISK : columns;
-		this.columnFamilies = StringUtil.isEmpty(columnFamilies) ? WILDCARD_ASTERISK : columnFamilies;
-		this.isAuditEnabled = true;
-		this.isEnabled      = true;
-		this.replacePerm    = true;
-		this.permMapList.add(permMap);
-	}
-	
-	public String toJson() {
-		try {
-			ObjectMapper om = new ObjectMapper();
-
-			return om.writeValueAsString(this);
-		} catch (JsonGenerationException e) {
-			e.printStackTrace();
-		} catch (JsonMappingException e) {
-			e.printStackTrace();
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-		
-		return "";
-	}
-
-	@Override
-	public String toString() {
-		return toJson();
-	}
-
-
-	@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-	@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-	@JsonIgnoreProperties(ignoreUnknown = true)
-	public static class PermMap {
-		private List<String> userList  = new ArrayList<String>();
-		private List<String> groupList = new ArrayList<String>();
-		private List<String> permList  = new ArrayList<String>();
-
-		public PermMap() {
-		}
-
-		public PermMap(String user, String group, String perm) {
-			addUser(user);
-			addGroup(group);
-			addPerm(perm);
-		}
-
-		public PermMap(List<String> userList, List<String> groupList, List<String> permList) {
-			copyList(userList, this.userList);
-			copyList(groupList, this.groupList);
-			copyList(permList, this.permList);
-		}
-
-		public List<String> getUserList() {
-			return userList;
-		}
-
-		public List<String> getGroupList() {
-			return groupList;
-		}
-
-		public List<String> getPermList() {
-			return permList;
-		}
-
-		public void addUser(String user) {
-			addToList(user, userList);
-		}
-
-		public void addGroup(String group) {
-			addToList(group, groupList);
-		}
-
-		public void addPerm(String perm) {
-			addToList(perm, permList);
-		}
-
-		private void addToList(String str, List<String> list) {
-			if(list != null && !StringUtil.isEmpty(str)) {
-				list.add(str);
-			}
-		}
-
-		private void copyList(List<String> fromList, List<String> toList) {
-			if(fromList != null && toList != null) {
-				for(String str : fromList) {
-					addToList(str, toList);
-				}
-			}
-		}
-
-		public String toJson() {
-			try {
-				ObjectMapper om = new ObjectMapper();
-
-				return om.writeValueAsString(this);
-			} catch (JsonGenerationException e) {
-				e.printStackTrace();
-			} catch (JsonMappingException e) {
-				e.printStackTrace();
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-			
-			return "";
-		}
-
-		@Override
-		public String toString() {
-			return toJson();
-		}
-	}
-	
-	public static void main(String[] args) {
-		GrantRevokeData grData = new GrantRevokeData();
-		
-		System.out.println(grData.toString());
-	}
-}

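A short, hypothetical sketch of how GrantRevokeData was populated for a Hive grant call; the user, group, and resource names are made up for illustration.

    import com.xasecure.admin.client.datatype.GrantRevokeData;

    public class GrantDataSketch {
        public static void main(String[] args) {
            // One user/group/permission triple; addUser()/addGroup()/addPerm() can add more.
            GrantRevokeData.PermMap permMap =
                    new GrantRevokeData.PermMap("alice", "etl-users", "select");

            GrantRevokeData grData = new GrantRevokeData();

            // Empty resource names default to the "*" wildcard inside setHiveData().
            grData.setHiveData("admin", "hivedev", "salesdb", "orders", null, permMap);

            // toJson() produces the payload that grantPrivilege()/revokePrivilege() post.
            System.out.println(grData.toJson());
        }
    }
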
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
deleted file mode 100644
index 389b547..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xasecure.admin.client.datatype;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.xasecure.authorization.utils.StringUtil;
-
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class RESTResponse {
-	private static Logger LOG = Logger.getLogger(RESTResponse.class);
-
-	private int           httpStatusCode;
-	private int           statusCode;
-	private String        msgDesc;
-	private List<Message> messageList;
-
-
-	public int getHttpStatusCode() {
-		return httpStatusCode;
-	}
-
-	public void setHttpStatusCode(int httpStatusCode) {
-		this.httpStatusCode = httpStatusCode;
-	}
-
-	public int getStatusCode() {
-		return statusCode;
-	}
-
-	public void setStatusCode(int statusCode) {
-		this.statusCode = statusCode;
-	}
-
-	public String getMsgDesc() {
-		return msgDesc;
-	}
-
-	public void setMsgDesc(String msgDesc) {
-		this.msgDesc = msgDesc;
-	}
-
-	public List<Message> getMessageList() {
-		return messageList;
-	}
-
-	public void setMessageList(List<Message> messageList) {
-		this.messageList = messageList;
-	}
-	
-	public String getMessage() {
-		return StringUtil.isEmpty(msgDesc) ? ("HTTP " + httpStatusCode) : msgDesc;
-	}
-
-	public static RESTResponse fromClientResponse(ClientResponse response) {
-		RESTResponse ret = null;
-
-		String jsonString = response == null ? null : response.getEntity(String.class);
-		int    httpStatus = response == null ? 0 : response.getStatus();
-
-		if(! StringUtil.isEmpty(jsonString)) {
-			ret = RESTResponse.fromJson(jsonString);
-		}
-
-		if(ret == null) {
-			ret = new RESTResponse();
-		}
-
-		ret.setHttpStatusCode(httpStatus);
-
-		return ret;
-	}
-
-	public String toJson() {
-		try {
-			ObjectMapper om = new ObjectMapper();
-
-			return om.writeValueAsString(this);
-		} catch (Exception e) {
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("toJson() failed", e);
-			}
-		}
-
-		return "";
-	}
-
-	public static RESTResponse fromJson(String jsonString) {
-		try {
-			ObjectMapper om = new ObjectMapper();
-
-			return om.readValue(jsonString, RESTResponse.class);
-		} catch (Exception e) {
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("fromJson('" + jsonString + "') failed", e);
-			}
-		}
-
-		return null;
-	}
-
-	@Override
-	public String toString() {
-		return toJson();
-	}
-
-	@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-	@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-	@JsonIgnoreProperties(ignoreUnknown = true)
-	public static class Message {
-		private String name;
-		private String rbKey;
-		private String message;
-		private Long   objectId;
-		private String fieldName;
-
-		public String getName() {
-			return name;
-		}
-		public void setName(String name) {
-			this.name = name;
-		}
-		public String getRbKey() {
-			return rbKey;
-		}
-		public void setRbKey(String rbKey) {
-			this.rbKey = rbKey;
-		}
-		public String getMessage() {
-			return message;
-		}
-		public void setMessage(String message) {
-			this.message = message;
-		}
-		public Long getObjectId() {
-			return objectId;
-		}
-		public void setObjectId(Long objectId) {
-			this.objectId = objectId;
-		}
-		public String getFieldName() {
-			return fieldName;
-		}
-		public void setFieldName(String fieldName) {
-			this.fieldName = fieldName;
-		}
-
-		public String toJson() {
-			try {
-				ObjectMapper om = new ObjectMapper();
-
-				return om.writeValueAsString(this);
-			} catch (Exception e) {
-				if(LOG.isDebugEnabled()) {
-					LOG.debug("toJson() failed", e);
-				}
-			}
-			
-			return "";
-		}
-
-		public static Message fromJson(String jsonString) {
-			try {
-				ObjectMapper om = new ObjectMapper();
-
-				return om.readValue(jsonString, Message.class);
-			} catch (Exception e) {
-				if(LOG.isDebugEnabled()) {
-					LOG.debug("fromJson('" + jsonString + "') failed", e);
-				}
-			}
-			
-			return null;
-		}
-
-		@Override
-		public String toString() {
-			return toJson();
-		}
-	}
-}

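And a small sketch of the round trip this class supports; the JSON payload here is invented, in the shape suggested by the fields above.

    import com.xasecure.admin.client.datatype.RESTResponse;

    public class ResponseSketch {
        public static void main(String[] args) {
            // A made-up error payload with the field names mapped by this class.
            String json = "{\"statusCode\":1,\"msgDesc\":\"permission denied\"}";

            RESTResponse resp = RESTResponse.fromJson(json);

            // getMessage() prefers msgDesc and falls back to "HTTP <status code>".
            System.out.println(resp == null ? "parse failed" : resp.getMessage());
        }
    }
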
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
deleted file mode 100644
index f32c5b7..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package com.xasecure.authorization.hadoop.config;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-
-import com.xasecure.audit.provider.AuditProviderFactory;
-import com.xasecure.authorization.hadoop.constants.XaSecureHadoopConstants;
-
-public class XaSecureConfiguration extends Configuration {
-	
-	private static final Logger LOG = Logger.getLogger(XaSecureConfiguration.class) ;
-	
-	private static XaSecureConfiguration config = null;
-	
-	private XaSecureConfiguration() {
-		super(false) ;
-		
-		//
-		// Workaround for locating xasecure-audit.xml when several Hadoop configurations are on the CLASSPATH (e.g. when invoked by the Hive engine):
-		//
-		//   Look for "hive-site.xml"; if it is available, take the xasecure-audit.xml file from the same location.
-		//   If "hive-site.xml" is absent, look for "hbase-site.xml"; if found, take the xasecure-audit.xml file from the same location.
-		//   If "hbase-site.xml" is absent, look for "hdfs-site.xml"; if found, take the xasecure-audit.xml file from the same location.
-		//   If none of these is found, let the CLASSPATH-based search locate the xasecure-audit.xml file.
-		
-		
-		URL auditFileLocation = getXAAuditXMLFileLocation() ;
-		
-		if (auditFileLocation != null) {
-			addResource(auditFileLocation) ;
-		}
-		else {
-			addResourceIfReadable(XaSecureHadoopConstants.XASECURE_AUDIT_FILE) ;
-		}
-		addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HDFS_SECURITY_FILE);
-		addResourceIfReadable(XaSecureHadoopConstants.XASECURE_KNOX_SECURITY_FILE);
-		addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HBASE_SECURITY_FILE) ;
-		addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HIVE_SECURITY_FILE) ;
-		addResourceIfReadable(XaSecureHadoopConstants.XASECURE_STORM_SECURITY_FILE);
-		
-	}
-	
-	@SuppressWarnings("deprecation")
-	private void addResourceIfReadable(String aResourceName) {
-		String fName = getFileLocation(aResourceName) ;
-		if (fName != null) {
-			File f = new File(fName) ;
-			if (f.exists() && f.canRead()) {
-				URL fUrl = null ;
-				try {
-					fUrl = f.toURL() ;
-					addResource(fUrl) ;
-				} catch (MalformedURLException e) {
-					LOG.debug("Unable to find URL for the resource name [" + aResourceName +"]. Ignoring the resource:" + aResourceName);
-				}
-			}
-		}
-	}
-	
-
-	public static XaSecureConfiguration getInstance() {
-		if (config == null) {
-			synchronized (XaSecureConfiguration.class) {
-				XaSecureConfiguration temp = config;
-				if (temp == null) {
-					config = new XaSecureConfiguration();
-				}
-			}
-		}
-		return config;
-	}
-
-	public void initAudit(AuditProviderFactory.ApplicationType appType) {
-		AuditProviderFactory auditFactory = AuditProviderFactory.getInstance();
-
-		if(auditFactory == null) {
-			LOG.error("Unable to find the AuditProviderFactory (null returned)") ;
-			return;
-		}
-
-		Properties props = getProps();
-
-		if(props == null) {
-			return;
-		}
-
-		if(! auditFactory.isInitDone()) {
-			auditFactory.init(props, appType);
-		}
-	}
-
-	public boolean isAuditInitDone() {
-		AuditProviderFactory auditFactory = AuditProviderFactory.getInstance();
-
-		return auditFactory != null && auditFactory.isInitDone();
-	}
-
-	
-	@SuppressWarnings("deprecation")
-	public  URL getXAAuditXMLFileLocation() {
-		URL ret = null ;
-
-		try {
-			for(String  cfgFile : 	new String[] {  "hive-site.xml",  "hbase-site.xml",  "hdfs-site.xml" } ) {
-				String loc = getFileLocation(cfgFile) ;
-				if (loc != null) {
-					if (new File(loc).canRead()) {
-						File parentFile = new File(loc).getParentFile() ;
-						ret = new File(parentFile, XaSecureHadoopConstants.XASECURE_AUDIT_FILE).toURL() ;
-						break ;
-					}
-				}
-			}
-		}
-		catch(Throwable t) {
-			LOG.error("Unable to locate audit file location." , t) ;
-			ret = null ;
-		}
-		
-		return ret ;
-	}
-	
-	private String getFileLocation(String fileName) {
-		
-		String ret = null ;
-		
-		URL lurl = XaSecureConfiguration.class.getClassLoader().getResource(fileName) ;
-		
-		if (lurl == null ) {
-			lurl = XaSecureConfiguration.class.getClassLoader().getResource("/" + fileName) ;
-		}
-		
-		if (lurl != null) {
-			ret = lurl.getFile() ;
-		}
-		
-		return ret ;
-	}
-
-}
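For context, a minimal sketch of how agents consumed this singleton. The property key matches the constant defined in XaAdminRESTClient above; the commented-out audit call assumes an ApplicationType enum value that is not shown in this commit.

    import com.xasecure.authorization.hadoop.config.XaSecureConfiguration;

    public class ConfigSketch {
        public static void main(String[] args) {
            // getInstance() lazily loads xasecure-audit.xml plus the per-component
            // xasecure-*-security.xml resources found on the CLASSPATH.
            XaSecureConfiguration conf = XaSecureConfiguration.getInstance();

            String policyMgrUrl = conf.get("xasecure.policymgr.url");
            System.out.println("policy manager url: " + policyMgrUrl);

            // Audit init is guarded internally by isInitDone(); the enum constant
            // below is a placeholder, as ApplicationType values are not in this diff.
            // conf.initAudit(AuditProviderFactory.ApplicationType.Hdfs);
        }
    }
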

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
deleted file mode 100644
index 64794d1..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.xasecure.authorization.hadoop.constants;
-
-public class XaSecureHadoopConstants {
-	
-	public static final String XASECURE_AUDIT_FILE          = "xasecure-audit.xml" ; 
-	public static final String XASECURE_HDFS_SECURITY_FILE  = "xasecure-hdfs-security.xml" ; 
-	public static final String XASECURE_KNOX_SECURITY_FILE  = "xasecure-knox-security.xml" ; 
-	public static final String XASECURE_HBASE_SECURITY_FILE = "xasecure-hbase-security.xml" ; 
-	public static final String XASECURE_HIVE_SECURITY_FILE  = "xasecure-hive-security.xml" ; 
-	public static final String XASECURE_POLICYMGR_SSL_FILE  = "xasecure-policymgr-ssl.xml"  ;
-	public static final String XASECURE_STORM_SECURITY_FILE = "xasecure-storm-security.xml" ;
-	
-	public static final String XASECURE_ADD_HDFS_PERMISSION_PROP = "xasecure.add-hadoop-authorization" ;
-	public static final boolean XASECURE_ADD_HDFS_PERMISSION_DEFAULT = false ;
-	public static final String READ_ACCCESS_TYPE = "read";
-	public static final String WRITE_ACCCESS_TYPE = "write";
-	public static final String EXECUTE_ACCCESS_TYPE = "execute";
-
-	public static final String HDFS_ROOT_FOLDER_PATH_ALT = "";
-	public static final String HDFS_ROOT_FOLDER_PATH = "/";
-	
-	public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_PROP 	= "hdfs.authorization.verifier.classname" ;
-	public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hdfs.XASecureAuthorizer" ;	
-
-	public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_PROP 	= "hive.authorization.verifier.classname" ;
-	public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hive.XASecureAuthorizer" ;
-
-	public static final String  HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP 	     = "xasecure.hive.update.xapolicies.on.grant.revoke" ;
-	public static final boolean HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true;
-
-	public static final String  HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP 	     = "xasecure.hbase.update.xapolicies.on.grant.revoke" ;
-	public static final boolean HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true;
-	
-	public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_PROP 	= "knox.authorization.verifier.classname" ;
-	public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.knox.XASecureAuthorizer" ;
-
-	public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_PROP 	= "hbase.authorization.verifier.classname" ;
-	public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hbase.XASecureAuthorizer" ;
-	
-	public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_PROP 	= "storm.authorization.verifier.classname" ;
-	public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.storm.XASecureAuthorizer" ;
-
-	//
-	// Logging constants
-	//
-	public static final String AUDITLOG_FIELD_DELIMITER_PROP 			= "xasecure.auditlog.fieldDelimiterString";
-	public static final String AUDITLOG_XASECURE_MODULE_ACL_NAME_PROP  	= "xasecure.auditlog.xasecureAcl.name" ;
-	public static final String AUDITLOG_HADOOP_MODULE_ACL_NAME_PROP    	= "xasecure.auditlog.hadoopAcl.name" ;
-	
-	public static final String DEFAULT_LOG_FIELD_DELIMITOR  			= "|" ;
-	public static final String DEFAULT_XASECURE_MODULE_ACL_NAME  		= "xasecure-acl" ;
-	public static final String DEFAULT_HADOOP_MODULE_ACL_NAME    		= "hadoop-acl" ;
-	
-
-	public static final String AUDITLOG_FIELDINFO_VISIBLE_PROP 			= "xasecure.auditlog.fieldInfoVisible" ;
-	public static final boolean DEFAULT_AUDITLOG_FIELDINFO_VISIBLE    	= false ;
-
-	public static final String AUDITLOG_ACCESS_GRANTED_TEXT_PROP 		= "xasecure.auditlog.accessgranted.text" ;
-	public static final String AUDITLOG_ACCESS_DENIED_TEXT_PROP 		= "xasecure.auditlog.accessdenied.text" ;
-
-	public static final String DEFAULT_ACCESS_GRANTED_TEXT 				= "granted" ;
-	public static final String DEFAULT_ACCESS_DENIED_TEXT 				= "denied" ;
-	
-	public static final String AUDITLOG_EMPTY_STRING 					= "" ;
-	
-	public static final String AUDITLOG_HDFS_EXCLUDE_LIST_PROP 			= "xasecure.auditlog.hdfs.excludeusers" ;
-	public static final String AUDITLOG_REPOSITORY_NAME_PROP 			= "xasecure.audit.repository.name" ;
-	public static final String AUDITLOG_IS_ENABLED_PROP 			    = "xasecure.audit.is.enabled" ;
-	
-	public static final String KEYMGR_URL_PROP = "hdfs.keymanager.url" ;
-}

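A brief sketch of how these constants were consumed by an agent; the meaning of the flag is inferred from its property name, so treat the comment as an assumption.

    import org.apache.hadoop.conf.Configuration;

    import com.xasecure.authorization.hadoop.constants.XaSecureHadoopConstants;

    public class ConstantsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            conf.addResource(XaSecureHadoopConstants.XASECURE_HDFS_SECURITY_FILE);

            // "xasecure.add-hadoop-authorization" appears to control whether native
            // Hadoop permission checks also apply; default is false per the constants.
            boolean addHadoopAuth = conf.getBoolean(
                    XaSecureHadoopConstants.XASECURE_ADD_HDFS_PERMISSION_PROP,
                    XaSecureHadoopConstants.XASECURE_ADD_HDFS_PERMISSION_DEFAULT);

            System.out.println("fall back to hadoop authorization: " + addHadoopAuth);
        }
    }
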
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
deleted file mode 100644
index 973897f..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
+++ /dev/null
@@ -1,1376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xasecure.authorization.hadoop.log;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Locale;
-import java.util.TimeZone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Layout;
-import org.apache.log4j.spi.LoggingEvent;
-import org.apache.log4j.helpers.LogLog;
-
-
-/********************************
-* HdfsFileAppender
-* 
-**********************************/
-public class HdfsFileAppender extends FileAppender {
-   
-	   
-    // Constants for checking the DatePattern 
-    public static final String MINUTES ="Min";
-	public static final String HOURS ="Hr";
-	public static final String DAYS ="Day";
-	public static final String WEEKS ="Week";
-	public static final String MONTHS ="Month";
-
-    // The code assumes that the following constants are in an increasing sequence.
-    public static final int TOP_OF_TROUBLE = -1;
-    public static final int TOP_OF_MINUTE = 0;
-    public static final int TOP_OF_HOUR = 1;
-    public static final int HALF_DAY = 2;
-    public static final int TOP_OF_DAY = 3;
-    public static final int TOP_OF_WEEK = 4;
-    public static final int TOP_OF_MONTH = 5;
-    
-    /**
-     * The date pattern. By default, the pattern is set to "1Day" meaning daily rollover.
-     */
-    private String hdfsFileRollingInterval = "1Day";
-    
-    private String fileRollingInterval = "1Day";
-
-     
-    private String scheduledFileCache;
-
-    /**
-     * The next time we estimate a rollover should occur.
-     */
-    private long nextCheck = System.currentTimeMillis() - 1;
-    
-    private long prevnextCheck = nextCheck;
-
-    private long nextCheckLocal = System.currentTimeMillis() -1;
-    
-    private long prevnextCheckLocal = nextCheckLocal;
-    
-    private Date now = new Date();
-    
-    private Date nowLocal = now;
-
-    private SimpleDateFormat sdf;
-
-    private RollingCalendar rc = new RollingCalendar();
-    
-    private RollingCalendar rcLocal = new RollingCalendar();
-    
-    private FileOutputStream ostream = null;
-    
-    private String fileSystemName;
-    
-    private Layout layout = null;
-    
-    private String encoding = null;
-    
-    private String hdfsfileName = null;
-    
-    private String actualHdfsfileName = null;
-    
-    private String scheduledHdfsFileName = null;
-    
-    private String fileCache = null;
-    
-    private HdfsSink hs = null;  
-    
-    private Writer cacheWriter = null;
-    
-	private FileOutputStream cacheOstream = null;
-	
-	private  boolean hdfsAvailable = false;
-	
-    private long hdfsNextCheck = System.currentTimeMillis() - 1;
-    
-    private boolean timeCheck = false; 
-
-    private int hdfsFileRollOffset = 0;    
-    
-    private int fileRollOffset = 0;
-    
-    private boolean firstTime = true;
-    
-    private boolean firstTimeLocal = true;
-    
-    private String hdfsLiveUpdate = "true";
-        
-    boolean hdfsUpdateAllowed = true;
-    
-    private String hdfsCheckInterval=null;
-    
-    private String processUser = null;
-        
-    private String datePattern = "'.'yyyy-MM-dd-HH-mm";
-
-     /**
-     * The gmtTimeZone is used only in computeCheckPeriod() method.
-     */
-    private static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT+0");
-    
-	private static final String DEFAULT_HDFSCHECKINTERVAL = "2min";
-
-    /**
-     * The default constructor does nothing.
-     */
-    public HdfsFileAppender() {
-    	
-    }
-
-
-    /**
-     * The <b>hdfsFileRollingInterval</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for hdfs File rollover schedule.
-     */
-    public void setHdfsFileRollingInterval(String pattern) {
-        hdfsFileRollingInterval = pattern;
-    }
-
-    /** Returns the value of the <b>hdfsFileRollingInterval</b> option. */
-    public String getHdfsFileRollingInterval() {
-        return hdfsFileRollingInterval;
-    }
-    
-    /**
-     * The <b>fileRollingInterval</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for local cache File rollover schedule.
-     */
-    public void setFileRollingInterval(String pattern) {
-    	fileRollingInterval = pattern;
-    }
-
-    /** Returns the value of the <b>FileRollingInterval</b> option. */
-    public String getFileRollingInterval() {
-        return fileRollingInterval;
-    }
-
-    /**
-     * Sets the hdfsLiveUpdate flag: true updates hdfs live; false writes to local cache files that are later copied to hdfs.
-     */
-    public void setHdfsLiveUpdate(String val) {
-    	hdfsLiveUpdate=val;
-     }
-
-    /** Returns the value of the <b>HdfsLiveUpdate</b> option. */
-    public String getHdfsLiveUpdate() {
-        return hdfsLiveUpdate;
-    }
-    
-    public String getHdfsCheckInterval() {
-    	return hdfsCheckInterval;
-    }
-    
-    public void setHdfsCheckInterval(String val){
-    	hdfsCheckInterval = (val != null) ? val : DEFAULT_HDFSCHECKINTERVAL;
-    }
-    
-    public String getEncoding() {
-        return encoding;
-      }
-
-      public void setEncoding(String value) {
-        encoding = value;
-    }
-    
-    public void activateOptions() {
-        super.activateOptions();
-        
-        sdf = new SimpleDateFormat(datePattern); 
-        processUser=System.getProperties().getProperty("user.name");
-       
-        if(hdfsFileRollingInterval != null && fileName != null) {
-        	now.setTime(System.currentTimeMillis());
-        	int type = computeCheckPeriod(hdfsFileRollingInterval);
-            hdfsFileRollOffset = getTimeOffset(hdfsFileRollingInterval);
-            printHdfsPeriodicity(type,hdfsFileRollOffset);
-            rc.setType(type);
-            LogLog.debug("File name: " + fileName);
-            File file = new File(fileName);
-            scheduledHdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified()));
-       	    firstTime = true;
-            LogLog.debug("Local and hdfs files: " + fileName + " " + scheduledHdfsFileName) ;
-           
-        } else {
-        	LogLog.error("Either File or hdfsFileRollingInterval options are not set for appender [" + name + "].");
-        }
-        
-        // Local Cache File 
-
-        if (fileRollingInterval != null && fileCache != null){
-    	   nowLocal.setTime(System.currentTimeMillis());
-           int localtype = computeCheckPeriod(fileRollingInterval);
-           fileRollOffset = getTimeOffset(fileRollingInterval);
-           printLocalPeriodicity(localtype,fileRollOffset);
-           rcLocal.setType(localtype);
-           LogLog.debug("LocalCacheFile name: " + fileCache);
-           File fileCachehandle = new File(fileCache);
-           scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified()));
-      	   firstTimeLocal = true;
-      	   
-        } else {
-        	LogLog.error("Either File or fileRollingInterval options are not set for appender [" + name + "].");
-        }
-        
-        hdfsUpdateAllowed = Boolean.parseBoolean(hdfsLiveUpdate);
-        actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
-    }
-    
-    public static int containsIgnoreCase(String str1, String str2) {
-    	return str1.toLowerCase().indexOf(str2.toLowerCase());
-    }
-        
-    
-    public int computeCheckPeriod(String timePattern){
-
-    	if(containsIgnoreCase(timePattern, MINUTES) > 0) {
-    		return TOP_OF_MINUTE;
-    	}
-
-    	if(containsIgnoreCase(timePattern, HOURS) > 0) {
-    		return TOP_OF_HOUR;
-    	}
-    	
-    	if(containsIgnoreCase(timePattern, DAYS) > 0) {
-    		return TOP_OF_DAY;
-    	}
-    	
-    	if(containsIgnoreCase(timePattern, WEEKS) > 0) {
-    		return TOP_OF_WEEK;
-    	}
-    	
-    	if(containsIgnoreCase(timePattern, MONTHS) > 0) {
-    		return TOP_OF_MONTH;
-    	}
-    	
-    	return TOP_OF_TROUBLE;
-    }
-    	
-    
-    private void printHdfsPeriodicity(int type, int offset) {
-        switch(type) {
-            case TOP_OF_MINUTE:
-            	LogLog.debug("Appender [" + name + "] to be rolled every " + offset +  " minute.");
-                break;
-            case TOP_OF_HOUR:
-            	LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset +  " hour.");
-                break;
-            case HALF_DAY:
-            	LogLog.debug("Appender [" + name + "] to be rolled at midday and midnight.");
-                break;
-            case TOP_OF_DAY:
-            	LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset +  " day.");
-                break;
-            case TOP_OF_WEEK:
-            	LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset +  " week.");
-                break;
-            case TOP_OF_MONTH:
-            	LogLog.debug("Appender [" + name + "] to be rolled at start of every  " + offset +  " month.");
-                break;
-            default:
-            	LogLog.warn("Unknown periodicity for appender [" + name + "].");
-        }
-    }
-    
-    
-    public int getTimeOffset(String timePattern){
-	    int index;	
-	    int offset=-1;
-	    
-		if ((index = containsIgnoreCase(timePattern, MINUTES)) > 0) {
-			offset = Integer.parseInt(timePattern.substring(0,index));
-		}
-		
-		if ((index = containsIgnoreCase(timePattern, HOURS)) > 0) {
-			offset = Integer.parseInt(timePattern.substring(0,index));
-			
-		}
-		
-		if ((index = containsIgnoreCase(timePattern, DAYS)) > 0) {
-			offset = Integer.parseInt(timePattern.substring(0,index));
-			
-		}
-		
-		if ((index = containsIgnoreCase(timePattern, WEEKS)) > 0) {
-			offset = Integer.parseInt(timePattern.substring(0,index));
-			
-		}
-		
-		if ((index = containsIgnoreCase(timePattern, MONTHS)) > 0) {
-			offset = Integer.parseInt(timePattern.substring(0,index));
-			
-		}
-		
-		return offset;
-    }
-    
-    private void printLocalPeriodicity(int type, int offset) {
-        switch(type) {
-            case TOP_OF_MINUTE:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled every " + offset +  " minute.");
-                break;
-            case TOP_OF_HOUR:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset +  " hour.");
-                break;
-            case HALF_DAY:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled at midday and midnight.");
-                break;
-            case TOP_OF_DAY:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset +  " day.");
-                break;
-            case TOP_OF_WEEK:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset +  " week.");
-                break;
-            case TOP_OF_MONTH:
-            	LogLog.debug("Appender [" + name + "] Local File to be rolled at start of every  " + offset +  " month.");
-                break;
-            default:
-            	LogLog.warn("Unknown periodicity for appender [" + name + "].");
-        }
-    }
-    
-   
-    
-
-    /**
-     * Rollover the current file to a new file.
-     */
-    private void rollOver() throws IOException {
-        /* Compute filename, but only if hdfsFileRollingInterval is specified */
-        if(hdfsFileRollingInterval == null) {
-            errorHandler.error("Missing hdfsFileRollingInterval option in rollOver().");
-            return;
-        }
-        
-        long epochNow = System.currentTimeMillis();
-         
-        String datedhdfsFileName = hdfsfileName+sdf.format(epochNow);
-        
-        LogLog.debug("In rollOver() epochNow: " + epochNow + "  " + "nextCheck: " + prevnextCheck );
-        
-      
-        // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached.
-           
-        if (epochNow < prevnextCheck) {
-        	return;
-        }
-
-        // close current file, and rename it to datedFilename
-        this.closeFile();
-        
-        LogLog.debug("Rolling Over hdfs file to " + scheduledHdfsFileName);
-        
-                
-        if ( hdfsAvailable ) {
-           // for the hdfs file we don't roll over the file, we rename it.
-           actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
-        }         
-         
-        try {
-            // This will also close the file. This is OK since multiple close operations are safe.
-            this.setFile(fileName, false, this.bufferedIO, this.bufferSize);
-        } catch(IOException e) {
-            errorHandler.error("setFile(" + fileName + ", false) call failed.");
-        }
-        scheduledHdfsFileName = datedhdfsFileName;
-    }
-    
-    
-    /**
-     * Rollover the current Local file to a new file.
-     */
-    private void rollOverLocal() throws IOException {
-        /* Compute filename, but only if fileRollingInterval is specified */
-        if(fileRollingInterval == null) {
-            errorHandler.error("Missing fileRollingInterval option in rollOverLocal().");
-            return;
-        }
-        
-        long epochNow = System.currentTimeMillis();
-       
-        String datedCacheFileName = fileCache+sdf.format(epochNow);
-        LogLog.debug("In rollOverLocal() epochNow" + epochNow + "  " + "nextCheckLocal: " + prevnextCheckLocal );
-        
-        // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached.
-        if (epochNow < prevnextCheckLocal ) {
-        	return;
-        }
-
-        if (new File(fileCache).length() != 0 ) {
-            LogLog.debug("Rolling local cache to " + scheduledFileCache);
-
-            this.closeCacheWriter();
-
-            File target = new File(scheduledFileCache);
-            if (target.exists()) {
-                target.delete();
-            }
-
-            File file = new File(fileCache);
-
-            boolean result = file.renameTo(target);
-
-            if(result) {
-                LogLog.debug(fileCache + " -> " + scheduledFileCache);
-            } else {
-                LogLog.error("Failed to rename cache file [" + fileCache + "] to [" + scheduledFileCache + "].");
-            }
-            setFileCacheWriter();
-            scheduledFileCache = datedCacheFileName;
-        }
-    }
-
-    
-    /**
-     * <p>
-     * Sets and <i>opens</i> the file where the log output will go. The specified file must be writable.
-     * <p>
-     * If there was already an opened file, then the previous file is closed first.
-     * <p>
-     * <b>Do not use this method directly. To configure a FileAppender or one of its subclasses, set its properties one by one and then call
-     * activateOptions.</b>
-     * 
-     * @param file The path to the log file.
-     */
-    public void setFile(String file) {
-        // Trim spaces from both ends; the user probably does not want
-        // trailing spaces in file names.
-        String val = file.trim();
-
-        fileName = val;
-        fileCache = val + ".cache";
-    }
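-    // Illustrative example (the path is hypothetical): setFile("/tmp/ranger/audit.log")
-    // yields fileName = "/tmp/ranger/audit.log" and fileCache = "/tmp/ranger/audit.log.cache";
-    // the ".cache" file is where events are buffered whenever HDFS cannot be written to.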
-
-    @Override
-    public synchronized void setFile(String fileName, boolean append, boolean bufferedIO, int bufferSize) throws IOException {
-    	LogLog.debug("setFile called: "+fileName+", "+append);
-
-        // It does not make sense to have immediate flush and bufferedIO.
-        if(bufferedIO) {
-          setImmediateFlush(false);
-        }
-
-        reset();
-        
-        try {
-            // attempt to create the file
-            ostream = new FileOutputStream(fileName, append);
-        } catch(FileNotFoundException ex) {
-            // if the parent directory does not exist, attempt to create it
-            // and then try to create the file again -- see bug 9150
-            File umFile = new File(fileName);
-            String parentName = umFile.getParent();
-
-            if (parentName != null) {
-                File parentDir = new File(parentName);
-                if(!parentDir.exists() && parentDir.mkdirs()) {
-                    ostream = new FileOutputStream(fileName, append);
-                } else {
-                    throw ex;
-                }
-            } else {
-                throw ex;
-            }
-        }
-        
-        Writer fw = createWriter(ostream);
-        if(bufferedIO) {
-          fw = new BufferedWriter(fw, bufferSize);
-        }
-        this.setQWForFiles(fw);
-        this.fileName = fileName;
-        this.fileAppend = append;
-        this.bufferedIO = bufferedIO;
-        this.bufferSize = bufferSize;
-        
-        //set cache file
-        setFileCacheWriter();
-        
-        writeHeader();
-                
-        LogLog.debug("setFile ended");
-    }
-
-    public void setHdfsDestination(final String name) {
-        // Set the HDFS file system name and file name from the destination string.
-        String hostName = null;
-
-        String val = name.trim();
-
-        try {
-            hostName = InetAddress.getLocalHost().getHostName();
-            val = val.replaceAll("%hostname%", hostName);
-            String hostStr[] = val.split(":");
-            if ( hostStr.length >= 4 ) {
-                fileSystemName = hostStr[0] + ":" + hostStr[1] + ":" + hostStr[2];
-                hdfsfileName = hostStr[3];
-            } else {
-                LogLog.error("Failed to set the HDFS file system and file name");
-            }
-        } catch (UnknownHostException uhe) {
-            LogLog.error("Setting the HDFS destination failed", uhe);
-        }
-
-        LogLog.debug("FileSystemName: " + fileSystemName + ", fileName: " + hdfsfileName);
-    }
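-    // Illustrative example (host and path are hypothetical): a destination of
-    //   "hdfs://namenode.example.com:8020:/ranger/audit/%hostname%/audit.log"
-    // splits on ":" into fileSystemName = "hdfs://namenode.example.com:8020" and
-    // hdfsfileName = "/ranger/audit/<local hostname>/audit.log"; note the extra
-    // ":" that separates the port from the HDFS path.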
-    
-    /**
-     * This method differentiates HdfsFileAppender from its super class.
-     * <p>
-     * Before actually logging, this method will check whether it is time to do a rollover. If it is, it will schedule the next rollover time and then rollover.
-     */
-    @Override
-    protected void subAppend(LoggingEvent event) {
-    	LogLog.debug("Called subAppend for logging into hdfs...");   
-       
-        long n = System.currentTimeMillis();
-        if(n >= nextCheck) {
-            now.setTime(n);
-            prevnextCheck = nextCheck;
-            nextCheck = rc.getNextCheckMillis(now,hdfsFileRollOffset);
-            if ( firstTime) {
-            	 prevnextCheck = nextCheck;
-            	 firstTime = false;
-            }
-            try {
-            	if (hdfsUpdateAllowed) {
-            		rollOver();
-            	}
-            } catch(IOException e) {
-            	LogLog.error("rollOver() failed.", e);
-            }
-        }
-   
-        long nLocal = System.currentTimeMillis();
-        if ( nLocal > nextCheckLocal ) {
-            nowLocal.setTime(nLocal);
-            prevnextCheckLocal = nextCheckLocal;
-            nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, fileRollOffset);
-            if ( firstTimeLocal ) {
-                prevnextCheckLocal = nextCheckLocal;
-                firstTimeLocal = false;
-            }
-            try {
-                rollOverLocal();
-            } catch(IOException e) {
-                LogLog.error("rollOverLocal() failed.", e);
-            }
-        }
-        
-        this.layout = this.getLayout();
-        this.encoding = this.getEncoding();
-        
-        // Append to HDFS; this replaces super.subAppend(event), which would
-        // only write to the local file.
-        appendHDFSFileSystem(event);
-    }
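-    // Worked example (interval and times assumed for illustration): with an
-    // hourly rolling interval and offset 1, the first event at 10:17 sets
-    // nextCheck to 11:00 and, because firstTime is true, prevnextCheck to 11:00
-    // as well, so rollOver() returns early; the first event after 11:00 then
-    // advances nextCheck to 12:00 while prevnextCheck stays at 11:00, and the
-    // rollover proceeds.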
-   
-    @Override
-    protected void reset() {
-        closeWriter();
-        this.qw = null;
-        this.closeHdfsWriter();
-        this.closeCacheWriter();
-    }
-    
-    @Override
-    public synchronized void close() {
-        LogLog.debug("Closing all resources...");
-        this.closeFile();
-        this.closeHdfsWriter();
-        this.closeHdfsOstream();
-        this.closeFileSystem();
-    }
-
-    @Override
-    protected void closeFile() {
-        try {
-            if(this.ostream != null) {
-                this.ostream.close();
-                this.ostream = null;
-            }
-        } catch(IOException ie) {
-        	LogLog.error("unable to close output stream", ie);
-        }
-        this.closeHdfsWriter();
-        this.closeHdfsOstream();
-     }
-
-    @Override
-    protected void closeWriter() {
-        try {
-            if(this.qw != null) {
-                this.qw.close();
-                this.qw = null;
-            }
-        } catch(IOException ie) {
-            LogLog.error("unable to close writer", ie);
-        }
-    }
-    
-    @Override
-    public void finalize() {
-        super.finalize();
-        close();
-    }
-  
-  
- /******* HDFS Appender methods ***********/
-     
- private void appendHDFSFileSystem(LoggingEvent event) {
-
-    long currentTime = System.currentTimeMillis();
-
-    try {
-        if ( currentTime >= hdfsNextCheck ) {
-            LogLog.debug("About to open fileSystem: " + fileSystemName + " " + actualHdfsfileName);
-            hs = openHdfsSink(fileSystemName, actualHdfsfileName, fileCache, fileAppend, bufferedIO, bufferSize, layout, encoding, scheduledFileCache, cacheWriter, hdfsUpdateAllowed, processUser);
-            if (hdfsUpdateAllowed) {
-                // Stream into HDFS only when the hdfsUpdateAllowed flag is true; otherwise write to the cache file.
-                hs.setOsteam();
-                hs.setWriter();
-                hs.append(event);
-            } else {
-                writeToCache(event);
-            }
-            hdfsAvailable = true;
-        } else {
-            // Write the log to the cache file until it is time to check HDFS availability again.
-            hdfsAvailable = false;
-            LogLog.debug("HDFS down; will check HDFS availability at " + hdfsNextCheck + ". Current time: " + currentTime);
-            writeToCache(event);
-        }
-    }
-    catch(Throwable t) {
-        // Write the log to the cache file if the HDFS connection errors out.
-        hdfsAvailable = false;
-        if ( !timeCheck ) {
-            int hdfscheckInterval = getTimeOffset(hdfsCheckInterval);
-            hdfsNextCheck = System.currentTimeMillis() + (1000 * 60 * hdfscheckInterval);
-            timeCheck = true;
-            LogLog.debug("HDFS down; will check HDFS availability after " + hdfsCheckInterval, t);
-        }
-        writeToCache(event);
-    }
- }
-    
-   
-    private HdfsSink openHdfsSink(String fileSystemName, String filename, String fileCache, boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter, boolean hdfsUpdateAllowed, String processUser) throws Throwable {
-
-        HdfsSink hs = HdfsSink.getInstance();
-        if ( hs != null) {
-            LogLog.debug("HdfsSink successfully instantiated");
-            hs.init(fileSystemName, filename, fileCache, append, bufferedIO, bufferSize, layout, encoding, scheduledCacheFile, cacheWriter, hdfsUpdateAllowed, processUser);
-        }
-        return hs;
-    }
-    
-    private void closeHdfsOstream() {
-    	if (hs != null ){
-    		LogLog.debug("Closing hdfs outstream") ;
-    		hs.closeHdfsOstream();
-       }
-    }
-    
-    private void closeHdfsWriter() {
-    	
-    	if (hs != null) {
-    	 LogLog.debug("Closing hdfs Writer") ;
-    	 hs.closeHdfsWriter();
-    	}
-    }
-    
-    private void closeFileSystem() {
-        if (hs != null) {
-            hs.closeHdfsSink();
-        }
-    }
-    
-   
-    
-    /****** Cache File Methods **/
-    
-    
-    public void setFileCacheWriter()  {
-     
-     try {
-    	 setFileCacheOstream(fileCache);
-   	 } catch(IOException ie) {
-         LogLog.error("Logging failed while trying to write to the cache file.", ie);
-   	 }
-	 LogLog.debug("Setting Cache Writer..");
-	 cacheWriter = createCacheFileWriter(cacheOstream);
-	 if(bufferedIO) {
-		 cacheWriter = new BufferedWriter(cacheWriter, bufferSize);
-	  }		  
-	}
-    
-    
-    private void setFileCacheOstream(String fileCache) throws IOException {
-        try {
-            cacheOstream = new FileOutputStream(fileCache, true);
-        } catch(FileNotFoundException ex) {
-            // If the parent directory does not exist, create it and retry
-            // with the cache file (not the main log file).
-            String parentName = new File(fileCache).getParent();
-            if (parentName != null) {
-                File parentDir = new File(parentName);
-                if(!parentDir.exists() && parentDir.mkdirs()) {
-                    cacheOstream = new FileOutputStream(fileCache, true);
-                } else {
-                    throw ex;
-                }
-            } else {
-                throw ex;
-            }
-        }
-    }
-    
-    
-    public OutputStreamWriter createCacheFileWriter(OutputStream os) {
-        OutputStreamWriter retval = null;
-
-        if(encoding != null) {
-            try {
-                retval = new OutputStreamWriter(os, encoding);
-            } catch(IOException ie) {
-                LogLog.warn("Error initializing output writer.");
-                LogLog.warn("Unsupported encoding?");
-            }
-        }
-        if(retval == null) {
-            retval = new OutputStreamWriter(os);
-        }
-        return retval;
-    }
-  
-    
-   public void writeToCache(LoggingEvent event) {
-
-       try {
-           LogLog.debug("Writing log to cache. layout: " + this.layout.format(event) + ", ignoresThrowable: " + layout.ignoresThrowable() + ", Writer: " + cacheWriter.toString());
-
-           cacheWriter.write(this.layout.format(event));
-           cacheWriter.flush();
-
-           if(layout.ignoresThrowable()) {
-               String[] s = event.getThrowableStrRep();
-               if (s != null) {
-                   int len = s.length;
-                   for(int i = 0; i < len; i++) {
-                       LogLog.debug("Log: " + s[i]);
-                       cacheWriter.write(s[i]);
-                       cacheWriter.write(Layout.LINE_SEP);
-                       cacheWriter.flush();
-                   }
-               }
-           }
-       } catch (IOException ie) {
-           LogLog.error("Unable to log event message to the cache file:", ie);
-       }
-  }
-   
-  public void rollOverCacheFile() {
-	  
-	  if (new File(fileCache).length() != 0 ) {
-		  
-		  long epochNow = System.currentTimeMillis();
-	       
-	      String datedCacheFileName = fileCache + "." + epochNow;             
-          LogLog.debug("Rolling over remaining cache file to new file " + datedCacheFileName);
-		  closeCacheWriter();
-		  
-		  File target  = new File(datedCacheFileName);
-	      if (target.exists()) {
-	        target.delete();
-	      }
-	
-	      File file = new File(fileCache);
-	      
-	      boolean result = file.renameTo(target);
-	      
-	      if(result) {
-	        LogLog.debug(fileCache +" -> "+ datedCacheFileName);
-	      } else {
-	        LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+datedCacheFileName+"].");
-	      }
-	  }
-  }
-    
-   public void closeCacheWriter() {
-		  try {
-	            if(cacheWriter != null) {
-	            	cacheWriter.close();
-	            	cacheWriter = null;
-	            }
-	        } catch(IOException ie) {
-	        	 LogLog.error("unable to close cache writer", ie);
-	        }
-	}
-}
-
-/**
- * RollingCalendar is a helper class to HdfsFileAppender. Given a periodicity type and the current time, it computes the start of the next interval.
- */
-
-class RollingCalendar extends GregorianCalendar {
-    private static final long serialVersionUID = 1L;
-    
-    private int type = HdfsFileAppender.TOP_OF_TROUBLE;
-
-    RollingCalendar() {
-        super();
-    }
-
-    RollingCalendar(TimeZone tz, Locale locale) {
-        super(tz, locale);
-    }
-
-    void setType(int type) {
-        this.type = type;
-    }
-
-    public long getNextCheckMillis(Date now, int offset) {
-        return getNextCheckDate(now,offset).getTime();
-    }
-
-    public Date getNextCheckDate(Date now,int offset) {
-        this.setTime(now);
-
-        switch(this.type) {
-            case HdfsFileAppender.TOP_OF_MINUTE:
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                this.add(Calendar.MINUTE, offset);
-                break;
-            case HdfsFileAppender.TOP_OF_HOUR:
-                this.set(Calendar.MINUTE, 0);
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                this.add(Calendar.HOUR_OF_DAY, offset);
-                break;
-            case HdfsFileAppender.HALF_DAY:
-                this.set(Calendar.MINUTE, 0);
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                int hour = get(Calendar.HOUR_OF_DAY);
-                if(hour < 12) {
-                    this.set(Calendar.HOUR_OF_DAY, 12);
-                } else {
-                    this.set(Calendar.HOUR_OF_DAY, 0);
-                    this.add(Calendar.DAY_OF_MONTH, 1);
-                }
-                break;
-            case HdfsFileAppender.TOP_OF_DAY:
-                this.set(Calendar.HOUR_OF_DAY, 0);
-                this.set(Calendar.MINUTE, 0);
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                this.add(Calendar.DATE, offset);
-                break;
-            case HdfsFileAppender.TOP_OF_WEEK:
-                this.set(Calendar.DAY_OF_WEEK, getFirstDayOfWeek());
-                this.set(Calendar.HOUR_OF_DAY, 0);
-                this.set(Calendar.MINUTE, 0);
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                this.add(Calendar.WEEK_OF_YEAR, offset);
-                break;
-            case HdfsFileAppender.TOP_OF_MONTH:
-                this.set(Calendar.DATE, 1);
-                this.set(Calendar.HOUR_OF_DAY, 0);
-                this.set(Calendar.MINUTE, 0);
-                this.set(Calendar.SECOND, 0);
-                this.set(Calendar.MILLISECOND, 0);
-                this.add(Calendar.MONTH, offset);
-                break;
-            default:
-                throw new IllegalStateException("Unknown periodicity type.");
-        }
-        return getTime();
-    }
-}
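-
-// Usage sketch (illustrative; the offset and times are assumed): with type
-// HdfsFileAppender.TOP_OF_HOUR and offset 1, a call at 10:17 zeroes the minute,
-// second and millisecond fields and adds one hour, returning 11:00:
-//
-//   RollingCalendar rc = new RollingCalendar();
-//   rc.setType(HdfsFileAppender.TOP_OF_HOUR);
-//   long nextCheck = rc.getNextCheckMillis(new Date(), 1); // start of the next hour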
-
-
-/*************
- * Hdfs Sink
- *  
- *************/
-
-class HdfsSink {
-
-	  private static final String DS_REPLICATION_VAL = "1";
-	  private static final String DS_REPLICATION_KEY = "dfs.replication";
-	  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
-	  private Configuration conf = null;
-	  private FileSystem fs= null;
-      private Path pt = null;
-	  private FSDataOutputStream hdfsostream = null;
-	  private String fsName = null;
-	  private String fileName = null;
-	  private String fileCache = null;
-	  private Layout layout = null;
-	  private String encoding = null;
-	  private Writer hdfswriter = null;
-	  private int bufferSize;
-	  private boolean bufferedIO=false;
-	  private static int fstime=0;
-	  private CacheFileWatcher cfw = null;
-	  private boolean hdfsUpdateAllowed=true;
-	  private String processUser=null;
-
-	  
-	  HdfsSink() {
-	  } 
-      
-      private static final ThreadLocal<HdfsSink> hdfssink = new ThreadLocal<HdfsSink>() {
-          protected HdfsSink initialValue() {
-              return new HdfsSink();
-          }
-      };
-
-      public static HdfsSink getInstance() {
-          return hdfssink.get();
-      }
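-      // Design note: the ThreadLocal hands each logging thread its own HdfsSink,
-      // so concurrent appender threads never share an FSDataOutputStream or a
-      // Writer and need no additional synchronization around the HDFS handles:
-      //
-      //   HdfsSink hs = HdfsSink.getInstance(); // same instance per thread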
-  
-	 public void init(String fileSystemName, String fileName, String fileCache,boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter, boolean hdfsUpdateAllowed, String processUser) throws Exception{
-		   
-		   this.fsName=fileSystemName;
-		   this.fileName=fileName;
-		   this.layout=layout;
-		   this.encoding=encoding;
-		   this.bufferSize=bufferSize;
-		   this.bufferedIO=bufferedIO;
-		   this.fileCache=fileCache;
-		   this.hdfsUpdateAllowed=hdfsUpdateAllowed;
-		   this.processUser=processUser;
-		   
-		   final Configuration conf= new Configuration();
-      	   conf.set(DS_REPLICATION_KEY,DS_REPLICATION_VAL);
-      	   conf.set(FS_DEFAULT_NAME_KEY, fsName);
-      	   
-           try {
-        	    if ( fs == null) {
-                 LogLog.debug("Opening connection to HDFS system: " + this.fsName);
-        		         		        		 
-        		 UserGroupInformation ugi = UserGroupInformation.createProxyUser(this.processUser, UserGroupInformation.getLoginUser());
-        		 fs = ugi.doAs( new PrivilegedExceptionAction<FileSystem>() {
-    		    	  public FileSystem run() throws Exception {
-    		    		 FileSystem filesystem = FileSystem.get(conf); 
-    		    		 LogLog.debug("Inside UGI.."  + fsName + " " + filesystem);
-    		    		 return filesystem;
-    		    	 }
-    		     });
-        		 
-        		 if ( cfw == null) {
-      	           // Start the CacheFileWatcher to move the Cache file.
-              	      	   
-                  LogLog.debug("About to run CacheFileWatcher...");
-      	          Path hdfsfilePath = getParent();
-      	          cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,cacheWriter,this.hdfsUpdateAllowed,conf);
-      	          cfw.start();
-                 }
-        	     
-        	    }
-        	   
-           } catch(Exception ie) {
-               LogLog.error("Unable to create HDFS logfile: " + ie.getMessage());
-               throw ie;
-           }
-
-           LogLog.debug("HDFS system up: " + fsName + ", FS object: " + fs);
-	  }
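-      // Note: init() connects as the login user but impersonates processUser via
-      // createProxyUser/doAs; for this to succeed, the NameNode must allow the
-      // login user to proxy (the hadoop.proxyuser.<user>.hosts/groups settings).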
-	  
-	  public int getfstime() {
-		  return fstime;
-	  }
-	  public FileSystem getFileSystem() {
-		  return fs;
-	  }
-	  
-	  public Path getPath() {
-		  return pt;
-	  }
-	  
-	  public Path getParent() {
-		  Path pt = new Path(this.fileName);
-		  return pt.getParent();
-	  }	 
-	  
-      public void setOsteam() throws IOException {
-        try {
-            pt = new Path(this.fileName);
-            // If the file already exists, append to it.
-            if(fs.exists(pt)) {
-                LogLog.debug("Appending file: " + this.fsName + ":" + this.fileName + " " + fs);
-                if (hdfsostream != null) {
-                    hdfsostream.close();
-                }
-                hdfsostream = fs.append(pt);
-            } else {
-                LogLog.debug("Creating file directories in HDFS if not present: " + this.fsName + ":" + this.fileName + " " + fs);
-                String parentName = new Path(this.fileName).getParent().toString();
-                if(parentName != null) {
-                    Path parentDir = new Path(parentName);
-                    if (!fs.exists(parentDir) ) {
-                        LogLog.debug("Creating parent directory: " + parentDir);
-                        fs.mkdirs(parentDir);
-                    }
-                }
-                hdfsostream = fs.create(pt);
-            }
-        } catch (IOException ie) {
-            LogLog.debug("Error while appending HDFS file.", ie);
-            throw ie;
-        }
-     }
-	 	  
-	  public void setWriter() {
-		  LogLog.debug("Setting Writer..");
-		  hdfswriter = createhdfsWriter(hdfsostream);
-		  if(bufferedIO) {
-			  hdfswriter = new BufferedWriter(hdfswriter, bufferSize);
-		  }		  
-	  }
-	  
-	  public Writer getWriter() {
-		  return hdfswriter;
-	  }	  	  
-	  
-	  public void append(LoggingEvent event) throws IOException { 
-		try {  
-		     LogLog.debug("Writing log to HDFS." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + hdfswriter.toString());
-		    
-			 hdfswriter.write(this.layout.format(event));
-			 hdfswriter.flush();
-		     if(layout.ignoresThrowable()) {
-		      String[] s = event.getThrowableStrRep();
-		      if (s != null) {
-			  int len = s.length;
-			  for(int i = 0; i < len; i++) {
-				  LogLog.debug("Log:" + s[i]);
-				  hdfswriter.write(s[i]);
-				  hdfswriter.write(Layout.LINE_SEP);
-				  hdfswriter.flush();
-				  }
-		       }
-		     }
-            } catch (IOException ie) {
-              LogLog.error("Unable to log event message to hdfs:", ie);  
-              throw ie;
-            }
-       }
-	  
-       public void writeHeader() throws IOException {
-         LogLog.debug("Writing log header...");
-         try {
-            if(layout != null) {
-                String h = layout.getHeader();
-                if(h != null && hdfswriter != null) {
-                    LogLog.debug("Log header: " + h);
-                    hdfswriter.write(h);
-                    hdfswriter.flush();
-                }
-            }
-         } catch (IOException ie) {
-            LogLog.error("Unable to log header message to hdfs:", ie);
-            throw ie;
-         }
-       }
-	   
-       public void writeFooter() throws IOException {
-         LogLog.debug("Writing log footer...");
-         try {
-            if(layout != null) {
-                String f = layout.getFooter();
-                if(f != null && hdfswriter != null) {
-                    LogLog.debug("Log footer: " + f);
-                    hdfswriter.write(f);
-                    hdfswriter.flush();
-                }
-            }
-         } catch (IOException ie) {
-            LogLog.error("Unable to log footer message to hdfs:", ie);
-            throw ie;
-         }
-       }
-
-	   public void closeHdfsOstream() {
-	        try {
-	            if(this.hdfsostream != null) {
-	                this.hdfsostream.close();
-	                this.hdfsostream = null;
-	            }
-	        } catch(IOException ie) {
-	        	 LogLog.error("unable to close output stream", ie);
-	        }
-	        
-	     }
-	   
-	   public void closeHdfsWriter() {
-		  try {
-	            if(hdfswriter != null) {
-	            	hdfswriter.close();
-	            	hdfswriter = null;
-	            }
-	        } catch(IOException ie) {
-                 LogLog.error("unable to close hdfs writer", ie);
-	        }
-	   }
-	   
-	   public void closeHdfsSink() {
-		  try {
-			  if (fs !=null) {
-			  		fs.close();
-			  	}		
-		  	} catch (IOException ie) {
-			 LogLog.error("Unable to close hdfs " + fs ,ie);
-		}
-	   }
-		  
-	
-      public OutputStreamWriter createhdfsWriter(FSDataOutputStream os) {
-          OutputStreamWriter retval = null;
-
-          if(encoding != null) {
-              try {
-                  retval = new OutputStreamWriter(os, encoding);
-              } catch(IOException ie) {
-                  LogLog.warn("Error initializing output writer.");
-                  LogLog.warn("Unsupported encoding?");
-              }
-          }
-          if(retval == null) {
-              retval = new OutputStreamWriter(os);
-          }
-          return retval;
-      }
-	 	  
-	
-  }
-
-
-// CacheFileWatcher Thread 
-
-class CacheFileWatcher extends Thread {
-	
-    private static final long CACHEFILE_WATCHER_SLEEP_TIME = 1000 * 60 * 2; // two minutes
-	
-	Configuration conf = null;
-	private FileSystem fs = null;
-	private String cacheFile = null;
-	private File parentDir = null;
-	private File[] files = null;
-	private Path fsPath = null;
-	private Path hdfsfilePath = null;
-	private Writer cacheWriter = null;
-
-	private boolean hdfsUpdateAllowed=true;
-	private boolean cacheFilesCopied = false;
-	
-	CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) {
-		this.fs = fs;
-		this.cacheFile = cacheFile;
-		this.conf = conf;
-		this.hdfsfilePath = hdfsfilePath;
-		this.cacheWriter = cacheWriter;
-		this.hdfsUpdateAllowed = hdfsUpdateAllowed;
-	}
-	
-
-    public void run() {
-
-        LogLog.debug("CacheFileWatcher started");
-
-        while (!cacheFilesCopied) {
-
-            if (hdfsUpdateAllowed) {
-                rollRemainingCacheFile();
-            }
-
-            if ( !cacheFilePresent(cacheFile) ) {
-                try {
-                    Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
-                } catch (InterruptedException ie) {
-                    LogLog.error("Unable to complete the CacheFileWatcher sleep", ie);
-                }
-            } else {
-                try {
-                    copyCacheFilesToHdfs();
-                    // Stop once the cache is drained, but keep watching when live
-                    // HDFS updates are disabled and the cache keeps growing.
-                    cacheFilesCopied = hdfsUpdateAllowed;
-                } catch (Throwable t) {
-                    // Error while copying the files to HDFS; sleep and retry later.
-                    cacheFilesCopied = false;
-                    LogLog.error("Error while copying cache files to hdfs; sleeping before next try", t);
-                    try {
-                        Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
-                    } catch (InterruptedException ie) {
-                        LogLog.error("Unable to complete the CacheFileWatcher sleep", ie);
-                    }
-                }
-            }
-        }
-    }
-		
-	public boolean cacheFilePresent(String filename) {
-	    String parent = new File(filename).getParent();
-	    if ( parent != null ) {
-	    	parentDir = new File(parent);
-	    	fsPath = new Path(parent);
-	    	files = parentDir.listFiles(new FilenameFilter() {
-	    	@Override
-	    	     public boolean accept(File parentDir, String name) {
-	    	        return name.matches(".*cache.+");
-	    	    }
-	    	});
-	    	if ( files.length > 0) {
-	    		LogLog.debug("CacheFile Present..");
-	    		return true;
-	    	}
-	    }
-		return false;
-	 }
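-    // Note: the ".*cache.+" pattern matches only rolled cache files (for an
-    // assumed base name, e.g. "audit.log.cache.2014-12-01-10-30"); the live
-    // "audit.log.cache" file itself does not match, because the pattern requires
-    // at least one character after "cache".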
-	
-
-    public void copyCacheFilesToHdfs() throws Throwable {
-
-        if (!fs.exists(hdfsfilePath) ) {
-            LogLog.debug("Creating parent directory: " + hdfsfilePath);
-            fs.mkdirs(hdfsfilePath);
-        }
-
-        for ( File cacheFile : files) {
-            LogLog.debug("Copying files... file path: " + fsPath + ", cache file: " + cacheFile + ", HDFS path: " + hdfsfilePath);
-            FileUtil.copy(cacheFile, this.fs, this.hdfsfilePath, true, this.conf);
-        }
-    }
-	
-	public void rollRemainingCacheFile() {
-	  String datePattern = "'.'yyyy-MM-dd-HH-mm";
-	  SimpleDateFormat sdf = new SimpleDateFormat(datePattern); 
-	  if (new File(cacheFile).length() != 0 ) {
-		  long epochNow = System.currentTimeMillis();
-	       
-	      String datedCacheFileName = cacheFile + sdf.format(epochNow);
-	      
-          LogLog.debug("Rolling over remaining cache file " + datedCacheFileName);
-		  closeCacheFile();
-		  
-		  File target  = new File(datedCacheFileName);
-	      if (target.exists()) {
-	        target.delete();
-	      }
-	
-	      File file = new File(cacheFile);
-	      
-	      boolean result = file.renameTo(target);
-	      
-	      if(result) {
-	        LogLog.debug(cacheFile +" -> "+ datedCacheFileName);
-	      } else {
-	        LogLog.error("Failed to rename cache file ["+cacheFile+"] to ["+datedCacheFileName+"].");
-	      }
-	     		      
-	  }
-  }
-	
-   public void closeCacheFile() {
-	  try {
-            if(cacheWriter != null) {
-            	cacheWriter.close();
-            	cacheWriter = null;
-            }
-        } catch(IOException ie) {
-        	 LogLog.error("unable to close cache writer", ie);
-        }
-	}
-}
-
-