You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/13 10:41:31 UTC

[14/51] [abbrv] ambari git commit: AMBARI-18236. Fix package structure in Logfeeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
new file mode 100644
index 0000000..32029ff
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.ObjectArrays;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * This class contains utility methods used by LogFeeder
+ */
+public class LogFeederUtil {
+  private static final Logger logger = Logger.getLogger(LogFeederUtil.class);
+
+  private static final int HASH_SEED = 31174077;
+  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+  public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+  private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+
+  private static Properties props;
+
+  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+  private static int logInterval = 30000; // 30 seconds
+
+  public static String hostName = null;
+  public static String ipAddress = null;
+  
+  private static String logfeederTempDir = null;
+  
+  private static final Object _LOCK = new Object();
+  
+  static{
+    setHostNameAndIP();
+  }
+  
+  public static Gson getGson() {
+    return gson;
+  }
+
+  private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
+      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return sdf;
+    }
+  };
+
+  /**
+   * This method will read the properties from System, followed by propFile
+   * and finally from the map
+   */
+  public static void loadProperties(String propFile, String[] propNVList)
+    throws Exception {
+    logger.info("Loading properties. propFile=" + propFile);
+    props = new Properties(System.getProperties());
+    boolean propLoaded = false;
+
+    // First get properties file path from environment value
+    String propertiesFilePath = System.getProperty("properties");
+    if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+      File propertiesFile = new File(propertiesFilePath);
+      if (propertiesFile.exists() && propertiesFile.isFile()) {
+        logger.info("Properties file path set in environment. Loading properties file="
+          + propertiesFilePath);
+        FileInputStream fileInputStream = null;
+        try {
+          fileInputStream = new FileInputStream(propertiesFile);
+          props.load(fileInputStream);
+          propLoaded = true;
+        } catch (Throwable t) {
+          logger.error("Error loading properties file. properties file="
+            + propertiesFile.getAbsolutePath());
+        } finally {
+          if (fileInputStream != null) {
+            try {
+              fileInputStream.close();
+            } catch (Throwable t) {
+              // Ignore error
+            }
+          }
+        }
+      } else {
+        logger.error("Properties file path set in environment, but file not found. properties file="
+          + propertiesFilePath);
+      }
+    }
+
+    if (!propLoaded) {
+      BufferedInputStream fileInputStream = null;
+      try {
+        // Properties not yet loaded, let's try from class loader
+        fileInputStream = (BufferedInputStream) LogFeeder.class
+          .getClassLoader().getResourceAsStream(propFile);
+        if (fileInputStream != null) {
+          logger.info("Loading properties file " + propFile
+            + " from classpath");
+          props.load(fileInputStream);
+          propLoaded = true;
+        } else {
+          logger.fatal("Properties file not found in classpath. properties file name= "
+            + propFile);
+        }
+      } finally {
+        if (fileInputStream != null) {
+          try {
+            fileInputStream.close();
+          } catch (IOException e) {
+          }
+        }
+      }
+    }
+
+    if (!propLoaded) {
+      logger.fatal("Properties file is not loaded.");
+      throw new Exception("Properties not loaded");
+    } else {
+      updatePropertiesFromMap(propNVList);
+    }
+  }
+
+  private static void updatePropertiesFromMap(String[] nvList) {
+    if (nvList == null) {
+      return;
+    }
+    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
+      + nvList.length);
+    if (nvList != null && nvList.length > 0) {
+      for (String nv : nvList) {
+        logger.info("Passed nv=" + nv);
+        if (nv.startsWith("-") && nv.length() > 1) {
+          nv = nv.substring(1);
+          logger.info("Stripped nv=" + nv);
+          int i = nv.indexOf("=");
+          if (nv.length() > i) {
+            logger.info("Candidate nv=" + nv);
+            String name = nv.substring(0, i);
+            String value = nv.substring(i + 1);
+            logger.info("Adding property from argument to properties. name="
+              + name + ", value=" + value);
+            props.put(name, value);
+          }
+        }
+      }
+    }
+  }
+
+  static public String getStringProperty(String key) {
+    if (props != null) {
+      return props.getProperty(key);
+    }
+    return null;
+  }
+
+  static public String getStringProperty(String key, String defaultValue) {
+    if (props != null) {
+      return props.getProperty(key, defaultValue);
+    }
+    return defaultValue;
+  }
+
+  static public boolean getBooleanProperty(String key, boolean defaultValue) {
+    String strValue = getStringProperty(key);
+    return toBoolean(strValue, defaultValue);
+  }
+
+  private static boolean toBoolean(String strValue, boolean defaultValue) {
+    boolean retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      if (strValue.equalsIgnoreCase("true")
+        || strValue.equalsIgnoreCase("yes")) {
+        retValue = true;
+      } else {
+        retValue = false;
+      }
+    }
+    return retValue;
+  }
+
+  static public int getIntProperty(String key, int defaultValue) {
+    String strValue = getStringProperty(key);
+    int retValue = defaultValue;
+    retValue = objectToInt(strValue, retValue, ", key=" + key);
+    return retValue;
+  }
+
+  public static int objectToInt(Object objValue, int retValue,
+                                String errMessage) {
+    if (objValue == null) {
+      return retValue;
+    }
+    String strValue = objValue.toString();
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Integer.parseInt(strValue);
+      } catch (Throwable t) {
+        logger.error("Error parsing integer value. str=" + strValue
+          + ", " + errMessage);
+      }
+    }
+    return retValue;
+  }
+
+  public static boolean isEnabled(Map<String, Object> conditionConfigs,
+                                  Map<String, Object> valueConfigs) {
+    boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
+      .get("conditions");
+    if (conditions != null && conditions.size() > 0) {
+      allow = false;
+      for (String conditionType : conditions.keySet()) {
+        if (conditionType.equalsIgnoreCase("fields")) {
+          @SuppressWarnings("unchecked")
+          Map<String, Object> fields = (Map<String, Object>) conditions
+            .get("fields");
+          for (String fieldName : fields.keySet()) {
+            Object values = fields.get(fieldName);
+            if (values instanceof String) {
+              allow = isFieldConditionMatch(valueConfigs,
+                fieldName, (String) values);
+            } else {
+              @SuppressWarnings("unchecked")
+              List<String> listValues = (List<String>) values;
+              for (String stringValue : listValues) {
+                allow = isFieldConditionMatch(valueConfigs,
+                  fieldName, stringValue);
+                if (allow) {
+                  break;
+                }
+              }
+            }
+            if (allow) {
+              break;
+            }
+          }
+        }
+        if (allow) {
+          break;
+        }
+      }
+    }
+    return allow;
+  }
+
+  public static boolean isFieldConditionMatch(Map<String, Object> configs,
+                                              String fieldName, String stringValue) {
+    boolean allow = false;
+    String fieldValue = (String) configs.get(fieldName);
+    if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
+      allow = true;
+    } else {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> addFields = (Map<String, Object>) configs
+        .get("add_fields");
+      if (addFields != null && addFields.get(fieldName) != null) {
+        String addFieldValue = (String) addFields.get(fieldName);
+        if (stringValue.equalsIgnoreCase(addFieldValue)) {
+          allow = true;
+        }
+      }
+
+    }
+    return allow;
+  }
+
+  public static void logStatForMetric(MetricCount metric, String prefixStr,
+                                      String postFix) {
+    long currStat = metric.count;
+    long currMS = System.currentTimeMillis();
+    if (currStat > metric.prevLogCount) {
+      if (postFix == null) {
+        postFix = "";
+      }
+      logger.info(prefixStr + ": total_count=" + metric.count
+        + ", duration=" + (currMS - metric.prevLogMS) / 1000
+        + " secs, count=" + (currStat - metric.prevLogCount)
+        + postFix);
+    }
+    metric.prevLogCount = currStat;
+    metric.prevLogMS = currMS;
+  }
+
+  public static Map<String, Object> cloneObject(Map<String, Object> map) {
+    if (map == null) {
+      return null;
+    }
+    String jsonStr = gson.toJson(map);
+    Type type = new TypeToken<Map<String, Object>>() {
+    }.getType();
+    return gson.fromJson(jsonStr, type);
+  }
+
+  public static Map<String, Object> toJSONObject(String jsonStr) {
+    if(jsonStr==null || jsonStr.trim().isEmpty()){
+      return new HashMap<String, Object>();
+    }
+    Type type = new TypeToken<Map<String, Object>>() {
+    }.getType();
+    return gson.fromJson(jsonStr, type);
+  }
+
+  static public boolean logErrorMessageByInterval(String key, String message,
+                                                  Throwable e, Logger callerLogger, Level level) {
+
+    LogHistory log = logHistoryList.get(key);
+    if (log == null) {
+      log = new LogHistory();
+      logHistoryList.put(key, log);
+    }
+    if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
+      log.lastLogTime = System.currentTimeMillis();
+      int counter = log.counter;
+      log.counter = 0;
+      if (counter > 0) {
+        message += ". Messages suppressed before: " + counter;
+      }
+      if (e == null) {
+        callerLogger.log(level, message);
+      } else {
+        callerLogger.log(level, message, e);
+      }
+
+      return true;
+    } else {
+      log.counter++;
+    }
+    return false;
+
+  }
+
+  static public String subString(String str, int maxLength) {
+    if (str == null || str.length() == 0) {
+      return "";
+    }
+    maxLength = str.length() < maxLength ? str.length() : maxLength;
+    return str.substring(0, maxLength);
+  }
+
+  public static long genHash(String value) {
+    if (value == null) {
+      value = "null";
+    }
+    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
+  }
+
+  private static class LogHistory {
+    private long lastLogTime = 0;
+    private int counter = 0;
+  }
+
+  public static String getDate(String timeStampStr) {
+    try {
+      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
+    } catch (Exception ex) {
+      logger.error(ex);
+      return null;
+    }
+  }
+
+  public static String getActualDateStr() {
+    try {
+      return dateFormatter.get().format(new Date());
+    } catch (Exception ex) {
+      logger.error(ex);
+      return null;
+    }
+  }
+
+  public static File getFileFromClasspath(String filename) {
+    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
+      .getResource(filename);
+    logger.debug("File Complete URI :" + fileCompleteUrl);
+    File file = null;
+    try {
+      file = new File(fileCompleteUrl.toURI());
+    } catch (Exception exception) {
+      logger.debug(exception.getMessage(), exception.getCause());
+    }
+    return file;
+  }
+
+  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
+    Object instance = null;
+    try {
+      instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+    } catch (Exception exception) {
+      logger.error("Unsupported class =" + classFullName, exception.getCause());
+    }
+    // check instance class as par aliasType
+    if (instance != null) {
+      boolean isValid = false;
+      switch (aliasType) {
+        case FILTER:
+          isValid = Filter.class.isAssignableFrom(instance.getClass());
+          break;
+        case INPUT:
+          isValid = Input.class.isAssignableFrom(instance.getClass());
+          break;
+        case OUTPUT:
+          isValid = Output.class.isAssignableFrom(instance.getClass());
+          break;
+        case MAPPER:
+          isValid = Mapper.class.isAssignableFrom(instance.getClass());
+          break;
+        default:
+          // by default consider all are valid class
+          isValid = true;
+      }
+      if (!isValid) {
+        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+      }
+    }
+    return instance;
+  }
+
+  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
+      });
+      return jsonmap;
+    } catch (JsonParseException e) {
+      logger.error(e, e.getCause());
+    } catch (JsonMappingException e) {
+      logger.error(e, e.getCause());
+    } catch (IOException e) {
+      logger.error(e, e.getCause());
+    }
+    return new HashMap<String, Object>();
+  }
+
+  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+    if (list != null) {
+      for (String value : list) {
+        if (value != null) {
+          if (caseSensitive) {
+            if (value.equals(str)) {
+              return true;
+            }
+          } else {
+            if (value.equalsIgnoreCase(str)) {
+              return true;
+            }
+          }
+          if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+  
+  
+  private static synchronized String setHostNameAndIP() {
+    if (hostName == null || ipAddress == null) {
+      try {
+        InetAddress ip = InetAddress.getLocalHost();
+        ipAddress = ip.getHostAddress();
+        String getHostName = ip.getHostName();
+        String getCanonicalHostName = ip.getCanonicalHostName();
+        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
+          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
+          hostName = getCanonicalHostName;
+        } else {
+          logger.info("Using getHostName()=" + getHostName);
+          hostName = getHostName;
+        }
+        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
+            + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
+            + hostName);
+      } catch (UnknownHostException e) {
+        logger.error("Error getting hostname.", e);
+      }
+    }
+    return hostName;
+  }
+
+  public static String[] mergeArray(String[] first, String[] second) {
+    if (first == null) {
+      first = new String[0];
+    }
+    if (second == null) {
+      second = new String[0];
+    }
+    String[] mergedArray = ObjectArrays.concat(first, second, String.class);
+    return mergedArray;
+  }
+  
+  public static String getLogfeederTempDir() {
+    if (logfeederTempDir == null) {
+      synchronized (_LOCK) {
+        if (logfeederTempDir == null) {
+          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
+              "/tmp/$username/logfeeder/");
+          HashMap<String, String> contextParam = new HashMap<String, String>();
+          String username = System.getProperty("user.name");
+          contextParam.put("username", username);
+          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
+              contextParam);
+        }
+      }
+    }
+    return logfeederTempDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
new file mode 100644
index 0000000..dbbefaf
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup.  See http://murmurhash.googlepages.com/ for more details.
+ * <p/>
+ * <p>The C version of MurmurHash 2.0 found at that site was ported
+ * to Java by Andrzej Bialecki (ab at getopt org).</p>
+ */
+public final class MurmurHash {
+
+  private MurmurHash() {
+  }
+
+  /**
+   * Hashes an int.
+   *
+   * @param data The int to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(int data, int seed) {
+    return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
+  }
+
+  /**
+   * Hashes bytes in an array.
+   *
+   * @param data The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit hash of the bytes in question.
+   */
+  public static int hash(byte[] data, int seed) {
+    return hash(ByteBuffer.wrap(data), seed);
+  }
+
+  /**
+   * Hashes bytes in part of an array.
+   *
+   * @param data   The data to hash.
+   * @param offset Where to start munging.
+   * @param length How many bytes to process.
+   * @param seed   The seed to start with.
+   * @return The 32-bit hash of the data in question.
+   */
+  public static int hash(byte[] data, int offset, int length, int seed) {
+    return hash(ByteBuffer.wrap(data, offset, length), seed);
+  }
+
+  /**
+   * Hashes the bytes in a buffer from the current position to the limit.
+   *
+   * @param buf  The bytes to hash.
+   * @param seed The seed for the hash.
+   * @return The 32 bit murmur hash of the bytes in the buffer.
+   */
+  public static int hash(ByteBuffer buf, int seed) {
+    // save byte order for later restoration
+    ByteOrder byteOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    int m = 0x5bd1e995;
+    int r = 24;
+
+    int h = seed ^ buf.remaining();
+
+    while (buf.remaining() >= 4) {
+      int k = buf.getInt();
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h *= m;
+      h ^= k;
+    }
+
+    if (buf.remaining() > 0) {
+      ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+      // for big-endian version, use this first:
+      // finish.position(4-buf.remaining());
+      finish.put(buf).rewind();
+      h ^= finish.getInt();
+      h *= m;
+    }
+
+    h ^= h >>> 13;
+    h *= m;
+    h ^= h >>> 15;
+
+    buf.order(byteOrder);
+    return h;
+  }
+
+
+  public static long hash64A(byte[] data, int seed) {
+    return hash64A(ByteBuffer.wrap(data), seed);
+  }
+
+  public static long hash64A(byte[] data, int offset, int length, int seed) {
+    return hash64A(ByteBuffer.wrap(data, offset, length), seed);
+  }
+
+  public static long hash64A(ByteBuffer buf, int seed) {
+    ByteOrder byteOrder = buf.order();
+    buf.order(ByteOrder.LITTLE_ENDIAN);
+
+    long m = 0xc6a4a7935bd1e995L;
+    int r = 47;
+
+    long h = seed ^ (buf.remaining() * m);
+
+    while (buf.remaining() >= 8) {
+      long k = buf.getLong();
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h ^= k;
+      h *= m;
+    }
+
+    if (buf.remaining() > 0) {
+      ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+      // for big-endian version, do this first:
+      // finish.position(8-buf.remaining());
+      finish.put(buf).rewind();
+      h ^= finish.getLong();
+      h *= m;
+    }
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+
+    buf.order(byteOrder);
+    return h;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
new file mode 100644
index 0000000..10ea2c2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * Utility to connect to s3
+ */
+public class S3Util {
+  public static final S3Util INSTANCE = new S3Util();
+
+  private static final Logger LOG = Logger.getLogger(S3Util.class);
+
+  public static final String S3_PATH_START_WITH = "s3://";
+  public static final String S3_PATH_SEPARATOR = "/";
+
+  public AmazonS3 getS3Client(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+        accessKey, secretKey);
+    AmazonS3 s3client;
+    if (awsCredentials != null) {
+      s3client = new AmazonS3Client(awsCredentials);
+    } else {
+      s3client = new AmazonS3Client();
+    }
+    return s3client;
+  }
+
+  public TransferManager getTransferManager(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+        accessKey, secretKey);
+    TransferManager transferManager;
+    if (awsCredentials != null) {
+      transferManager = new TransferManager(awsCredentials);
+    } else {
+      transferManager = new TransferManager();
+    }
+    return transferManager;
+  }
+
+  public void shutdownTransferManager(TransferManager transferManager) {
+    if (transferManager != null) {
+      transferManager.shutdownNow();
+    }
+  }
+
+  public String getBucketName(String s3Path) {
+    String bucketName = null;
+    // s3path
+    if (s3Path != null) {
+      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+          S3_PATH_SEPARATOR);
+      bucketName = s3PathParts[0];
+    }
+    return bucketName;
+  }
+
+  public String getS3Key(String s3Path) {
+    StringBuilder s3Key = new StringBuilder();
+    // s3path
+    if (s3Path != null) {
+      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+          S3_PATH_SEPARATOR);
+      ArrayList<String> s3PathList = new ArrayList<String>(
+          Arrays.asList(s3PathParts));
+      s3PathList.remove(0);// remove bucketName
+      for (int index = 0; index < s3PathList.size(); index++) {
+        if (index > 0) {
+          s3Key.append(S3_PATH_SEPARATOR);
+        }
+        s3Key.append(s3PathList.get(index));
+      }
+    }
+    return s3Key.toString();
+  }
+
+  public void uploadFileTos3(String bucketName, String s3Key, File localFile,
+      String accessKey, String secretKey) {
+    TransferManager transferManager = getTransferManager(accessKey, secretKey);
+    try {
+      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+      upload.waitForUploadResult();
+    } catch (AmazonClientException | InterruptedException e) {
+      LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
+          e);
+    } finally {
+      shutdownTransferManager(transferManager);
+    }
+  }
+
+  /**
+   * Get the buffer reader to read s3 file as a stream
+   */
+  public BufferedReader getReader(String s3Path, String accessKey,
+      String secretKey) throws IOException {
+    // TODO error handling
+    // Compression support
+    // read header and decide the compression(auto detection)
+    // For now hard-code GZIP compression
+    String s3Bucket = getBucketName(s3Path);
+    String s3Key = getS3Key(s3Path);
+    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
+        new GetObjectRequest(s3Bucket, s3Key));
+    GZIPInputStream objectInputStream;
+    try {
+      objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
+          objectInputStream));
+      return bufferedReader;
+    } catch (IOException e) {
+      LOG.error("Error in creating stream reader for s3 file :" + s3Path,
+          e.getCause());
+      throw e;
+    }
+  }
+
+  public void writeIntoS3File(String data, String bucketName, String s3Key,
+      String accessKey, String secretKey) {
+    InputStream in = null;
+    try {
+      in = IOUtils.toInputStream(data, "UTF-8");
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    if (in != null) {
+      TransferManager transferManager = getTransferManager(accessKey, secretKey);
+      try {
+        if (transferManager != null) {
+          transferManager.upload(
+                  new PutObjectRequest(bucketName, s3Key, in,
+                  new ObjectMetadata())).waitForUploadResult();
+          LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
+              + bucketName);
+        }
+      } catch (AmazonClientException | InterruptedException e) {
+        LOG.error(e);
+      } finally {
+        try {
+          shutdownTransferManager(transferManager);
+          in.close();
+        } catch (IOException e) {
+          // ignore
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index aaf809f..44113e1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.util;
 import java.io.IOException;
 import java.util.HashMap;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index 9f943ec..3aa8d7b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -21,9 +21,9 @@ package org.apache.ambari.logfeeder.filter;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.OutputMgr;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index cdec4df..64e9b69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -25,10 +25,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 58db8f2..849e4c3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.filter;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.OutputMgr;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputMgr;
 import org.apache.log4j.Logger;
 import org.easymock.Capture;
 import org.easymock.CaptureType;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 2242a83..42e81da 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.InputMgr;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.commons.io.FileUtils;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 2df03bd..0652182 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -24,7 +24,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.log4j.Logger;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
index 49cee56..cc6da56 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
@@ -18,11 +18,12 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+
 public class S3LogPathResolverTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
index a0c398e..c64e0c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.S3Util;
 import org.junit.Test;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
deleted file mode 100644
index 4f0d1aa..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-public class AWSUtilTest {
-  public void testAWSUtil_getAwsUserName() throws Exception {
-    String S3_ACCESS_KEY = "S3_ACCESS_KEY";
-    String S3_SECRET_KEY = "S3_SECRET_KEY";
-    AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
deleted file mode 100644
index af14140..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import static org.junit.Assert.assertEquals;
-
-public class S3UtilTest {
-  public void testS3Util_pathToBucketName() throws Exception {
-    String s3Path = "s3://bucket_name/path/file.txt";
-    String expectedBucketName = "bucket_name";
-    String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
-    assertEquals(expectedBucketName, actualBucketName);
-  }
-
-  public void testS3Util_pathToS3Key() throws Exception {
-    String s3Path = "s3://bucket_name/path/file.txt";
-    String expectedS3key = "path/file.txt";
-    String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
-    assertEquals(expectedS3key, actualS3key);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
new file mode 100644
index 0000000..6df2283
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import org.apache.ambari.logfeeder.util.AWSUtil;
+
+public class AWSUtilTest {
+  public void testAWSUtil_getAwsUserName() throws Exception {
+    String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+    String S3_SECRET_KEY = "S3_SECRET_KEY";
+    AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/782c6b45/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
new file mode 100644
index 0000000..84554b0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.ambari.logfeeder.util.S3Util;
+
+public class S3UtilTest {
+  public void testS3Util_pathToBucketName() throws Exception {
+    String s3Path = "s3://bucket_name/path/file.txt";
+    String expectedBucketName = "bucket_name";
+    String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
+    assertEquals(expectedBucketName, actualBucketName);
+  }
+
+  public void testS3Util_pathToS3Key() throws Exception {
+    String s3Path = "s3://bucket_name/path/file.txt";
+    String expectedS3key = "path/file.txt";
+    String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
+    assertEquals(expectedS3key, actualS3key);
+  }
+
+}