You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by ma...@apache.org on 2014/12/12 02:30:29 UTC
[46/51] [partial] incubator-ranger git commit: RANGER-194: Rename
packages from xasecure to apache ranger
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
deleted file mode 100644
index c64b02d..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package com.xasecure.admin.client;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HTTPSProperties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.xasecure.admin.client.datatype.GrantRevokeData;
-import com.xasecure.admin.client.datatype.RESTResponse;
-import com.xasecure.authorization.utils.StringUtil;
-import com.xasecure.authorization.hadoop.config.XaSecureConfiguration;
-import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider;
-
-
-public class XaAdminRESTClient implements XaAdminClient {
- private static final Log LOG = LogFactory.getLog(XaAdminRESTClient.class);
-
- public static final String XASECURE_PROP_POLICYMGR_URL = "xasecure.policymgr.url";
- public static final String XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME = "xasecure.policymgr.sslconfig.filename";
-
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE = "xasecure.policymgr.clientssl.keystore";
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_PASSWORD = "xasecure.policymgr.clientssl.keystore.password";
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE = "xasecure.policymgr.clientssl.keystore.type";
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL = "xasecure.policymgr.clientssl.keystore.credential.file";
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS = "sslKeyStore";
- public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT = "jks";
-
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE = "xasecure.policymgr.clientssl.truststore";
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_PASSWORD = "xasecure.policymgr.clientssl.truststore.password";
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE = "xasecure.policymgr.clientssl.truststore.type";
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL = "xasecure.policymgr.clientssl.truststore.credential.file";
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS = "sslTrustStore";
- public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT = "jks";
-
- public static final String XASECURE_SSL_KEYMANAGER_ALGO_TYPE = "SunX509" ;
- public static final String XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE = "SunX509" ;
- public static final String XASECURE_SSL_CONTEXT_ALGO_TYPE = "SSL" ;
-
- public static final String REST_EXPECTED_MIME_TYPE = "application/json" ;
-
- private static final String REST_URL_PATH_POLICYLIST = "/service/assets/policyList/";
- private static final String REST_URL_PATH_GRANT = "/service/assets/resources/grant";
- private static final String REST_URL_PATH_REVOKE = "/service/assets/resources/revoke";
- private static final String REST_URL_PARAM_LASTUPDATED_TIME = "epoch";
- private static final String REST_URL_PARAM_POLICY_COUNT = "policyCount";
- private static final String REST_URL_PARAM_AGENT_NAME = "agentId";
-
- private String mUrl = null;
- private String mSslConfigFileName = null;
- private boolean mIsSSL = false;
-
- private String mKeyStoreURL = null;
- private String mKeyStoreAlias = null;
- private String mKeyStoreFile = null;
- private String mKeyStoreType = null;
- private String mTrustStoreURL = null;
- private String mTrustStoreAlias = null;
- private String mTrustStoreFile = null;
- private String mTrustStoreType = null;
-
-
- public XaAdminRESTClient() {
- mUrl = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_URL);
- mSslConfigFileName = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME);
-
- init();
- }
-
- public XaAdminRESTClient(String url, String sslConfigFileName) {
- mUrl = url;
- mSslConfigFileName = sslConfigFileName;
-
- init();
- }
-
- @Override
- public String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName) {
- String ret = null;
- Client client = null;
-
- try {
- client = buildClient();
-
- WebResource webResource = client.resource(mUrl + REST_URL_PATH_POLICYLIST + repositoryName)
- .queryParam(REST_URL_PARAM_LASTUPDATED_TIME, String.valueOf(lastModifiedTime))
- .queryParam(REST_URL_PARAM_POLICY_COUNT, String.valueOf(policyCount))
- .queryParam(REST_URL_PARAM_AGENT_NAME, agentName);
-
- ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).get(ClientResponse.class);
-
- if(response != null && response.getStatus() == 200) {
- ret = response.getEntity(String.class);
- }
- } finally {
- destroy(client);
- }
-
- return ret;
- }
-
- @Override
- public void grantPrivilege(GrantRevokeData grData) throws Exception {
- Client client = null;
-
- try {
- client = buildClient();
-
- WebResource webResource = client.resource(mUrl + REST_URL_PATH_GRANT);
-
- ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson());
-
- if(response == null || response.getStatus() != 200) {
- RESTResponse resp = RESTResponse.fromClientResponse(response);
-
- throw new Exception(resp.getMessage());
- }
- } finally {
- destroy(client);
- }
- }
-
- @Override
- public void revokePrivilege(GrantRevokeData grData) throws Exception {
- Client client = null;
-
- try {
- client = buildClient();
-
- WebResource webResource = client.resource(mUrl + REST_URL_PATH_REVOKE);
-
- ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson());
-
- if(response == null || response.getStatus() != 200) {
- RESTResponse resp = RESTResponse.fromClientResponse(response);
-
- throw new Exception(resp.getMessage());
- }
- } finally {
- destroy(client);
- }
- }
- private void init() {
- mIsSSL = StringUtil.containsIgnoreCase(mUrl, "https");
-
- InputStream in = null ;
-
- try {
- Configuration conf = new Configuration() ;
-
- in = getFileInputStream(mSslConfigFileName) ;
-
- if (in != null) {
- conf.addResource(in);
- }
-
- mKeyStoreURL = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL);
- mKeyStoreAlias = XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS;
- mKeyStoreType = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE, XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT);
- mKeyStoreFile = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE);
-
- mTrustStoreURL = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL);
- mTrustStoreAlias = XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS;
- mTrustStoreType = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE, XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT);
- mTrustStoreFile = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE);
- }
- catch(IOException ioe) {
- LOG.error("Unable to load SSL Config FileName: [" + mSslConfigFileName + "]", ioe);
- }
- finally {
- close(in, mSslConfigFileName);
- }
- }
-
- private synchronized Client buildClient() {
- Client client = null;
-
- if (mIsSSL) {
- KeyManager[] kmList = getKeyManagers();
- TrustManager[] tmList = getTrustManagers();
- SSLContext sslContext = getSSLContext(kmList, tmList);
- ClientConfig config = new DefaultClientConfig();
-
- HostnameVerifier hv = new HostnameVerifier() {
- public boolean verify(String urlHostName, SSLSession session) {
- return session.getPeerHost().equals(urlHostName);
- }
- };
-
- config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new HTTPSProperties(hv, sslContext));
-
- client = Client.create(config);
- }
-
- if(client == null) {
- client = Client.create();
- }
-
- return client;
- }
-
- private KeyManager[] getKeyManagers() {
- KeyManager[] kmList = null;
-
- String keyStoreFilepwd = getCredential(mKeyStoreURL, mKeyStoreAlias);
-
- if (!StringUtil.isEmpty(mKeyStoreFile) && !StringUtil.isEmpty(keyStoreFilepwd)) {
- InputStream in = null ;
-
- try {
- in = getFileInputStream(mKeyStoreFile) ;
-
- if (in != null) {
- KeyStore keyStore = KeyStore.getInstance(mKeyStoreType);
-
- keyStore.load(in, keyStoreFilepwd.toCharArray());
-
- KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(XASECURE_SSL_KEYMANAGER_ALGO_TYPE);
-
- keyManagerFactory.init(keyStore, keyStoreFilepwd.toCharArray());
-
- kmList = keyManagerFactory.getKeyManagers();
- } else {
- LOG.error("Unable to obtain keystore from file [" + mKeyStoreFile + "]");
- }
- } catch (KeyStoreException e) {
- LOG.error("Unable to obtain from KeyStore", e);
- } catch (NoSuchAlgorithmException e) {
- LOG.error("SSL algorithm is available in the environment", e);
- } catch (CertificateException e) {
- LOG.error("Unable to obtain the requested certification ", e);
- } catch (FileNotFoundException e) {
- LOG.error("Unable to find the necessary SSL Keystore and TrustStore Files", e);
- } catch (IOException e) {
- LOG.error("Unable to read the necessary SSL Keystore and TrustStore Files", e);
- } catch (UnrecoverableKeyException e) {
- LOG.error("Unable to recover the key from keystore", e);
- } finally {
- close(in, mKeyStoreFile);
- }
- }
-
- return kmList;
- }
-
- private TrustManager[] getTrustManagers() {
- TrustManager[] tmList = null;
-
- String trustStoreFilepwd = getCredential(mTrustStoreURL, mTrustStoreAlias);
-
- if (!StringUtil.isEmpty(mTrustStoreFile) && !StringUtil.isEmpty(trustStoreFilepwd)) {
- InputStream in = null ;
-
- try {
- in = getFileInputStream(mTrustStoreFile) ;
-
- if (in != null) {
- KeyStore trustStore = KeyStore.getInstance(mTrustStoreType);
-
- trustStore.load(in, trustStoreFilepwd.toCharArray());
-
- TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE);
-
- trustManagerFactory.init(trustStore);
-
- tmList = trustManagerFactory.getTrustManagers();
- } else {
- LOG.error("Unable to obtain keystore from file [" + mTrustStoreFile + "]");
- }
- } catch (KeyStoreException e) {
- LOG.error("Unable to obtain from KeyStore", e);
- } catch (NoSuchAlgorithmException e) {
- LOG.error("SSL algorithm is available in the environment", e);
- } catch (CertificateException e) {
- LOG.error("Unable to obtain the requested certification ", e);
- } catch (FileNotFoundException e) {
- LOG.error("Unable to find the necessary SSL Keystore and TrustStore Files", e);
- } catch (IOException e) {
- LOG.error("Unable to read the necessary SSL Keystore and TrustStore Files", e);
- } finally {
- close(in, mTrustStoreFile);
- }
- }
-
- return tmList;
- }
-
- private SSLContext getSSLContext(KeyManager[] kmList, TrustManager[] tmList) {
- try {
- if(kmList != null && tmList != null) {
- SSLContext sslContext = SSLContext.getInstance(XASECURE_SSL_CONTEXT_ALGO_TYPE);
-
- sslContext.init(kmList, tmList, new SecureRandom());
-
- return sslContext;
- }
- } catch (NoSuchAlgorithmException e) {
- LOG.error("SSL algorithm is available in the environment", e);
- } catch (KeyManagementException e) {
- LOG.error("Unable to initials the SSLContext", e);
- }
-
- return null;
- }
-
- private String getCredential(String url, String alias) {
- char[] credStr = XaSecureCredentialProvider.getInstance().getCredentialString(url, alias);
-
- return credStr == null ? null : new String(credStr);
- }
-
- private InputStream getFileInputStream(String fileName) throws IOException {
- InputStream in = null ;
-
- if(! StringUtil.isEmpty(fileName)) {
- File f = new File(fileName) ;
-
- if (f.exists()) {
- in = new FileInputStream(f) ;
- }
- else {
- in = ClassLoader.getSystemResourceAsStream(fileName) ;
- }
- }
-
- return in ;
- }
-
- private void close(InputStream str, String filename) {
- if (str != null) {
- try {
- str.close() ;
- } catch (IOException excp) {
- LOG.error("Error while closing file: [" + filename + "]", excp) ;
- }
- }
- }
-
- private void destroy(Client client) {
- if(client != null) {
- client.destroy();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
deleted file mode 100644
index b5228db..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package com.xasecure.admin.client.datatype;
-
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.xasecure.authorization.utils.StringUtil;
-
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class GrantRevokeData implements java.io.Serializable {
- private static final long serialVersionUID = 1L;
-
- private String grantor;
- private String repositoryName;
- private String repositoryType;
- private String databases;
- private String tables;
- private String columns;
- private String columnFamilies;
- private boolean isEnabled;
- private boolean isAuditEnabled;
- private boolean replacePerm;
- private List<PermMap> permMapList = new ArrayList<PermMap>();
-
- private static String WILDCARD_ASTERISK = "*";
-
-
- public GrantRevokeData() {
- }
-
- public String getGrantor() {
- return grantor;
- }
-
- public void setGrantor(String grantor) {
- this.grantor = grantor;
- }
-
- public String getRepositoryName() {
- return repositoryName;
- }
-
- public void setRepositoryName(String repositoryName) {
- this.repositoryName = repositoryName;
- }
-
- public String getRepositoryType() {
- return repositoryType;
- }
-
- public void setRepositoryType(String repositoryType) {
- this.repositoryType = repositoryType;
- }
-
- public String getDatabases() {
- return databases;
- }
-
- public void setDatabases(String databases) {
- this.databases = databases;
- }
-
- public String getTables() {
- return tables;
- }
-
- public void setTables(String tables) {
- this.tables = tables;
- }
-
- public String getColumns() {
- return columns;
- }
-
- public void setColumns(String columns) {
- this.columns = columns;
- }
-
- public String getColumnFamilies() {
- return columnFamilies;
- }
-
- public void setColumnFamilies(String columnFamilies) {
- this.columnFamilies = columnFamilies;
- }
-
- public List<PermMap> getPermMapList() {
- return permMapList;
- }
-
- public void setPermMapList(List<PermMap> permMapList) {
- this.permMapList = permMapList;
- }
-
-
- public void setHiveData(String grantor,
- String repositoryName,
- String databases,
- String tables,
- String columns,
- PermMap permMap) {
- this.grantor = grantor;
- this.repositoryName = repositoryName;
- this.repositoryType = "hive";
- this.databases = StringUtil.isEmpty(databases) ? WILDCARD_ASTERISK : databases;
- this.tables = StringUtil.isEmpty(tables) ? WILDCARD_ASTERISK : tables;
- this.columns = StringUtil.isEmpty(columns) ? WILDCARD_ASTERISK : columns;
- this.isAuditEnabled = true;
- this.isEnabled = true;
- this.replacePerm = false;
- this.permMapList.add(permMap);
- }
-
- public void setHBaseData(String grantor,
- String repositoryName,
- String tables,
- String columns,
- String columnFamilies,
- PermMap permMap) {
- this.grantor = grantor;
- this.repositoryName = repositoryName;
- this.repositoryType = "hbase";
- this.tables = StringUtil.isEmpty(tables) ? WILDCARD_ASTERISK : tables;
- this.columns = StringUtil.isEmpty(columns) ? WILDCARD_ASTERISK : columns;
- this.columnFamilies = StringUtil.isEmpty(columnFamilies) ? WILDCARD_ASTERISK : columnFamilies;
- this.isAuditEnabled = true;
- this.isEnabled = true;
- this.replacePerm = true;
- this.permMapList.add(permMap);
- }
-
- public String toJson() {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.writeValueAsString(this);
- } catch (JsonGenerationException e) {
- e.printStackTrace();
- } catch (JsonMappingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return "";
- }
-
- @Override
- public String toString() {
- return toJson();
- }
-
-
- @JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
- @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class PermMap {
- private List<String> userList = new ArrayList<String>();
- private List<String> groupList = new ArrayList<String>();
- private List<String> permList = new ArrayList<String>();
-
- public PermMap() {
- }
-
- public PermMap(String user, String group, String perm) {
- addUser(user);
- addGroup(group);
- addPerm(perm);
- }
-
- public PermMap(List<String> userList, List<String> groupList, List<String> permList) {
- copyList(userList, this.userList);
- copyList(groupList, this.groupList);
- copyList(permList, this.permList);
- }
-
- public List<String> getUserList() {
- return userList;
- }
-
- public List<String> getGroupList() {
- return groupList;
- }
-
- public List<String> getPermList() {
- return permList;
- }
-
- public void addUser(String user) {
- addToList(user, userList);
- }
-
- public void addGroup(String group) {
- addToList(group, groupList);
- }
-
- public void addPerm(String perm) {
- addToList(perm, permList);
- }
-
- private void addToList(String str, List<String> list) {
- if(list != null && !StringUtil.isEmpty(str)) {
- list.add(str);
- }
- }
-
- private void copyList(List<String> fromList, List<String> toList) {
- if(fromList != null && toList != null) {
- for(String str : fromList) {
- addToList(str, toList);
- }
- }
- }
-
- public String toJson() {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.writeValueAsString(this);
- } catch (JsonGenerationException e) {
- e.printStackTrace();
- } catch (JsonMappingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return "";
- }
-
- @Override
- public String toString() {
- return toJson();
- }
- }
-
- public static void main(String[] args) {
- GrantRevokeData grData = new GrantRevokeData();
-
- System.out.println(grData.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
deleted file mode 100644
index 389b547..0000000
--- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xasecure.admin.client.datatype;
-
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import com.sun.jersey.api.client.ClientResponse;
-import com.xasecure.authorization.utils.StringUtil;
-
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class RESTResponse {
- private static Logger LOG = Logger.getLogger(RESTResponse.class);
-
- private int httpStatusCode;
- private int statusCode;
- private String msgDesc;
- private List<Message> messageList;
-
-
- public int getHttpStatusCode() {
- return httpStatusCode;
- }
-
- public void setHttpStatusCode(int httpStatusCode) {
- this.httpStatusCode = httpStatusCode;
- }
-
- public int getStatusCode() {
- return statusCode;
- }
-
- public void setStatusCode(int statusCode) {
- this.statusCode = statusCode;
- }
-
- public String getMsgDesc() {
- return msgDesc;
- }
-
- public void setMsgDesc(String msgDesc) {
- this.msgDesc = msgDesc;
- }
-
- public List<Message> getMessageList() {
- return messageList;
- }
-
- public void setMessageList(List<Message> messageList) {
- this.messageList = messageList;
- }
-
- public String getMessage() {
- return StringUtil.isEmpty(msgDesc) ? ("HTTP " + httpStatusCode) : msgDesc;
- }
-
- public static RESTResponse fromClientResponse(ClientResponse response) {
- RESTResponse ret = null;
-
- String jsonString = response == null ? null : response.getEntity(String.class);
- int httpStatus = response == null ? 0 : response.getStatus();
-
- if(! StringUtil.isEmpty(jsonString)) {
- ret = RESTResponse.fromJson(jsonString);
- }
-
- if(ret == null) {
- ret = new RESTResponse();
- }
-
- ret.setHttpStatusCode(httpStatus);
-
- return ret;
- }
-
- public String toJson() {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.writeValueAsString(this);
- } catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("toJson() failed", e);
- }
- }
-
- return "";
- }
-
- public static RESTResponse fromJson(String jsonString) {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.readValue(jsonString, RESTResponse.class);
- } catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("fromJson('" + jsonString + "') failed", e);
- }
- }
-
- return null;
- }
-
- @Override
- public String toString() {
- return toJson();
- }
-
- @JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
- @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class Message {
- private String name;
- private String rbKey;
- private String message;
- private Long objectId;
- private String fieldName;
-
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getRbKey() {
- return rbKey;
- }
- public void setRbKey(String rbKey) {
- this.rbKey = rbKey;
- }
- public String getMessage() {
- return message;
- }
- public void setMessage(String message) {
- this.message = message;
- }
- public Long getObjectId() {
- return objectId;
- }
- public void setObjectId(Long objectId) {
- this.objectId = objectId;
- }
- public String getFieldName() {
- return fieldName;
- }
- public void setFieldName(String fieldName) {
- this.fieldName = fieldName;
- }
-
- public String toJson() {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.writeValueAsString(this);
- } catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("toJson() failed", e);
- }
- }
-
- return "";
- }
-
- public static RESTResponse fromJson(String jsonString) {
- try {
- ObjectMapper om = new ObjectMapper();
-
- return om.readValue(jsonString, RESTResponse.class);
- } catch (Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("fromJson('" + jsonString + "') failed", e);
- }
- }
-
- return null;
- }
-
- @Override
- public String toString() {
- return toJson();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
deleted file mode 100644
index f32c5b7..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package com.xasecure.authorization.hadoop.config;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-
-import com.xasecure.audit.provider.AuditProviderFactory;
-import com.xasecure.authorization.hadoop.constants.XaSecureHadoopConstants;
-
-public class XaSecureConfiguration extends Configuration {
-
- private static final Logger LOG = Logger.getLogger(XaSecureConfiguration.class) ;
-
- private static XaSecureConfiguration config = null;
-
- private XaSecureConfiguration() {
- super(false) ;
-
- //
- // WorkAround for having all Hadoop Configuration in the CLASSPATH first, even if it is invoked by Hive Engine.
- //
- // So, we look for "hive-site.xml", if it is available, take the xasecure-audit.xml file from the same location.
- // If we do not see "hive-site.xml", we look for "hbase-site.xml", if found, take the xasecure-audit.xml file from the same location.
- // If we do not see "hbase-site.xml", we look for "hdfs-site.xml", if found, take the xasecure-audit.xml file from the same location.
- // If we do not see, we let the CLASSPATH based search to find xasecure-audit.xml file.
-
-
- URL auditFileLocation = getXAAuditXMLFileLocation() ;
-
- if (auditFileLocation != null) {
- addResource(auditFileLocation) ;
- }
- else {
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_AUDIT_FILE) ;
- }
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HDFS_SECURITY_FILE);
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_KNOX_SECURITY_FILE);
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HBASE_SECURITY_FILE) ;
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HIVE_SECURITY_FILE) ;
- addResourceIfReadable(XaSecureHadoopConstants.XASECURE_STORM_SECURITY_FILE);
-
- }
-
- @SuppressWarnings("deprecation")
- private void addResourceIfReadable(String aResourceName) {
- String fName = getFileLocation(aResourceName) ;
- if (fName != null) {
- File f = new File(fName) ;
- if (f.exists() && f.canRead()) {
- URL fUrl = null ;
- try {
- fUrl = f.toURL() ;
- addResource(fUrl) ;
- } catch (MalformedURLException e) {
- LOG.debug("Unable to find URL for the resource name [" + aResourceName +"]. Ignoring the resource:" + aResourceName);
- }
- }
- }
- }
-
-
- public static XaSecureConfiguration getInstance() {
- if (config == null) {
- synchronized (XaSecureConfiguration.class) {
- XaSecureConfiguration temp = config;
- if (temp == null) {
- config = new XaSecureConfiguration();
- }
- }
- }
- return config;
- }
-
- public void initAudit(AuditProviderFactory.ApplicationType appType) {
- AuditProviderFactory auditFactory = AuditProviderFactory.getInstance();
-
- if(auditFactory == null) {
- LOG.error("Unable to find the AuditProviderFactory. (null) found") ;
- return;
- }
-
- Properties props = getProps();
-
- if(props == null) {
- return;
- }
-
- if(! auditFactory.isInitDone()) {
- auditFactory.init(props, appType);
- }
- }
-
- public boolean isAuditInitDone() {
- AuditProviderFactory auditFactory = AuditProviderFactory.getInstance();
-
- return auditFactory != null && auditFactory.isInitDone();
- }
-
-
- @SuppressWarnings("deprecation")
- public URL getXAAuditXMLFileLocation() {
- URL ret = null ;
-
- try {
- for(String cfgFile : new String[] { "hive-site.xml", "hbase-site.xml", "hdfs-site.xml" } ) {
- String loc = getFileLocation(cfgFile) ;
- if (loc != null) {
- if (new File(loc).canRead()) {
- File parentFile = new File(loc).getParentFile() ;
- ret = new File(parentFile, XaSecureHadoopConstants.XASECURE_AUDIT_FILE).toURL() ;
- break ;
- }
- }
- }
- }
- catch(Throwable t) {
- LOG.error("Unable to locate audit file location." , t) ;
- ret = null ;
- }
-
- return ret ;
- }
-
- private String getFileLocation(String fileName) {
-
- String ret = null ;
-
- URL lurl = XaSecureConfiguration.class.getClassLoader().getResource(fileName) ;
-
- if (lurl == null ) {
- lurl = XaSecureConfiguration.class.getClassLoader().getResource("/" + fileName) ;
- }
-
- if (lurl != null) {
- ret = lurl.getFile() ;
- }
-
- return ret ;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
deleted file mode 100644
index 64794d1..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.xasecure.authorization.hadoop.constants;
-
-public class XaSecureHadoopConstants {
-
- public static final String XASECURE_AUDIT_FILE = "xasecure-audit.xml" ;
- public static final String XASECURE_HDFS_SECURITY_FILE = "xasecure-hdfs-security.xml" ;
- public static final String XASECURE_KNOX_SECURITY_FILE = "xasecure-knox-security.xml" ;
- public static final String XASECURE_HBASE_SECURITY_FILE = "xasecure-hbase-security.xml" ;
- public static final String XASECURE_HIVE_SECURITY_FILE = "xasecure-hive-security.xml" ;
- public static final String XASECURE_POLICYMGR_SSL_FILE = "xasecure-policymgr-ssl.xml" ;
- public static final String XASECURE_STORM_SECURITY_FILE = "xasecure-storm-security.xml" ;
-
- public static final String XASECURE_ADD_HDFS_PERMISSION_PROP = "xasecure.add-hadoop-authorization" ;
- public static final boolean XASECURE_ADD_HDFS_PERMISSION_DEFAULT = false ;
- public static final String READ_ACCCESS_TYPE = "read";
- public static final String WRITE_ACCCESS_TYPE = "write";
- public static final String EXECUTE_ACCCESS_TYPE = "execute";
-
- public static final String HDFS_ROOT_FOLDER_PATH_ALT = "";
- public static final String HDFS_ROOT_FOLDER_PATH = "/";
-
- public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_PROP = "hdfs.authorization.verifier.classname" ;
- public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hdfs.XASecureAuthorizer" ;
-
- public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_PROP = "hive.authorization.verifier.classname" ;
- public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hive.XASecureAuthorizer" ;
-
- public static final String HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP = "xasecure.hive.update.xapolicies.on.grant.revoke" ;
- public static final boolean HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true;
-
- public static final String HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP = "xasecure.hbase.update.xapolicies.on.grant.revoke" ;
- public static final boolean HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true;
-
- public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_PROP = "knox.authorization.verifier.classname" ;
- public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.knox.XASecureAuthorizer" ;
-
- public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_PROP = "hbase.authorization.verifier.classname" ;
- public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hbase.XASecureAuthorizer" ;
-
- public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_PROP = "storm.authorization.verifier.classname" ;
- public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.storm.XASecureAuthorizer" ;
-
- //
- // Loging constants
- //
- public static final String AUDITLOG_FIELD_DELIMITER_PROP = "xasecure.auditlog.fieldDelimiterString";
- public static final String AUDITLOG_XASECURE_MODULE_ACL_NAME_PROP = "xasecure.auditlog.xasecureAcl.name" ;
- public static final String AUDITLOG_HADOOP_MODULE_ACL_NAME_PROP = "xasecure.auditlog.hadoopAcl.name" ;
-
- public static final String DEFAULT_LOG_FIELD_DELIMITOR = "|" ;
- public static final String DEFAULT_XASECURE_MODULE_ACL_NAME = "xasecure-acl" ;
- public static final String DEFAULT_HADOOP_MODULE_ACL_NAME = "hadoop-acl" ;
-
-
- public static final String AUDITLOG_FIELDINFO_VISIBLE_PROP = "xasecure.auditlog.fieldInfoVisible" ;
- public static final boolean DEFAULT_AUDITLOG_FIELDINFO_VISIBLE = false ;
-
- public static final String AUDITLOG_ACCESS_GRANTED_TEXT_PROP = "xasecure.auditlog.accessgranted.text" ;
- public static final String AUDITLOG_ACCESS_DENIED_TEXT_PROP = "xasecure.auditlog.accessdenied.text" ;
-
- public static final String DEFAULT_ACCESS_GRANTED_TEXT = "granted" ;
- public static final String DEFAULT_ACCESS_DENIED_TEXT = "denied" ;
-
- public static final String AUDITLOG_EMPTY_STRING = "" ;
-
- public static final String AUDITLOG_HDFS_EXCLUDE_LIST_PROP = "xasecure.auditlog.hdfs.excludeusers" ;
- public static final String AUDITLOG_REPOSITORY_NAME_PROP = "xasecure.audit.repository.name" ;
- public static final String AUDITLOG_IS_ENABLED_PROP = "xasecure.audit.is.enabled" ;
-
- public static final String KEYMGR_URL_PROP = "hdfs.keymanager.url" ;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
----------------------------------------------------------------------
diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
deleted file mode 100644
index 973897f..0000000
--- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java
+++ /dev/null
@@ -1,1376 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xasecure.authorization.hadoop.log;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Locale;
-import java.util.TimeZone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.FileAppender;
-import org.apache.log4j.Layout;
-import org.apache.log4j.spi.LoggingEvent;
-import org.apache.log4j.helpers.LogLog;
-
-
-/********************************
-* HdfsFileAppender
-*
-**********************************/
-public class HdfsFileAppender extends FileAppender {
-
-
- // Constants for checking the DatePattern
- public static final String MINUTES ="Min";
- public static final String HOURS ="Hr";
- public static final String DAYS ="Day";
- public static final String WEEKS ="Week";
- public static final String MONTHS ="Month";
-
- // The code assumes that the following constants are in a increasing sequence.
- public static final int TOP_OF_TROUBLE = -1;
- public static final int TOP_OF_MINUTE = 0;
- public static final int TOP_OF_HOUR = 1;
- public static final int HALF_DAY = 2;
- public static final int TOP_OF_DAY = 3;
- public static final int TOP_OF_WEEK = 4;
- public static final int TOP_OF_MONTH = 5;
-
- /**
- * The date pattern. By default, the pattern is set to "1Day" meaning daily rollover.
- */
- private String hdfsFileRollingInterval = "1Day";
-
- private String fileRollingInterval = "1Day";
-
-
- private String scheduledFileCache;
-
- /**
- * The next time we estimate a rollover should occur.
- */
- private long nextCheck = System.currentTimeMillis() - 1;
-
- private long prevnextCheck = nextCheck;
-
- private long nextCheckLocal = System.currentTimeMillis() -1;
-
- private long prevnextCheckLocal = nextCheckLocal;
-
- private Date now = new Date();
-
- private Date nowLocal = now;
-
- private SimpleDateFormat sdf;
-
- private RollingCalendar rc = new RollingCalendar();
-
- private RollingCalendar rcLocal = new RollingCalendar();
-
- private FileOutputStream ostream = null;
-
- private String fileSystemName;
-
- private Layout layout = null;
-
- private String encoding = null;
-
- private String hdfsfileName = null;
-
- private String actualHdfsfileName = null;
-
- private String scheduledHdfsFileName = null;
-
- private String fileCache = null;
-
- private HdfsSink hs = null;
-
- private Writer cacheWriter = null;
-
- private FileOutputStream cacheOstream = null;
-
- private boolean hdfsAvailable = false;
-
- private long hdfsNextCheck = System.currentTimeMillis() - 1;
-
- private boolean timeCheck = false;
-
- private int hdfsFileRollOffset = 0;
-
- private int fileRollOffset = 0;
-
- private boolean firstTime = true;
-
- private boolean firstTimeLocal = true;
-
- private String hdfsLiveUpdate = "true";
-
- boolean hdfsUpdateAllowed = true;
-
- private String hdfsCheckInterval=null;
-
- private String processUser = null;
-
- private String datePattern = "'.'yyyy-MM-dd-HH-mm";
-
- /**
- * The gmtTimeZone is used only in computeCheckPeriod() method.
- */
- private static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT+0");
-
- private static final String DEFAULT_HDFSCHECKINTERVAL = "2min";
-
- /**
- * The default constructor does nothing.
- */
- public HdfsFileAppender() {
-
- }
-
-
- /**
- * The <b>hdfsFileRollingInterval</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for hdfs File rollover schedule.
- */
- public void setHdfsFileRollingInterval(String pattern) {
- hdfsFileRollingInterval = pattern;
- }
-
- /** Returns the value of the <b>hdfsFileRollingInterval</b> option. */
- public String getHdfsFileRollingInterval() {
- return hdfsFileRollingInterval;
- }
-
- /**
- * The <b>LocalDatePattern</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for local cache File rollover schedule.
- */
- public void setFileRollingInterval(String pattern) {
- fileRollingInterval = pattern;
- }
-
- /** Returns the value of the <b>FileRollingInterval</b> option. */
- public String getFileRollingInterval() {
- return fileRollingInterval;
- }
-
- /**
- * This will set liveHdfsUpdate flag , where true will update hdfs live else false will create local cache files and copy the files to hdfs
- */
- public void setHdfsLiveUpdate(String val) {
- hdfsLiveUpdate=val;
- }
-
- /** Returns the value of the <b>FileRollingInterval</b> option. */
- public String getHdfsLiveUpdate() {
- return hdfsLiveUpdate;
- }
-
- public String getHdfsCheckInterval() {
- return hdfsCheckInterval;
- }
-
- public void setHdfsCheckInterval(String val){
- hdfsCheckInterval = ( hdfsCheckInterval != null) ? val : DEFAULT_HDFSCHECKINTERVAL;
- }
-
- public String getEncoding() {
- return encoding;
- }
-
- public void setEncoding(String value) {
- encoding = value;
- }
-
- public void activateOptions() {
- super.activateOptions();
-
- sdf = new SimpleDateFormat(datePattern);
- processUser=System.getProperties().getProperty("user.name");
-
- if(hdfsFileRollingInterval != null && fileName != null) {
- now.setTime(System.currentTimeMillis());
- int type = computeCheckPeriod(hdfsFileRollingInterval);
- hdfsFileRollOffset = getTimeOffset(hdfsFileRollingInterval);
- printHdfsPeriodicity(type,hdfsFileRollOffset);
- rc.setType(type);
- LogLog.debug("File name: " + fileName);
- File file = new File(fileName);
- scheduledHdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified()));
- firstTime = true;
- LogLog.debug("Local and hdfs Files" + scheduledHdfsFileName + " " +scheduledHdfsFileName) ;
-
- } else {
- LogLog.error("Either File or hdfsFileRollingInterval options are not set for appender [" + name + "].");
- }
-
- // Local Cache File
-
- if (fileRollingInterval != null && fileCache != null){
- nowLocal.setTime(System.currentTimeMillis());
- int localtype = computeCheckPeriod(fileRollingInterval);
- fileRollOffset = getTimeOffset(fileRollingInterval);
- printLocalPeriodicity(localtype,fileRollOffset);
- rcLocal.setType(localtype);
- LogLog.debug("LocalCacheFile name: " + fileCache);
- File fileCachehandle = new File(fileCache);
- scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified()));
- firstTimeLocal = true;
-
- } else {
- LogLog.error("Either File or LocalDatePattern options are not set for appender [" + name + "].");
- }
-
- hdfsUpdateAllowed = Boolean.parseBoolean(hdfsLiveUpdate);
- actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
- }
-
- public static int containsIgnoreCase(String str1, String str2) {
- return str1.toLowerCase().indexOf(str2.toLowerCase());
- }
-
-
- public int computeCheckPeriod(String timePattern){
-
- if(containsIgnoreCase(timePattern, MINUTES) > 0) {
- return TOP_OF_MINUTE;
- }
-
- if(containsIgnoreCase(timePattern, HOURS) > 0) {
- return TOP_OF_HOUR;
- }
-
- if(containsIgnoreCase(timePattern, DAYS) > 0) {
- return TOP_OF_DAY;
- }
-
- if(containsIgnoreCase(timePattern, WEEKS) > 0) {
- return TOP_OF_WEEK;
- }
-
- if(containsIgnoreCase(timePattern, MONTHS) > 0) {
- return TOP_OF_MONTH;
- }
-
- return TOP_OF_TROUBLE;
- }
-
-
- private void printHdfsPeriodicity(int type, int offset) {
- switch(type) {
- case TOP_OF_MINUTE:
- LogLog.debug("Appender [" + name + "] to be rolled every " + offset + " minute.");
- break;
- case TOP_OF_HOUR:
- LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " hour.");
- break;
- case HALF_DAY:
- LogLog.debug("Appender [" + name + "] to be rolled at midday and midnight.");
- break;
- case TOP_OF_DAY:
- LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " day.");
- break;
- case TOP_OF_WEEK:
- LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " week.");
- break;
- case TOP_OF_MONTH:
- LogLog.debug("Appender [" + name + "] to be rolled at start of every " + offset + " month.");
- break;
- default:
- LogLog.warn("Unknown periodicity for appender [" + name + "].");
- }
- }
-
-
- public int getTimeOffset(String timePattern){
- int index;
- int offset=-1;
-
- if ((index = containsIgnoreCase(timePattern, MINUTES)) > 0) {
- offset = Integer.parseInt(timePattern.substring(0,index));
- }
-
- if ((index = containsIgnoreCase(timePattern, HOURS)) > 0) {
- offset = Integer.parseInt(timePattern.substring(0,index));
-
- }
-
- if ((index = containsIgnoreCase(timePattern, DAYS)) > 0) {
- offset = Integer.parseInt(timePattern.substring(0,index));
-
- }
-
- if ((index = containsIgnoreCase(timePattern, WEEKS)) > 0) {
- offset = Integer.parseInt(timePattern.substring(0,index));
-
- }
-
- if ((index = containsIgnoreCase(timePattern, MONTHS)) > 0) {
- offset = Integer.parseInt(timePattern.substring(0,index));
-
- }
-
- return offset;
- }
-
- private void printLocalPeriodicity(int type, int offset) {
- switch(type) {
- case TOP_OF_MINUTE:
- LogLog.debug("Appender [" + name + "] Local File to be rolled every " + offset + " minute.");
- break;
- case TOP_OF_HOUR:
- LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " hour.");
- break;
- case HALF_DAY:
- LogLog.debug("Appender [" + name + "] Local File to be rolled at midday and midnight.");
- break;
- case TOP_OF_DAY:
- LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " day.");
- break;
- case TOP_OF_WEEK:
- LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " week.");
- break;
- case TOP_OF_MONTH:
- LogLog.debug("Appender [" + name + "] Local File to be rolled at start of every " + offset + " month.");
- break;
- default:
- LogLog.warn("Unknown periodicity for appender [" + name + "].");
- }
- }
-
-
-
-
- /**
- * Rollover the current file to a new file.
- */
- private void rollOver() throws IOException {
- /* Compute filename, but only if hdfsFileRollingInterval is specified */
- if(hdfsFileRollingInterval == null) {
- errorHandler.error("Missing hdfsFileRollingInterval option in rollOver().");
- return;
- }
-
- long epochNow = System.currentTimeMillis();
-
- String datedhdfsFileName = hdfsfileName+sdf.format(epochNow);
-
- LogLog.debug("In rollOver epochNow" + epochNow + " " + "nextCheck: " + prevnextCheck );
-
-
- // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached.
-
- if (epochNow < prevnextCheck) {
- return;
- }
-
- // close current file, and rename it to datedFilename
- this.closeFile();
-
- LogLog.debug("Rolling Over hdfs file to " + scheduledHdfsFileName);
-
-
- if ( hdfsAvailable ) {
- // for hdfs file we don't rollover the fike, we rename the file.
- actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis());
- }
-
- try {
- // This will also close the file. This is OK since multiple close operations are safe.
- this.setFile(fileName, false, this.bufferedIO, this.bufferSize);
- } catch(IOException e) {
- errorHandler.error("setFile(" + fileName + ", false) call failed.");
- }
- scheduledHdfsFileName = datedhdfsFileName;
- }
-
-
- /**
- * Rollover the current Local file to a new file.
- */
- private void rollOverLocal() throws IOException {
- /* Compute filename, but only if datePattern is specified */
- if(fileRollingInterval == null) {
- errorHandler.error("Missing LocalDatePattern option in rollOverLocal().");
- return;
- }
-
- long epochNow = System.currentTimeMillis();
-
- String datedCacheFileName = fileCache+sdf.format(epochNow);
- LogLog.debug("In rollOverLocal() epochNow" + epochNow + " " + "nextCheckLocal: " + prevnextCheckLocal );
-
- // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached.
- if (epochNow < prevnextCheckLocal ) {
- return;
- }
-
- if (new File(fileCache).length() != 0 ) {
- LogLog.debug("Rolling Local cache to " + scheduledFileCache);
-
- this.closeCacheWriter();
-
- File target = new File(scheduledFileCache);
- if (target.exists()) {
- target.delete();
- }
-
- File file = new File(fileCache);
-
- boolean result = file.renameTo(target);
-
- if(result) {
- LogLog.debug(fileCache +" -> "+ scheduledFileCache);
- } else {
- LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+scheduledFileCache+"].");
- }
- setFileCacheWriter();
- scheduledFileCache = datedCacheFileName;
- }
- }
-
-
- /**
- * <p>
- * Sets and <i>opens</i> the file where the log output will go. The specified file must be writable.
- * <p>
- * If there was already an opened file, then the previous file is closed first.
- * <p>
- * <b>Do not use this method directly. To configure a FileAppender or one of its subclasses, set its properties one by one and then call
- * activateOptions.</b>
- *
- * @param fileName The path to the log file.
- * @param append If true will append to fileName. Otherwise will truncate fileName.
- */
- public void setFile(String file) {
- // Trim spaces from both ends. The users probably does not want
- // trailing spaces in file names.
- String val = file.trim();
-
- fileName=val;
- fileCache=val+".cache";
-
- }
-
- @Override
- public synchronized void setFile(String fileName, boolean append, boolean bufferedIO, int bufferSize) throws IOException {
- LogLog.debug("setFile called: "+fileName+", "+append);
-
- // It does not make sense to have immediate flush and bufferedIO.
- if(bufferedIO) {
- setImmediateFlush(false);
- }
-
- reset();
-
- try {
- //
- // attempt to create file
- //
- ostream = new FileOutputStream(fileName, append);
- } catch(FileNotFoundException ex) {
- //
- // if parent directory does not exist then
- // attempt to create it and try to create file
- // see bug 9150
- //
- File umFile = new File(fileName);
- String parentName = umFile.getParent();
-
- if (parentName != null) {
- File parentDir = new File(parentName);
- if(!parentDir.exists() && parentDir.mkdirs()) {
- ostream = new FileOutputStream(fileName, append);
- } else {
- throw ex;
- }
- } else {
- throw ex;
- }
- }
-
- Writer fw = createWriter(ostream);
- if(bufferedIO) {
- fw = new BufferedWriter(fw, bufferSize);
- }
- this.setQWForFiles(fw);
- this.fileName = fileName;
- this.fileAppend = append;
- this.bufferedIO = bufferedIO;
- this.bufferSize = bufferSize;
-
- //set cache file
- setFileCacheWriter();
-
- writeHeader();
-
- LogLog.debug("setFile ended");
- }
-
- public void setHdfsDestination(final String name) {
- //Setting the fileSystemname
-
- String hostName = null;
-
- String val = name.trim();
-
- try {
-
- hostName = InetAddress.getLocalHost().getHostName();
- val=val.replaceAll("%hostname%", hostName);
- String hostStr[] = val.split(":");
- if ( hostStr.length > 0 ) {
- fileSystemName = hostStr[0]+":"+hostStr[1]+":"+hostStr[2];
-
- hdfsfileName = hostStr[3];
-
- } else {
- LogLog.error("Failed to set HdfsSystem and File");
- }
-
- } catch (UnknownHostException uhe) {
- LogLog.error("Setting the Hdfs Desitination Failed", uhe);
- }
-
- LogLog.debug("FileSystemName:" + fileSystemName + "fileName:"+ hdfsfileName);
-
- }
-
- /**
- * This method differentiates HdfsFileAppender from its super class.
- * <p>
- * Before actually logging, this method will check whether it is time to do a rollover. If it is, it will schedule the next rollover time and then rollover.
- */
- @Override
- protected void subAppend(LoggingEvent event) {
- LogLog.debug("Called subAppend for logging into hdfs...");
-
- long n = System.currentTimeMillis();
- if(n >= nextCheck) {
- now.setTime(n);
- prevnextCheck = nextCheck;
- nextCheck = rc.getNextCheckMillis(now,hdfsFileRollOffset);
- if ( firstTime) {
- prevnextCheck = nextCheck;
- firstTime = false;
- }
- try {
- if (hdfsUpdateAllowed) {
- rollOver();
- }
- } catch(IOException e) {
- LogLog.error("rollOver() failed.", e);
- }
- }
-
- long nLocal = System.currentTimeMillis();
- if ( nLocal > nextCheckLocal ) {
- nowLocal.setTime(nLocal);
- prevnextCheckLocal = nextCheckLocal;
- nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, fileRollOffset);
- if ( firstTimeLocal) {
- prevnextCheckLocal = nextCheckLocal;
- firstTimeLocal = false;
- }
- try {
- rollOverLocal();
- } catch(IOException e) {
- LogLog.error("rollOverLocal() failed.", e);
- }
- }
-
- this.layout = this.getLayout();
- this.encoding = this.getEncoding();
-
- // Append HDFS
- appendHDFSFileSystem(event);
-
-
- //super.subAppend(event);
- }
-
- @Override
- protected
- void reset() {
- closeWriter();
- this.qw = null;
- //this.
- this.closeHdfsWriter();
- this.closeCacheWriter();
- }
-
- @Override
- public synchronized void close() {
- LogLog.debug("Closing all resource..");
- this.closeFile();
- this.closeHdfsWriter();
- this.closeHdfsOstream();
- this.closeFileSystem();
- }
-
- @Override
- protected void closeFile() {
- try {
- if(this.ostream != null) {
- this.ostream.close();
- this.ostream = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to close output stream", ie);
- }
- this.closeHdfsWriter();
- this.closeHdfsOstream();
- }
-
- @Override
- protected void closeWriter() {
- try {
- if(this.qw != null) {
- this.qw.close();
- this.qw = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to close writer", ie);
- }
- }
-
- @Override
- public void finalize() {
- super.finalize();
- close();
- }
-
-
- /******* HDFS Appender methods ***********/
-
- private void appendHDFSFileSystem(LoggingEvent event) {
-
- long currentTime = System.currentTimeMillis();
-
- try {
-
- if ( currentTime >= hdfsNextCheck ) {
-
- LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualHdfsfileName) ;
- hs = openHdfsSink(fileSystemName,actualHdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cacheWriter,hdfsUpdateAllowed,processUser);
- if (hdfsUpdateAllowed) {
- // stream into hdfs only when liveHdfsUpdate flag is true else write to cache file.
- hs.setOsteam();
- hs.setWriter();
- hs.append(event);
- } else {
- writeToCache(event);
- }
- hdfsAvailable = true;
-
- } else {
- // Write the Log To cache file util time to check hdfs availability
- hdfsAvailable = false;
- LogLog.debug("Hdfs Down..Will check hdfs vailability after " + hdfsNextCheck + "Current Time :" +hdfsNextCheck ) ;
- writeToCache(event);
- }
- }
- catch(Throwable t) {
- // Write the Log To cache file if hdfs connect error out.
- hdfsAvailable = false;
- if ( !timeCheck ) {
- int hdfscheckInterval = getTimeOffset(hdfsCheckInterval);
- hdfsNextCheck = System.currentTimeMillis()+(1000*60*hdfscheckInterval);
- timeCheck = true;
- LogLog.debug("Hdfs Down..Will check hdfs vailability after " + hdfsCheckInterval , t) ;
-
- }
- writeToCache(event);
- }
-
- }
-
-
- private HdfsSink openHdfsSink(String fileSystemName,String filename, String fileCache, boolean append, boolean bufferedIO,int bufferSize,Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter,boolean hdfsUpdateAllowed,String processUser) throws Throwable {
-
- HdfsSink hs = null;
- hs = HdfsSink.getInstance();
- if ( hs != null)
-
- LogLog.debug("Hdfs Sink successfully instatiated");
- try {
- hs.init(fileSystemName, filename, fileCache, append, bufferedIO, bufferSize, layout, encoding,scheduledCacheFile,cacheWriter,hdfsUpdateAllowed,processUser);
-
- } catch (Throwable t) {
- throw t;
- }
- return hs;
-
- }
-
- private void closeHdfsOstream() {
- if (hs != null ){
- LogLog.debug("Closing hdfs outstream") ;
- hs.closeHdfsOstream();
- }
- }
-
- private void closeHdfsWriter() {
-
- if (hs != null) {
- LogLog.debug("Closing hdfs Writer") ;
- hs.closeHdfsWriter();
- }
- }
-
- private void closeFileSystem() {
- hs.closeHdfsSink();
- }
-
-
-
- /****** Cache File Methods **/
-
-
- public void setFileCacheWriter() {
-
- try {
- setFileCacheOstream(fileCache);
- } catch(IOException ie) {
- LogLog.error("Logging failed while tring to write into Cache File..", ie);
- }
- LogLog.debug("Setting Cache Writer..");
- cacheWriter = createCacheFileWriter(cacheOstream);
- if(bufferedIO) {
- cacheWriter = new BufferedWriter(cacheWriter, bufferSize);
- }
- }
-
-
- private void setFileCacheOstream(String fileCache) throws IOException {
-
- try {
- cacheOstream = new FileOutputStream(fileCache, true);
- } catch(FileNotFoundException ex) {
- String parentName = new File(fileCache).getParent();
- if (parentName != null) {
- File parentDir = new File(parentName);
- if(!parentDir.exists() && parentDir.mkdirs()) {
- cacheOstream = new FileOutputStream(fileName, true);
- } else {
- throw ex;
- }
- } else {
- throw ex;
- }
- }
- }
-
-
- public OutputStreamWriter createCacheFileWriter(OutputStream os ) {
- OutputStreamWriter retval = null;
-
- if(encoding != null) {
- try {
- retval = new OutputStreamWriter(os, encoding);
- } catch(IOException ie) {
- LogLog.warn("Error initializing output writer.");
- LogLog.warn("Unsupported encoding?");
- }
- }
- if(retval == null) {
- retval = new OutputStreamWriter(os);
- }
- return retval;
- }
-
-
- public void writeToCache(LoggingEvent event) {
-
- try {
- LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + cacheWriter.toString());
-
- cacheWriter.write(this.layout.format(event));
- cacheWriter.flush();
-
- if(layout.ignoresThrowable()) {
- String[] s = event.getThrowableStrRep();
- if (s != null) {
- int len = s.length;
- for(int i = 0; i < len; i++) {
- LogLog.debug("Log:" + s[i]);
- cacheWriter.write(s[i]);
- cacheWriter.write(Layout.LINE_SEP);
- cacheWriter.flush();
- }
- }
- }
- } catch (IOException ie) {
- LogLog.error("Unable to log event message to hdfs:", ie);
- }
- }
-
- public void rollOverCacheFile() {
-
- if (new File(fileCache).length() != 0 ) {
-
- long epochNow = System.currentTimeMillis();
-
- String datedCacheFileName = fileCache + "." + epochNow;
- LogLog.debug("Rolling over remaining cache File to new file"+ datedCacheFileName);
- closeCacheWriter();
-
- File target = new File(datedCacheFileName);
- if (target.exists()) {
- target.delete();
- }
-
- File file = new File(fileCache);
-
- boolean result = file.renameTo(target);
-
- if(result) {
- LogLog.debug(fileCache +" -> "+ datedCacheFileName);
- } else {
- LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+datedCacheFileName+"].");
- }
- }
- }
-
- public void closeCacheWriter() {
- try {
- if(cacheWriter != null) {
- cacheWriter.close();
- cacheWriter = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to close cache writer", ie);
- }
- }
-}
-
-/**
- * RollingCalendar is a helper class to HdfsFileAppender. Given a periodicity type and the current time, it computes the start of the next interval.
- */
-
-class RollingCalendar extends GregorianCalendar {
- private static final long serialVersionUID = 1L;
-
- private int type = HdfsFileAppender.TOP_OF_TROUBLE;
-
- RollingCalendar() {
- super();
- }
-
- RollingCalendar(TimeZone tz, Locale locale) {
- super(tz, locale);
- }
-
- void setType(int type) {
- this.type = type;
- }
-
- public long getNextCheckMillis(Date now, int offset) {
- return getNextCheckDate(now,offset).getTime();
- }
-
- public Date getNextCheckDate(Date now,int offset) {
- this.setTime(now);
-
- switch(this.type) {
- case HdfsFileAppender.TOP_OF_MINUTE:
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- this.add(Calendar.MINUTE, offset);
- break;
- case HdfsFileAppender.TOP_OF_HOUR:
- this.set(Calendar.MINUTE, 0);
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- this.add(Calendar.HOUR_OF_DAY, offset);
- break;
- case HdfsFileAppender.HALF_DAY:
- this.set(Calendar.MINUTE, 0);
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- int hour = get(Calendar.HOUR_OF_DAY);
- if(hour < 12) {
- this.set(Calendar.HOUR_OF_DAY, 12);
- } else {
- this.set(Calendar.HOUR_OF_DAY, 0);
- this.add(Calendar.DAY_OF_MONTH, 1);
- }
- break;
- case HdfsFileAppender.TOP_OF_DAY:
- this.set(Calendar.HOUR_OF_DAY, 0);
- this.set(Calendar.MINUTE, 0);
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- this.add(Calendar.DATE, offset);
- break;
- case HdfsFileAppender.TOP_OF_WEEK:
- this.set(Calendar.DAY_OF_WEEK, getFirstDayOfWeek());
- this.set(Calendar.HOUR_OF_DAY, 0);
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- this.add(Calendar.WEEK_OF_YEAR, offset);
- break;
- case HdfsFileAppender.TOP_OF_MONTH:
- this.set(Calendar.DATE, 1);
- this.set(Calendar.HOUR_OF_DAY, 0);
- this.set(Calendar.SECOND, 0);
- this.set(Calendar.MILLISECOND, 0);
- this.add(Calendar.MONTH, offset);
- break;
- default:
- throw new IllegalStateException("Unknown periodicity type.");
- }
- return getTime();
- }
-
-
-}
-
-
-/*************
- * Hdfs Sink
- *
- *************/
-
-class HdfsSink {
-
- private static final String DS_REPLICATION_VAL = "1";
- private static final String DS_REPLICATION_KEY = "dfs.replication";
- private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
- private Configuration conf = null;
- private FileSystem fs= null;
- private Path pt = null;
- private FSDataOutputStream hdfsostream = null;
- private String fsName = null;
- private String fileName = null;
- private String fileCache = null;
- private Layout layout = null;
- private String encoding = null;
- private Writer hdfswriter = null;
- private int bufferSize;
- private boolean bufferedIO=false;
- private static int fstime=0;
- private CacheFileWatcher cfw = null;
- private boolean hdfsUpdateAllowed=true;
- private String processUser=null;
-
-
- HdfsSink() {
- }
-
- private static final ThreadLocal<HdfsSink> hdfssink = new ThreadLocal<HdfsSink>() {
- protected HdfsSink initialValue() {
- return new HdfsSink();
- }
- };
-
- public static HdfsSink getInstance() {
- return hdfssink.get();
- }
-
- public void init(String fileSystemName, String fileName, String fileCache,boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter, boolean hdfsUpdateAllowed, String processUser) throws Exception{
-
- this.fsName=fileSystemName;
- this.fileName=fileName;
- this.layout=layout;
- this.encoding=encoding;
- this.bufferSize=bufferSize;
- this.bufferedIO=bufferedIO;
- this.fileCache=fileCache;
- this.hdfsUpdateAllowed=hdfsUpdateAllowed;
- this.processUser=processUser;
-
- final Configuration conf= new Configuration();
- conf.set(DS_REPLICATION_KEY,DS_REPLICATION_VAL);
- conf.set(FS_DEFAULT_NAME_KEY, fsName);
-
- try {
- if ( fs == null) {
- LogLog.debug("Opening Connection to hdfs Sytem" + this.fsName);
-
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(this.processUser, UserGroupInformation.getLoginUser());
- fs = ugi.doAs( new PrivilegedExceptionAction<FileSystem>() {
- public FileSystem run() throws Exception {
- FileSystem filesystem = FileSystem.get(conf);
- LogLog.debug("Inside UGI.." + fsName + " " + filesystem);
- return filesystem;
- }
- });
-
- if ( cfw == null) {
- // Start the CacheFileWatcher to move the Cache file.
-
- LogLog.debug("About to run CacheFilWatcher...");
- Path hdfsfilePath = getParent();
- cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,cacheWriter,this.hdfsUpdateAllowed,conf);
- cfw.start();
- }
-
- }
-
- } catch(Exception ie) {
-
- LogLog.error("Unable to Create hdfs logfile:" + ie.getMessage());
- throw ie;
- }
-
- LogLog.debug("HdfsSystem up: " + fsName + "FS Object:" + fs);
- }
-
- public int getfstime() {
- return fstime;
- }
- public FileSystem getFileSystem() {
- return fs;
- }
-
- public Path getPath() {
- return pt;
- }
-
- public Path getParent() {
- Path pt = new Path(this.fileName);
- return pt.getParent();
- }
-
- public void setOsteam() throws IOException {
- try {
- pt = new Path(this.fileName);
- // if file Exist append it
- if(fs.exists(pt)) {
- LogLog.debug("Appending File: "+ this.fsName+":"+this.fileName+fs);
- if (hdfsostream !=null) {
- hdfsostream.close();
- }
- hdfsostream=fs.append(pt);
-
- } else {
- LogLog.debug("Creating File directories in hdfs if not present.."+ this.fsName+":"+this.fileName + fs);
- String parentName = new Path(this.fileName).getParent().toString();
- if(parentName != null) {
- Path parentDir = new Path(parentName);
- if (!fs.exists(parentDir) ) {
- LogLog.debug("Creating Parent Directory: " + parentDir );
- fs.mkdirs(parentDir);
- }
- }
- hdfsostream = fs.create(pt);
- }
- } catch (IOException ie) {
- LogLog.debug("Error While appending hdfsd file." + ie);
- throw ie;
- }
- }
-
- public void setWriter() {
- LogLog.debug("Setting Writer..");
- hdfswriter = createhdfsWriter(hdfsostream);
- if(bufferedIO) {
- hdfswriter = new BufferedWriter(hdfswriter, bufferSize);
- }
- }
-
- public Writer getWriter() {
- return hdfswriter;
- }
-
- public void append(LoggingEvent event) throws IOException {
- try {
- LogLog.debug("Writing log to HDFS." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + hdfswriter.toString());
-
- hdfswriter.write(this.layout.format(event));
- hdfswriter.flush();
- if(layout.ignoresThrowable()) {
- String[] s = event.getThrowableStrRep();
- if (s != null) {
- int len = s.length;
- for(int i = 0; i < len; i++) {
- LogLog.debug("Log:" + s[i]);
- hdfswriter.write(s[i]);
- hdfswriter.write(Layout.LINE_SEP);
- hdfswriter.flush();
- }
- }
- }
- } catch (IOException ie) {
- LogLog.error("Unable to log event message to hdfs:", ie);
- throw ie;
- }
- }
-
- public void writeHeader() throws IOException {
- LogLog.debug("Writing log header...");
- try {
- if(layout != null) {
- String h = layout.getHeader();
- if(h != null && hdfswriter != null)
- LogLog.debug("Log header:" + h);
- hdfswriter.write(h);
- hdfswriter.flush();
- }
- } catch (IOException ie) {
- LogLog.error("Unable to log header message to hdfs:", ie);
- throw ie;
- }
- }
-
- public
- void writeFooter() throws IOException{
- LogLog.debug("Writing footer header...");
- try {
- if(layout != null) {
- String f = layout.getFooter();
- if(f != null && hdfswriter != null) {
- LogLog.debug("Log:" + f);
- hdfswriter.write(f);
- hdfswriter.flush();
- }
- }
- } catch (IOException ie) {
- LogLog.debug("Unable to log header message to hdfs:", ie);
- throw ie;
- }
-
- }
-
- public void closeHdfsOstream() {
- try {
- if(this.hdfsostream != null) {
- this.hdfsostream.close();
- this.hdfsostream = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to close output stream", ie);
- }
-
- }
-
- public void closeHdfsWriter() {
- try {
- if(hdfswriter != null) {
- hdfswriter.close();
- hdfswriter = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to hfds writer", ie);
- }
- }
-
- public void closeHdfsSink() {
- try {
- if (fs !=null) {
- fs.close();
- }
- } catch (IOException ie) {
- LogLog.error("Unable to close hdfs " + fs ,ie);
- }
- }
-
-
- public OutputStreamWriter createhdfsWriter(FSDataOutputStream os ) {
- OutputStreamWriter retval = null;
-
- if(encoding != null) {
- try {
- retval = new OutputStreamWriter(os, encoding);
- } catch(IOException ie) {
- LogLog.warn("Error initializing output writer.");
- LogLog.warn("Unsupported encoding?");
- }
- }
- if(retval == null) {
- retval = new OutputStreamWriter(os);
- }
- return retval;
- }
-
-
- }
-
-
-// CacheFileWatcher Thread
-
-class CacheFileWatcher extends Thread {
-
- long CACHEFILE_WATCHER_SLEEP_TIME = 1000*60*2;
-
- Configuration conf = null;
- private FileSystem fs = null;
- private String cacheFile = null;
- private File parentDir = null;
- private File[] files = null;
- private Path fsPath = null;
- private Path hdfsfilePath = null;
- private Writer cacheWriter = null;
-
- private boolean hdfsUpdateAllowed=true;
- private boolean cacheFilesCopied = false;
-
- CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) {
- this.fs = fs;
- this.cacheFile = cacheFile;
- this.conf = conf;
- this.hdfsfilePath = hdfsfilePath;
- this.cacheWriter = cacheWriter;
- this.hdfsUpdateAllowed = hdfsUpdateAllowed;
- }
-
-
- public void run(){
-
- LogLog.debug("CacheFileWatcher Started");
-
- while (!cacheFilesCopied ){
-
- if (hdfsUpdateAllowed) {
- rollRemainingCacheFile();
- }
-
- if ( !cacheFilePresent(cacheFile) ) {
-
- try {
- Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
- } catch (InterruptedException ie) {
- LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie);
- }
- } else {
- try {
- copyCacheFilesToHdfs();
- if (hdfsUpdateAllowed) {
- cacheFilesCopied = true;
- } else {
- cacheFilesCopied = false;
- }
- } catch (Throwable t) {
- // Error While copying the file to hdfs and thread goes for sleep and check later
- cacheFilesCopied = false;
- LogLog. error("Error while copying Cache Files to hdfs..Sleeping for next try",t);
-
- try {
- Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME);
- } catch (InterruptedException ie) {
- LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie);
- }
- }
- }
- }
- }
-
- public boolean cacheFilePresent(String filename) {
- String parent = new File(filename).getParent();
- if ( parent != null ) {
- parentDir = new File(parent);
- fsPath = new Path(parent);
- files = parentDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File parentDir, String name) {
- return name.matches(".*cache.+");
- }
- });
- if ( files.length > 0) {
- LogLog.debug("CacheFile Present..");
- return true;
- }
- }
- return false;
- }
-
-
- public void copyCacheFilesToHdfs() throws Throwable{
-
- try {
-
- if (!fs.exists(hdfsfilePath) ) {
- LogLog.debug("Creating Parent Directory: " + hdfsfilePath );
- fs.mkdirs(hdfsfilePath);
- }
- } catch ( Throwable t) {
- throw t;
- }
-
-
- for ( File cacheFile : files) {
- try {
- LogLog.debug("Copying Files..." + "File Path: " + fsPath + "CacheFile: " +cacheFile + "HDFS Path:" + hdfsfilePath);
- FileUtil.copy(cacheFile, this.fs, this.hdfsfilePath, true, this.conf);
- } catch (Throwable t) {
-
- throw t;
- }
- }
- }
-
- public void rollRemainingCacheFile() {
- String datePattern = "'.'yyyy-MM-dd-HH-mm";
- SimpleDateFormat sdf = new SimpleDateFormat(datePattern);
- if (new File(cacheFile).length() != 0 ) {
- long epochNow = System.currentTimeMillis();
-
- String datedCacheFileName = cacheFile + sdf.format(epochNow);
-
- LogLog.debug("Rolling over remaining cache File "+ datedCacheFileName);
- closeCacheFile();
-
- File target = new File(datedCacheFileName);
- if (target.exists()) {
- target.delete();
- }
-
- File file = new File(cacheFile);
-
- boolean result = file.renameTo(target);
-
- if(result) {
- LogLog.debug(cacheFile +" -> "+ datedCacheFileName);
- } else {
- LogLog.error("Failed to rename cache file ["+cacheFile+"] to ["+datedCacheFileName+"].");
- }
-
- }
- }
-
- public void closeCacheFile() {
- try {
- if(cacheWriter != null) {
- cacheWriter.close();
- cacheWriter = null;
- }
- } catch(IOException ie) {
- LogLog.error("unable to close cache writer", ie);
- }
- }
-}
-
-