You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/13 10:41:31 UTC
[14/51] [abbrv] ambari git commit: AMBARI-18236. Fix package
structure in Logfeeder (Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
new file mode 100644
index 0000000..32029ff
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Type;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.ObjectArrays;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * This class contains utility methods used by LogFeeder
+ */
+public class LogFeederUtil {
+ // Class-wide log4j logger used by the helpers in this utility.
+ private static final Logger logger = Logger.getLogger(LogFeederUtil.class);
+
+ // Seed passed to MurmurHash.hash64A by genHash().
+ private static final int HASH_SEED = 31174077;
+ // Date pattern configured on the shared Gson instance below.
+ public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ // Date pattern used by the per-thread formatter behind getDate()/getActualDateStr().
+ public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+
+ // Assigned by loadProperties(); null until that method has been called.
+ private static Properties props;
+
+ // Per-key throttling state for logErrorMessageByInterval(); logInterval is in milliseconds.
+ private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+ private static int logInterval = 30000; // 30 seconds
+
+ // Resolved by setHostNameAndIP(), which the static initializer below invokes;
+ // both stay null if the local host lookup fails.
+ public static String hostName = null;
+ public static String ipAddress = null;
+
+ // Lazily resolved by getLogfeederTempDir().
+ private static String logfeederTempDir = null;
+
+ // Monitor used by getLogfeederTempDir() while it initializes logfeederTempDir.
+ private static final Object _LOCK = new Object();
+
+ static{
+ setHostNameAndIP();
+ }
+
+ /**
+  * Returns the shared Gson instance, which is configured with DATE_FORMAT.
+  */
+ public static Gson getGson() {
+ return gson;
+ }
+
+ // One SimpleDateFormat per thread (the JDK class is not safe for concurrent use).
+ // Each instance formats with SOLR_DATE_FORMAT in the UTC time zone.
+ private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return sdf;
+ }
+ };
+
+ /**
+  * Loads the LogFeeder configuration into the shared property set.
+  * <p>
+  * System properties act as defaults. On top of them, the file named by the
+  * {@code properties} system property is loaded when it exists; if that did
+  * not succeed, {@code propFile} is looked up on the classpath. Finally the
+  * {@code -name=value} entries of {@code propNVList} are applied.
+  *
+  * @param propFile   classpath resource used when no external file was loaded
+  * @param propNVList optional {@code -name=value} overrides, may be null
+  * @throws Exception if no properties source could be loaded, or reading the
+  *                   classpath resource fails
+  */
+ public static void loadProperties(String propFile, String[] propNVList)
+     throws Exception {
+   logger.info("Loading properties. propFile=" + propFile);
+   props = new Properties(System.getProperties());
+   boolean propLoaded = false;
+
+   // First get properties file path from environment value
+   String propertiesFilePath = System.getProperty("properties");
+   if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+     File propertiesFile = new File(propertiesFilePath);
+     if (propertiesFile.exists() && propertiesFile.isFile()) {
+       logger.info("Properties file path set in environment. Loading properties file="
+           + propertiesFilePath);
+       try (FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
+         props.load(fileInputStream);
+         propLoaded = true;
+       } catch (IOException | RuntimeException e) {
+         // Best effort: fall through to the classpath lookup, but keep the cause in the log
+         logger.error("Error loading properties file. properties file="
+             + propertiesFile.getAbsolutePath(), e);
+       }
+     } else {
+       logger.error("Properties file path set in environment, but file not found. properties file="
+           + propertiesFilePath);
+     }
+   }
+
+   if (!propLoaded) {
+     // Properties not yet loaded, let's try from class loader.
+     // getResourceAsStream() only promises an InputStream, so it must not be downcast.
+     InputStream resourceStream = LogFeeder.class.getClassLoader()
+         .getResourceAsStream(propFile);
+     if (resourceStream == null) {
+       logger.fatal("Properties file not found in classpath. properties file name= "
+           + propFile);
+     } else {
+       try {
+         logger.info("Loading properties file " + propFile
+             + " from classpath");
+         props.load(resourceStream);
+         propLoaded = true;
+       } finally {
+         try {
+           resourceStream.close();
+         } catch (IOException ignored) {
+           // A failed close must not mask the outcome of the load
+         }
+       }
+     }
+   }
+
+   if (!propLoaded) {
+     logger.fatal("Properties file is not loaded.");
+     throw new Exception("Properties not loaded");
+   }
+   updatePropertiesFromMap(propNVList);
+ }
+
+ /**
+  * Applies command line overrides of the form {@code -name=value} to the
+  * loaded properties.
+  * <p>
+  * Entries that are null, do not start with {@code -}, or carry no
+  * {@code name=} part are skipped instead of aborting the whole start-up.
+  *
+  * @param nvList raw command line arguments, may be null
+  */
+ private static void updatePropertiesFromMap(String[] nvList) {
+   if (nvList == null) {
+     return;
+   }
+   logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
+       + nvList.length);
+   for (String nv : nvList) {
+     logger.info("Passed nv=" + nv);
+     if (nv == null || !nv.startsWith("-") || nv.length() <= 1) {
+       continue;
+     }
+     nv = nv.substring(1);
+     logger.info("Stripped nv=" + nv);
+     int separatorIndex = nv.indexOf('=');
+     if (separatorIndex <= 0) {
+       // No '=' (indexOf returned -1) or an empty name: nothing usable to store
+       logger.warn("Ignoring argument without name=value form. nv=" + nv);
+       continue;
+     }
+     logger.info("Candidate nv=" + nv);
+     String name = nv.substring(0, separatorIndex);
+     String value = nv.substring(separatorIndex + 1);
+     logger.info("Adding property from argument to properties. name="
+         + name + ", value=" + value);
+     props.put(name, value);
+   }
+ }
+
+ /**
+  * Looks up a property by name.
+  *
+  * @param key property name
+  * @return the configured value, or null when the key is unknown or the
+  *         properties have not been loaded yet
+  */
+ static public String getStringProperty(String key) {
+   return props == null ? null : props.getProperty(key);
+ }
+
+ /**
+  * Looks up a property by name, falling back to a default.
+  *
+  * @param key          property name
+  * @param defaultValue value returned when the key is unknown or the
+  *                     properties have not been loaded yet
+  * @return the configured value or {@code defaultValue}
+  */
+ static public String getStringProperty(String key, String defaultValue) {
+   return props == null ? defaultValue : props.getProperty(key, defaultValue);
+ }
+
+ /**
+  * Reads a property and interprets it as a boolean via toBoolean().
+  *
+  * @param key          property name
+  * @param defaultValue result when the property is missing or empty
+  */
+ static public boolean getBooleanProperty(String key, boolean defaultValue) {
+   return toBoolean(getStringProperty(key), defaultValue);
+ }
+
+ /**
+  * Converts a textual flag to a boolean: "true" and "yes" (any case) are
+  * true, any other non-empty text is false, and null/empty text yields
+  * {@code defaultValue}.
+  */
+ private static boolean toBoolean(String strValue, boolean defaultValue) {
+   if (StringUtils.isEmpty(strValue)) {
+     return defaultValue;
+   }
+   return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes");
+ }
+
+ /**
+  * Reads a property and parses it as an int via objectToInt().
+  *
+  * @param key          property name
+  * @param defaultValue result when the property is missing, empty or not a
+  *                     valid integer
+  */
+ static public int getIntProperty(String key, int defaultValue) {
+   return objectToInt(getStringProperty(key), defaultValue, ", key=" + key);
+ }
+
+ public static int objectToInt(Object objValue, int retValue,
+ String errMessage) {
+ if (objValue == null) {
+ return retValue;
+ }
+ String strValue = objValue.toString();
+ if (!StringUtils.isEmpty(strValue)) {
+ try {
+ retValue = Integer.parseInt(strValue);
+ } catch (Throwable t) {
+ logger.error("Error parsing integer value. str=" + strValue
+ + ", " + errMessage);
+ }
+ }
+ return retValue;
+ }
+
+ /**
+  * Decides whether a configuration block applies to the given values.
+  * <p>
+  * Without any conditions the result is the {@code is_enabled} flag of
+  * {@code valueConfigs} (default true). With conditions, the result is true
+  * only if at least one entry of a {@code fields} condition matches, as
+  * judged by isFieldConditionMatch(); the {@code is_enabled} flag is then
+  * not consulted.
+  *
+  * @param conditionConfigs configuration that may hold a {@code conditions} map
+  * @param valueConfigs     configuration whose field values are tested
+  * @return true when the block is enabled for {@code valueConfigs}
+  */
+ public static boolean isEnabled(Map<String, Object> conditionConfigs,
+     Map<String, Object> valueConfigs) {
+   // The flag may have been parsed as a Boolean rather than a String
+   Object enabledFlag = valueConfigs.get("is_enabled");
+   boolean allow = toBoolean(enabledFlag == null ? null : enabledFlag.toString(), true);
+
+   Object conditionsObj = conditionConfigs.get("conditions");
+   if (!(conditionsObj instanceof Map) || ((Map<?, ?>) conditionsObj).isEmpty()) {
+     return allow;
+   }
+
+   for (Map.Entry<?, ?> condition : ((Map<?, ?>) conditionsObj).entrySet()) {
+     Object conditionType = condition.getKey();
+     boolean isFieldsCondition = conditionType != null
+         && "fields".equalsIgnoreCase(conditionType.toString());
+     // Use the entry's own value: the key may be spelled "Fields", "FIELDS", ...
+     if (!isFieldsCondition || !(condition.getValue() instanceof Map)) {
+       continue;
+     }
+     for (Map.Entry<?, ?> field : ((Map<?, ?>) condition.getValue()).entrySet()) {
+       String fieldName = String.valueOf(field.getKey());
+       Object expected = field.getValue();
+       if (expected instanceof List) {
+         for (Object candidate : (List<?>) expected) {
+           if (candidate != null
+               && isFieldConditionMatch(valueConfigs, fieldName, candidate.toString())) {
+             return true;
+           }
+         }
+       } else if (expected != null
+           && isFieldConditionMatch(valueConfigs, fieldName, expected.toString())) {
+         return true;
+       }
+     }
+   }
+   // Conditions were present but none of them matched
+   return false;
+ }
+
+ /**
+  * Tests whether {@code configs} carries {@code fieldName} with the given
+  * value, ignoring case. The field is looked up directly in {@code configs}
+  * first and then inside its {@code add_fields} map. Non-string values are
+  * compared through their string form.
+  *
+  * @param configs     configuration to inspect, may be null
+  * @param fieldName   name of the field to compare
+  * @param stringValue expected value, may be null (never matches)
+  * @return true if either location holds a matching value
+  */
+ public static boolean isFieldConditionMatch(Map<String, Object> configs,
+     String fieldName, String stringValue) {
+   if (configs == null || stringValue == null) {
+     return false;
+   }
+   Object fieldValue = configs.get(fieldName);
+   if (fieldValue != null && stringValue.equalsIgnoreCase(fieldValue.toString())) {
+     return true;
+   }
+   Object addFields = configs.get("add_fields");
+   if (addFields instanceof Map) {
+     Object addFieldValue = ((Map<?, ?>) addFields).get(fieldName);
+     return addFieldValue != null
+         && stringValue.equalsIgnoreCase(addFieldValue.toString());
+   }
+   return false;
+ }
+
+ public static void logStatForMetric(MetricCount metric, String prefixStr,
+ String postFix) {
+ long currStat = metric.count;
+ long currMS = System.currentTimeMillis();
+ if (currStat > metric.prevLogCount) {
+ if (postFix == null) {
+ postFix = "";
+ }
+ logger.info(prefixStr + ": total_count=" + metric.count
+ + ", duration=" + (currMS - metric.prevLogMS) / 1000
+ + " secs, count=" + (currStat - metric.prevLogCount)
+ + postFix);
+ }
+ metric.prevLogCount = currStat;
+ metric.prevLogMS = currMS;
+ }
+
+ /**
+  * Copies a map by serializing it to JSON with the shared Gson instance and
+  * parsing the result back.
+  *
+  * @param map map to copy, may be null
+  * @return the copy, or null when {@code map} is null
+  */
+ public static Map<String, Object> cloneObject(Map<String, Object> map) {
+   if (map != null) {
+     String serialized = gson.toJson(map);
+     Type mapType = new TypeToken<Map<String, Object>>() {
+     }.getType();
+     return gson.fromJson(serialized, mapType);
+   }
+   return null;
+ }
+
+ public static Map<String, Object> toJSONObject(String jsonStr) {
+ if(jsonStr==null || jsonStr.trim().isEmpty()){
+ return new HashMap<String, Object>();
+ }
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ return gson.fromJson(jsonStr, type);
+ }
+
+ /**
+  * Logs a message at most once per {@code logInterval} milliseconds for a
+  * given key. Calls inside the interval are only counted; the next message
+  * that is logged reports how many were suppressed.
+  *
+  * @param key          identifies the message stream being throttled
+  * @param message      text to log
+  * @param e            optional throwable to log with the message, may be null
+  * @param callerLogger logger that receives the message
+  * @param level        level to log at
+  * @return true if the message was logged, false if it was suppressed
+  */
+ static public boolean logErrorMessageByInterval(String key, String message,
+     Throwable e, Logger callerLogger, Level level) {
+   LogHistory history = logHistoryList.get(key);
+   if (history == null) {
+     history = new LogHistory();
+     logHistoryList.put(key, history);
+   }
+
+   boolean withinInterval = (System.currentTimeMillis() - history.lastLogTime) <= logInterval;
+   if (withinInterval) {
+     history.counter++;
+     return false;
+   }
+
+   history.lastLogTime = System.currentTimeMillis();
+   int suppressed = history.counter;
+   history.counter = 0;
+   if (suppressed > 0) {
+     message += ". Messages suppressed before: " + suppressed;
+   }
+   if (e != null) {
+     callerLogger.log(level, message, e);
+   } else {
+     callerLogger.log(level, message);
+   }
+   return true;
+ }
+
+ /**
+  * Returns at most the first {@code maxLength} characters of {@code str}.
+  *
+  * @param str       source text, may be null
+  * @param maxLength maximum number of characters to keep; zero or negative
+  *                  values yield an empty string
+  * @return the (possibly shortened) text, never null
+  */
+ static public String subString(String str, int maxLength) {
+   if (str == null || str.isEmpty() || maxLength <= 0) {
+     return "";
+   }
+   return str.length() <= maxLength ? str : str.substring(0, maxLength);
+ }
+
+ public static long genHash(String value) {
+ if (value == null) {
+ value = "null";
+ }
+ return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
+ }
+
+ // Throttling state kept per message key by logErrorMessageByInterval().
+ private static class LogHistory {
+ // System.currentTimeMillis() value of the last message that was actually logged.
+ private long lastLogTime = 0;
+ // Number of messages suppressed since the last logged one.
+ private int counter = 0;
+ }
+
+ /**
+  * Formats an epoch-milliseconds timestamp with SOLR_DATE_FORMAT in UTC.
+  *
+  * @param timeStampStr epoch time in milliseconds, as text
+  * @return the formatted date, or null if the input is not a valid long
+  */
+ public static String getDate(String timeStampStr) {
+   try {
+     return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
+   } catch (Exception ex) {
+     // Pass the exception as the throwable so the stack trace is kept
+     logger.error("Error formatting timestamp. timeStampStr=" + timeStampStr, ex);
+     return null;
+   }
+ }
+
+ public static String getActualDateStr() {
+ try {
+ return dateFormatter.get().format(new Date());
+ } catch (Exception ex) {
+ logger.error(ex);
+ return null;
+ }
+ }
+
+ /**
+  * Resolves a classpath resource to a File using the context class loader
+  * of the current thread.
+  *
+  * @param filename resource name to look up
+  * @return the file, or null when the resource does not exist or its URL
+  *         cannot be converted to a File
+  */
+ public static File getFileFromClasspath(String filename) {
+   URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
+       .getResource(filename);
+   logger.debug("File Complete URI :" + fileCompleteUrl);
+   if (fileCompleteUrl == null) {
+     // Resource is not on the classpath; no need to rely on a caught NPE
+     return null;
+   }
+   try {
+     return new File(fileCompleteUrl.toURI());
+   } catch (Exception exception) {
+     // Log the exception itself; its cause is frequently null
+     logger.debug(exception.getMessage(), exception);
+     return null;
+   }
+ }
+
+ /**
+  * Instantiates a class by name through its public no-argument constructor
+  * and checks that it matches the expected alias type.
+  * <p>
+  * A type mismatch is only reported in the log; the instance is still
+  * returned.
+  *
+  * @param classFullName fully qualified class name
+  * @param aliasType     kind of component the class is expected to be
+  * @return the new instance, or null if it could not be created
+  */
+ public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
+   Object instance = null;
+   try {
+     instance = Class.forName(classFullName).getConstructor().newInstance();
+   } catch (Exception exception) {
+     // Log the exception itself: getCause() is null for e.g. ClassNotFoundException
+     logger.error("Unsupported class =" + classFullName, exception);
+   }
+   if (instance == null) {
+     return null;
+   }
+
+   // check instance class as per aliasType
+   boolean isValid;
+   switch (aliasType) {
+     case FILTER:
+       isValid = Filter.class.isInstance(instance);
+       break;
+     case INPUT:
+       isValid = Input.class.isInstance(instance);
+       break;
+     case OUTPUT:
+       isValid = Output.class.isInstance(instance);
+       break;
+     case MAPPER:
+       isValid = Mapper.class.isInstance(instance);
+       break;
+     default:
+       // by default consider all are valid class
+       isValid = true;
+   }
+   if (!isValid) {
+     logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+   }
+   return instance;
+ }
+
+ /**
+  * Reads a JSON object from a file into a map.
+  *
+  * @param jsonFile file to read
+  * @return the parsed content, or an empty map if the file cannot be read
+  *         or parsed
+  */
+ public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+   ObjectMapper mapper = new ObjectMapper();
+   try {
+     return mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
+     });
+   } catch (IOException e) {
+     // JsonParseException and JsonMappingException are IOExceptions too.
+     // Log the exception itself so the stack trace is not lost.
+     logger.error("Error reading json file. file=" + jsonFile, e);
+   }
+   return new HashMap<String, Object>();
+ }
+
+ public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+ if (list != null) {
+ for (String value : list) {
+ if (value != null) {
+ if (caseSensitive) {
+ if (value.equals(str)) {
+ return true;
+ }
+ } else {
+ if (value.equalsIgnoreCase(str)) {
+ return true;
+ }
+ }
+ if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+
+ /**
+  * Resolves the local host name and IP address once and stores them in the
+  * hostName and ipAddress fields. The canonical host name is used unless it
+  * is just the IP address, in which case the plain host name is used.
+  *
+  * @return the resolved host name, or null if the lookup failed
+  */
+ private static synchronized String setHostNameAndIP() {
+   if (hostName != null && ipAddress != null) {
+     return hostName;
+   }
+   try {
+     InetAddress localAddress = InetAddress.getLocalHost();
+     ipAddress = localAddress.getHostAddress();
+     String getHostName = localAddress.getHostName();
+     String getCanonicalHostName = localAddress.getCanonicalHostName();
+     boolean canonicalIsRealName = !getCanonicalHostName.equalsIgnoreCase(ipAddress);
+     logger.info(canonicalIsRealName
+         ? "Using getCanonicalHostName()=" + getCanonicalHostName
+         : "Using getHostName()=" + getHostName);
+     hostName = canonicalIsRealName ? getCanonicalHostName : getHostName;
+     logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
+         + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
+         + hostName);
+   } catch (UnknownHostException e) {
+     logger.error("Error getting hostname.", e);
+   }
+   return hostName;
+ }
+
+ /**
+  * Concatenates two string arrays into a new array.
+  *
+  * @param first  leading elements, null is treated as empty
+  * @param second trailing elements, null is treated as empty
+  * @return a new array holding the elements of {@code first} followed by
+  *         those of {@code second}
+  */
+ public static String[] mergeArray(String[] first, String[] second) {
+   String[] head = (first == null) ? new String[0] : first;
+   String[] tail = (second == null) ? new String[0] : second;
+   String[] mergedArray = new String[head.length + tail.length];
+   System.arraycopy(head, 0, mergedArray, 0, head.length);
+   System.arraycopy(tail, 0, mergedArray, head.length, tail.length);
+   return mergedArray;
+ }
+
+ /**
+  * Returns the LogFeeder temp directory, resolving it on first use from the
+  * "logfeeder.tmp.dir" property (default "/tmp/$username/logfeeder/") with
+  * the "username" placeholder value taken from the "user.name" system property.
+  */
+ public static String getLogfeederTempDir() {
+ // Unsynchronized fast path once the value has been resolved.
+ // NOTE(review): logfeederTempDir is not declared volatile - confirm this is acceptable.
+ if (logfeederTempDir == null) {
+ synchronized (_LOCK) {
+ // Re-check under the lock so only one thread performs the resolution.
+ if (logfeederTempDir == null) {
+ String tempDirValue = getStringProperty("logfeeder.tmp.dir",
+ "/tmp/$username/logfeeder/");
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ String username = System.getProperty("user.name");
+ contextParam.put("username", username);
+ logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
+ contextParam);
+ }
+ }
+ }
+ return logfeederTempDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
new file mode 100644
index 0000000..dbbefaf
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * MurmurHash 2.0: a very fast, non-cryptographic hash suitable for general
+ * hash-based lookup. See http://murmurhash.googlepages.com/ for more details.
+ * <p>
+ * The C version of MurmurHash 2.0 found at that site was ported to Java by
+ * Andrzej Bialecki (ab at getopt org).
+ * <p>
+ * The ByteBuffer variants consume the buffer up to its limit and restore the
+ * buffer's byte order before returning.
+ */
+public final class MurmurHash {
+
+  /** Multiplier and shift of the 32-bit variant. */
+  private static final int M_32 = 0x5bd1e995;
+  private static final int R_32 = 24;
+
+  /** Multiplier and shift of the 64-bit (64A) variant. */
+  private static final long M_64 = 0xc6a4a7935bd1e995L;
+  private static final int R_64 = 47;
+
+  private MurmurHash() {
+  }
+
+  /**
+   * Hashes an int.
+   *
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the four big-endian bytes of the int.
+   */
+  public static int hash(int data, int seed) {
+    ByteBuffer fourBytes = ByteBuffer.allocate(4);
+    // Absolute put: stores the int big-endian and leaves the position at 0
+    fourBytes.putInt(0, data);
+    return hash(fourBytes, seed);
+  }
+
+  /**
+   * Hashes bytes in an array.
+   *
+   * @param data The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(byte[] data, int seed) {
+    return hash(ByteBuffer.wrap(data), seed);
+  }
+
+  /**
+   * Hashes bytes in part of an array.
+   *
+   * @param data   The data to hash.
+   * @param offset Where to start munging.
+   * @param length How many bytes to process.
+   * @param seed   The seed to start with.
+   * @return The 32-bit hash of the data in question.
+   */
+  public static int hash(byte[] data, int offset, int length, int seed) {
+    return hash(ByteBuffer.wrap(data, offset, length), seed);
+  }
+
+  /**
+   * Hashes the bytes in a buffer from the current position to the limit.
+   *
+   * @param buf  The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit murmur hash of the bytes in the buffer.
+   */
+  public static int hash(ByteBuffer buf, int seed) {
+    // remember the caller's byte order so it can be restored afterwards
+    ByteOrder callerOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    int h = seed ^ buf.remaining();
+
+    // body: mix four bytes at a time
+    while (buf.remaining() >= 4) {
+      int k = buf.getInt();
+      k *= M_32;
+      k ^= k >>> R_32;
+      k *= M_32;
+
+      h *= M_32;
+      h ^= k;
+    }
+
+    // tail: fold the remaining 1-3 bytes as a zero-padded little-endian int
+    if (buf.hasRemaining()) {
+      int tail = 0;
+      for (int shift = 0; buf.hasRemaining(); shift += 8) {
+        tail |= (buf.get() & 0xff) << shift;
+      }
+      h ^= tail;
+      h *= M_32;
+    }
+
+    // final avalanche
+    h ^= h >>> 13;
+    h *= M_32;
+    h ^= h >>> 15;
+
+    buf.order(callerOrder);
+    return h;
+  }
+
+  /**
+   * Hashes bytes in an array with the 64-bit variant.
+   *
+   * @param data The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 64 bit hash of the bytes in question.
+   */
+  public static long hash64A(byte[] data, int seed) {
+    return hash64A(ByteBuffer.wrap(data), seed);
+  }
+
+  /**
+   * Hashes bytes in part of an array with the 64-bit variant.
+   *
+   * @param data   The data to hash.
+   * @param offset Where to start.
+   * @param length How many bytes to process.
+   * @param seed   The seed to start with.
+   * @return The 64 bit hash of the data in question.
+   */
+  public static long hash64A(byte[] data, int offset, int length, int seed) {
+    return hash64A(ByteBuffer.wrap(data, offset, length), seed);
+  }
+
+  /**
+   * Hashes the bytes in a buffer from the current position to the limit
+   * with the 64-bit variant.
+   *
+   * @param buf  The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 64 bit murmur hash of the bytes in the buffer.
+   */
+  public static long hash64A(ByteBuffer buf, int seed) {
+    ByteOrder callerOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    long h = seed ^ (buf.remaining() * M_64);
+
+    // body: mix eight bytes at a time
+    while (buf.remaining() >= 8) {
+      long k = buf.getLong();
+      k *= M_64;
+      k ^= k >>> R_64;
+      k *= M_64;
+
+      h ^= k;
+      h *= M_64;
+    }
+
+    // tail: fold the remaining 1-7 bytes as a zero-padded little-endian long
+    if (buf.hasRemaining()) {
+      long tail = 0L;
+      for (int shift = 0; buf.hasRemaining(); shift += 8) {
+        tail |= (buf.get() & 0xffL) << shift;
+      }
+      h ^= tail;
+      h *= M_64;
+    }
+
+    // final avalanche
+    h ^= h >>> R_64;
+    h *= M_64;
+    h ^= h >>> R_64;
+
+    buf.order(callerOrder);
+    return h;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
new file mode 100644
index 0000000..10ea2c2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * Utility to connect to s3
+ */
+public class S3Util {
+ // Shared instance through which the helpers of this class are called.
+ public static final S3Util INSTANCE = new S3Util();
+
+ private static final Logger LOG = Logger.getLogger(S3Util.class);
+
+ // Scheme prefix removed from s3 paths by getBucketName()/getS3Key().
+ public static final String S3_PATH_START_WITH = "s3://";
+ // Separator between the bucket name and the parts of the key.
+ public static final String S3_PATH_SEPARATOR = "/";
+
+ /**
+  * Creates an S3 client. The given keys are used when AWSUtil can build
+  * credentials from them; otherwise the client is created without explicit
+  * credentials.
+  */
+ public AmazonS3 getS3Client(String accessKey, String secretKey) {
+   AWSCredentials credentials = AWSUtil.INSTANCE.createAWSCredentials(
+       accessKey, secretKey);
+   return (credentials == null)
+       ? new AmazonS3Client()
+       : new AmazonS3Client(credentials);
+ }
+
+ /**
+  * Creates a TransferManager. The given keys are used when AWSUtil can
+  * build credentials from them; otherwise the manager is created without
+  * explicit credentials.
+  */
+ public TransferManager getTransferManager(String accessKey, String secretKey) {
+   AWSCredentials credentials = AWSUtil.INSTANCE.createAWSCredentials(
+       accessKey, secretKey);
+   return (credentials == null)
+       ? new TransferManager()
+       : new TransferManager(credentials);
+ }
+
+ /**
+  * Shuts the given TransferManager down immediately; a null argument is
+  * ignored.
+  */
+ public void shutdownTransferManager(TransferManager transferManager) {
+   if (transferManager == null) {
+     return;
+   }
+   transferManager.shutdownNow();
+ }
+
+ /**
+  * Extracts the bucket name, i.e. the first path component after the
+  * "s3://" prefix.
+  *
+  * @param s3Path path such as {@code s3://bucket/dir/file}, may be null
+  * @return the bucket name; null for a null path, and an empty string when
+  *         the path contains no bucket component
+  */
+ public String getBucketName(String s3Path) {
+   if (s3Path == null) {
+     return null;
+   }
+   String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+       S3_PATH_SEPARATOR);
+   // split() returns an empty array when the input consists only of separators
+   return s3PathParts.length > 0 ? s3PathParts[0] : "";
+ }
+
+ /**
+  * Extracts the object key, i.e. everything after the bucket name, with
+  * the parts joined by "/".
+  *
+  * @param s3Path path such as {@code s3://bucket/dir/file}, may be null
+  * @return the key, or an empty string when the path is null or has no key
+  *         part
+  */
+ public String getS3Key(String s3Path) {
+   StringBuilder s3Key = new StringBuilder();
+   if (s3Path != null) {
+     String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+         S3_PATH_SEPARATOR);
+     // Index 0 is the bucket name. Starting at 1 also copes with the empty
+     // array that split() returns for input made only of separators.
+     for (int index = 1; index < s3PathParts.length; index++) {
+       if (index > 1) {
+         s3Key.append(S3_PATH_SEPARATOR);
+       }
+       s3Key.append(s3PathParts[index]);
+     }
+   }
+   return s3Key.toString();
+ }
+
+ /**
+  * Uploads a local file to S3 and waits for the upload to finish. Failures
+  * are logged, not thrown. If the waiting thread is interrupted, its
+  * interrupt status is restored.
+  *
+  * @param bucketName target bucket
+  * @param s3Key      target key
+  * @param localFile  file to upload
+  * @param accessKey  access key passed to getTransferManager()
+  * @param secretKey  secret key passed to getTransferManager()
+  */
+ public void uploadFileTos3(String bucketName, String s3Key, File localFile,
+     String accessKey, String secretKey) {
+   TransferManager transferManager = getTransferManager(accessKey, secretKey);
+   try {
+     Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+     upload.waitForUploadResult();
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the caller instead of swallowing it
+     Thread.currentThread().interrupt();
+     LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
+         e);
+   } catch (AmazonClientException e) {
+     LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
+         e);
+   } finally {
+     shutdownTransferManager(transferManager);
+   }
+ }
+
+ /**
+ * Get the buffer reader to read s3 file as a stream
+ */
+ public BufferedReader getReader(String s3Path, String accessKey,
+ String secretKey) throws IOException {
+ // TODO error handling
+ // Compression support
+ // read header and decide the compression(auto detection)
+ // For now hard-code GZIP compression
+ String s3Bucket = getBucketName(s3Path);
+ String s3Key = getS3Key(s3Path);
+ S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
+ new GetObjectRequest(s3Bucket, s3Key));
+ GZIPInputStream objectInputStream;
+ try {
+ objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
+ objectInputStream));
+ return bufferedReader;
+ } catch (IOException e) {
+ LOG.error("Error in creating stream reader for s3 file :" + s3Path,
+ e.getCause());
+ throw e;
+ }
+ }
+
+ public void writeIntoS3File(String data, String bucketName, String s3Key,
+ String accessKey, String secretKey) {
+ InputStream in = null;
+ try {
+ in = IOUtils.toInputStream(data, "UTF-8");
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ if (in != null) {
+ TransferManager transferManager = getTransferManager(accessKey, secretKey);
+ try {
+ if (transferManager != null) {
+ transferManager.upload(
+ new PutObjectRequest(bucketName, s3Key, in,
+ new ObjectMetadata())).waitForUploadResult();
+ LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
+ + bucketName);
+ }
+ } catch (AmazonClientException | InterruptedException e) {
+ LOG.error(e);
+ } finally {
+ try {
+ shutdownTransferManager(transferManager);
+ in.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index aaf809f..44113e1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.util;
import java.io.IOException;
import java.util.HashMap;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index 9f943ec..3aa8d7b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -21,9 +21,9 @@ package org.apache.ambari.logfeeder.filter;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.OutputMgr;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index cdec4df..64e9b69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -25,10 +25,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 58db8f2..849e4c3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.filter;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.OutputMgr;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 2242a83..42e81da 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.InputMgr;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.io.FileUtils;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 2df03bd..0652182 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -24,7 +24,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.log4j.Logger;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
index 49cee56..cc6da56 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
@@ -18,11 +18,12 @@
package org.apache.ambari.logfeeder.output;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+
public class S3LogPathResolverTest {
@Test
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
index a0c398e..c64e0c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -18,7 +18,7 @@
package org.apache.ambari.logfeeder.output;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.S3Util;
import org.junit.Test;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
deleted file mode 100644
index 4f0d1aa..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-public class AWSUtilTest {
- public void testAWSUtil_getAwsUserName() throws Exception {
- String S3_ACCESS_KEY = "S3_ACCESS_KEY";
- String S3_SECRET_KEY = "S3_SECRET_KEY";
- AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
deleted file mode 100644
index af14140..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import static org.junit.Assert.assertEquals;
-
-public class S3UtilTest {
- public void testS3Util_pathToBucketName() throws Exception {
- String s3Path = "s3://bucket_name/path/file.txt";
- String expectedBucketName = "bucket_name";
- String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
- assertEquals(expectedBucketName, actualBucketName);
- }
-
- public void testS3Util_pathToS3Key() throws Exception {
- String s3Path = "s3://bucket_name/path/file.txt";
- String expectedS3key = "path/file.txt";
- String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
- assertEquals(expectedS3key, actualS3key);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
new file mode 100644
index 0000000..6df2283
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import org.apache.ambari.logfeeder.util.AWSUtil;
+
+public class AWSUtilTest {
+ public void testAWSUtil_getAwsUserName() throws Exception {
+ String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+ String S3_SECRET_KEY = "S3_SECRET_KEY";
+ AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
new file mode 100644
index 0000000..84554b0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.ambari.logfeeder.util.S3Util;
+
+public class S3UtilTest {
+ public void testS3Util_pathToBucketName() throws Exception {
+ String s3Path = "s3://bucket_name/path/file.txt";
+ String expectedBucketName = "bucket_name";
+ String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
+ assertEquals(expectedBucketName, actualBucketName);
+ }
+
+ public void testS3Util_pathToS3Key() throws Exception {
+ String s3Path = "s3://bucket_name/path/file.txt";
+ String expectedS3key = "path/file.txt";
+ String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
+ assertEquals(expectedS3key, actualS3key);
+ }
+
+}