You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/06/20 16:36:17 UTC

[2/2] ambari git commit: AMBARI-17045. Support loading logs to S3 (Hayat Behlim via oleewere)

AMBARI-17045. Support loading logs to S3 (Hayat Behlim via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/450eb3e4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/450eb3e4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/450eb3e4

Branch: refs/heads/trunk
Commit: 450eb3e4fb9af66eb09def974e9ba7d6e7a9ccc8
Parents: 9825bc2
Author: oleewere <ol...@gmail.com>
Authored: Mon Jun 20 18:29:55 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Mon Jun 20 18:31:24 2016 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-logfeeder/pom.xml          |  17 +-
 .../apache/ambari/logfeeder/ConfigBlock.java    |   3 +-
 .../org/apache/ambari/logfeeder/InputMgr.java   |  35 +-
 .../org/apache/ambari/logfeeder/LogFeeder.java  | 100 +++-
 .../apache/ambari/logfeeder/LogFeederUtil.java  |  68 +++
 .../org/apache/ambari/logfeeder/OutputMgr.java  |  46 +-
 .../apache/ambari/logfeeder/input/Input.java    |   4 +
 .../ambari/logfeeder/input/InputFile.java       |  61 ++-
 .../ambari/logfeeder/input/InputS3File.java     | 494 +++++++++++++++++++
 .../apache/ambari/logfeeder/output/Output.java  |  10 +-
 .../ambari/logfeeder/output/OutputDevNull.java  |  10 +-
 .../ambari/logfeeder/output/OutputFile.java     |   7 +
 .../ambari/logfeeder/output/OutputKafka.java    |   8 +
 .../ambari/logfeeder/output/OutputS3File.java   | 227 +++++++++
 .../ambari/logfeeder/output/OutputSolr.java     |  14 +
 .../org/apache/ambari/logfeeder/s3/AWSUtil.java |  92 ++++
 .../org/apache/ambari/logfeeder/s3/S3Util.java  | 233 +++++++++
 .../ambari/logfeeder/util/CompressionUtil.java  |  89 ++++
 .../apache/ambari/logfeeder/util/FileUtil.java  |  44 ++
 .../ambari/logfeeder/util/PlaceholderUtil.java  |  79 +++
 .../src/main/resources/alias_config.json        |   8 +-
 .../src/main/resources/logfeeder.properties     |   3 +
 .../apache/ambari/logfeeder/s3/AWSUtilTest.java |  30 ++
 .../apache/ambari/logfeeder/s3/S3UtilTest.java  |  44 ++
 .../logfeeder/util/PlaceholderUtilTest.java     |  48 ++
 25 files changed, 1697 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index bbe0bc9..0684a95 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -120,7 +120,22 @@
       <artifactId>ambari-metrics-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-  </dependencies>
+    <dependency>
+    <groupId>com.amazonaws</groupId>
+    <artifactId>aws-java-sdk-s3</artifactId>
+    <version>1.11.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.11</version>
+    </dependency>
+    <dependency>
+    <groupId>com.amazonaws</groupId>
+    <artifactId>aws-java-sdk-iam</artifactId>
+    <version>1.11.5</version>
+  </dependency>
+ </dependencies>
   <build>
     <finalName>LogFeeder</finalName>
     <pluginManagement>

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
index 6b78e2a..521319e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
@@ -22,11 +22,11 @@ package org.apache.ambari.logfeeder;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
+
 public abstract class ConfigBlock {
   static private Logger logger = Logger.getLogger(ConfigBlock.class);
 
@@ -258,5 +258,4 @@ public abstract class ConfigBlock {
   public void setDrain(boolean drain) {
     this.drain = drain;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
index 445c294..4359c78 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
@@ -65,6 +65,8 @@ public class InputMgr {
   MetricCount filesCountMetric = new MetricCount();
 
   private String checkPointExtension = ".cp";
+  
+  private Thread inputIsReadyMonitor = null;
 
   public List<Input> getInputList() {
     return inputList;
@@ -224,7 +226,7 @@ public class InputMgr {
     }
     // Start the monitoring thread if any file is in tail mode
     if (isAnyInputTail) {
-      Thread monitorThread = new Thread("InputIsReadyMonitor") {
+       inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
         @Override
         public void run() {
           logger.info("Going to monitor for these missing files: "
@@ -255,7 +257,7 @@ public class InputMgr {
           }
         }
       };
-      monitorThread.start();
+      inputIsReadyMonitor.start();
     }
   }
 
@@ -542,4 +544,33 @@ public class InputMgr {
 
   }
 
+  /**
+   * 
+   */
+  public void waitOnAllInputs() {
+    //wait on inputs
+    if (inputList != null) {
+      for (Input input : inputList) {
+        if (input != null) {
+          Thread inputThread = input.getThread();
+          if (inputThread != null) {
+            try {
+              inputThread.join();
+            } catch (InterruptedException e) {
+              // ignore
+            }
+          }
+        }
+      }
+    }
+    // wait on monitor
+    if (inputIsReadyMonitor != null) {
+      try {
+        this.close();
+        inputIsReadyMonitor.join();
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index f3dd4bf..166c0f3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -42,6 +42,7 @@ import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
 import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -57,16 +58,20 @@ public class LogFeeder {
   InputMgr inputMgr = new InputMgr();
   MetricsMgr metricsMgr = new MetricsMgr();
 
-  Map<String, Object> globalMap = null;
+  public static Map<String, Object> globalMap = null;
   String[] inputParams;
 
   List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
   List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
   List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
   List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
-
+  
   int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours
   long lastCheckPointCleanedMS = 0;
+  
+  private static boolean isLogfeederCompleted = false;
+  
+  private Thread statLoggerThread = null;
 
   public LogFeeder(String[] args) {
     inputParams = args;
@@ -80,14 +85,26 @@ public class LogFeeder {
     // loop the properties and load them
     // Load the configs
     String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
-    if (configFiles == null) {
-      configFiles = LogFeederUtil.getStringProperty("config.file",
-        "config.json");
-    }
     logger.info("logfeeder.config.files=" + configFiles);
-    String[] configFileList = configFiles.split(",");
-    for (String configFileName : configFileList) {
+    
+    String[] configFileList = null;
+    if (configFiles != null) {
+      configFileList = configFiles.split(",");
+    }
+    //list of config those are there in cmd line config dir , end with .json
+    String[] cmdLineConfigs = getConfigFromCmdLine();
+    //merge both config
+    String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList,
+        cmdLineConfigs);
+    //mergedConfigList is null then set default conifg 
+    if (mergedConfigList == null || mergedConfigList.length == 0) {
+      mergedConfigList = LogFeederUtil.getStringProperty("config.file",
+          "config.json").split(",");
+    }
+    for (String configFileName : mergedConfigList) {
       logger.info("Going to load config file:" + configFileName);
+      //escape space from config file path
+      configFileName= configFileName.replace("\\ ", "%20");
       File configFile = new File(configFileName);
       if (configFile.exists() && configFile.isFile()) {
         logger.info("Config file exists in path."
@@ -97,7 +114,7 @@ public class LogFeeder {
         // Let's try to load it from class loader
         logger.info("Trying to load config file from classloader: "
           + configFileName);
-        laodConfigsUsingClassLoader(configFileName);
+        loadConfigsUsingClassLoader(configFileName);
         logger.info("Loaded config file from classloader: "
           + configFileName);
       }
@@ -114,7 +131,7 @@ public class LogFeeder {
     logger.debug("==============");
   }
 
-  void laodConfigsUsingClassLoader(String configFileName) throws Exception {
+  void loadConfigsUsingClassLoader(String configFileName) throws Exception {
     BufferedInputStream fileInputStream = (BufferedInputStream) this
       .getClass().getClassLoader()
       .getResourceAsStream(configFileName);
@@ -451,7 +468,7 @@ public class LogFeeder {
     inputMgr.monitor();
     Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
 
-    Thread statLogger = new Thread("statLogger") {
+    statLoggerThread = new Thread("statLogger") {
 
       @Override
       public void run() {
@@ -473,12 +490,17 @@ public class LogFeeder {
             lastCheckPointCleanedMS = System.currentTimeMillis();
             inputMgr.cleanCheckPointFiles();
           }
+
+          // logfeeder is stopped then break the loop
+          if (isLogfeederCompleted) {
+            break;
+          }
         }
       }
 
     };
-    statLogger.setDaemon(true);
-    statLogger.start();
+    statLoggerThread.setDaemon(true);
+    statLoggerThread.start();
 
   }
 
@@ -524,24 +546,25 @@ public class LogFeeder {
 
   public static void main(String[] args) {
     LogFeeder logFeeder = new LogFeeder(args);
-    logFeeder.run(logFeeder);
+    logFeeder.run();
   }
 
 
   public static void run(String[] args) {
     LogFeeder logFeeder = new LogFeeder(args);
-    logFeeder.run(logFeeder);
+    logFeeder.run();
   }
 
-  public void run(LogFeeder logFeeder) {
+  public void run() {
     try {
       Date startTime = new Date();
-      logFeeder.init();
+      this.init();
       Date endTime = new Date();
       logger.info("Took " + (endTime.getTime() - startTime.getTime())
         + " ms to initialize");
-      logFeeder.monitor();
-
+      this.monitor();
+      //wait for all background thread before stop main thread
+      this.waitOnAllDaemonThreads();
     } catch (Throwable t) {
       logger.fatal("Caught exception in main.", t);
       System.exit(1);
@@ -566,5 +589,42 @@ public class LogFeeder {
       }
     }
   }
-
+  
+  public void waitOnAllDaemonThreads() {
+    String foreground = LogFeederUtil.getStringProperty("foreground");
+    if (foreground != null && foreground.equalsIgnoreCase("true")) {
+      // wait on inputmgr daemon threads
+      inputMgr.waitOnAllInputs();
+      // set isLogfeederCompleted to true to stop statLoggerThread
+      isLogfeederCompleted = true;
+      if (statLoggerThread != null) {
+        try {
+          statLoggerThread.join();
+        } catch (InterruptedException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+  
+  private String[] getConfigFromCmdLine() {
+    String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+    if (inputConfigDir != null && !inputConfigDir.isEmpty()) {
+      String[] searchFileWithExtensions = new String[] { "json" };
+      File configDirFile = new File(inputConfigDir);
+      List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile,
+          searchFileWithExtensions, false);
+      if (configFiles != null && configFiles.size() > 0) {
+        String configPaths[] = new String[configFiles.size()];
+        for (int index = 0; index < configFiles.size(); index++) {
+          File configFile = configFiles.get(index);
+          String configFilePath = configFile.getAbsolutePath();
+          configPaths[index] = configFilePath;
+        }
+        return configPaths;
+      }
+    }
+    return new String[0];
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
index 7a30d72..7a68b4d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
@@ -24,7 +24,9 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -40,6 +42,7 @@ import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
 import org.apache.ambari.logfeeder.mapper.Mapper;
 import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -48,6 +51,7 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
+import com.google.common.collect.ObjectArrays;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -68,6 +72,18 @@ public class LogFeederUtil {
   private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
   private static int logInterval = 30000; // 30 seconds
 
+  public static String hostName = null;
+  public static String ipAddress = null;
+  
+  public static String logfeederTempDir = null;
+  
+  public static final Object _LOCK = new Object();
+  
+  static{
+    //set hostname and hostIp
+    setHostNameAndIP();
+  }
+  
   public static Gson getGson() {
     return gson;
   }
@@ -483,5 +499,57 @@ public class LogFeederUtil {
     }
     return false;
   }
+  
+  
+  public static synchronized String setHostNameAndIP() {
+    if (hostName == null || ipAddress == null) {
+      try {
+        InetAddress ip = InetAddress.getLocalHost();
+        ipAddress = ip.getHostAddress();
+        String getHostName = ip.getHostName();
+        String getCanonicalHostName = ip.getCanonicalHostName();
+        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
+          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
+          hostName = getCanonicalHostName;
+        } else {
+          logger.info("Using getHostName()=" + getHostName);
+          hostName = getHostName;
+        }
+        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
+            + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
+            + hostName);
+      } catch (UnknownHostException e) {
+        logger.error("Error getting hostname.", e);
+      }
+    }
+    return hostName;
+  }
 
+  public static String[] mergeArray(String[] first, String[] second) {
+    if (first == null) {
+      first = new String[0];
+    }
+    if (second == null) {
+      second = new String[0];
+    }
+    String[] mergedArray = ObjectArrays.concat(first, second, String.class);
+    return mergedArray;
+  }
+  
+  public static String getLogfeederTempDir() {
+    if (logfeederTempDir == null) {
+      synchronized (_LOCK) {
+        if (logfeederTempDir == null) {
+          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
+              "/tmp/$username/logfeeder/");
+          HashMap<String, String> contextParam = new HashMap<String, String>();
+          String username = System.getProperty("user.name");
+          contextParam.put("username", username);
+          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
+              contextParam);
+        }
+      }
+    }
+    return logfeederTempDir;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
index 8413878..f84457e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
@@ -19,8 +19,7 @@
 
 package org.apache.ambari.logfeeder;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -40,34 +39,13 @@ public class OutputMgr {
 
   Collection<Output> outputList = new ArrayList<Output>();
 
-  String hostName = null;
-  String ipAddress = null;
   boolean addMessageMD5 = true;
 
   private int MAX_OUTPUT_SIZE = 32765; // 32766-1
   static long doc_counter = 0;
   public MetricCount messageTruncateMetric = new MetricCount();
 
-  public OutputMgr() {
-    // Set the host for this server
-    try {
-      InetAddress ip = InetAddress.getLocalHost();
-      ipAddress = ip.getHostAddress();
-      String getHostName = ip.getHostName();
-      String getCanonicalHostName = ip.getCanonicalHostName();
-      if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
-        logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
-        hostName = getCanonicalHostName;
-      } else {
-        logger.info("Using getHostName()=" + getHostName);
-        hostName = getHostName;
-      }
-      logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName=" + hostName);
-    } catch (UnknownHostException e) {
-      logger.error("Error getting hostname.", e);
-    }
-  }
-
+  
   public Collection<Output> getOutputList() {
     return outputList;
   }
@@ -106,12 +84,12 @@ public class OutputMgr {
     }
 
     // Add host if required
-    if (jsonObj.get("host") == null && hostName != null) {
-      jsonObj.put("host", hostName);
+    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+      jsonObj.put("host", LogFeederUtil.hostName);
     }
     // Add IP if required
-    if (jsonObj.get("ip") == null && ipAddress != null) {
-      jsonObj.put("ip", ipAddress);
+    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+      jsonObj.put("ip", LogFeederUtil.ipAddress);
     }
 
     if (input.isUseEventMD5() || input.isGenEventMD5()) {
@@ -278,4 +256,16 @@ public class OutputMgr {
     }
   }
 
+  
+  public void copyFile(File inputFile, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+    for (Output output : input.getOutputList()) {
+      try {
+        output.copyFile(inputFile, inputMarker);
+      }catch (Exception e) {
+        logger.error("Error coyping file . to " + output.getShortDescription(),
+            e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index ec75f2d..18e2184 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -317,5 +317,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
     }
     metricsList.add(readBytesMetric);
   }
+  
+  public Thread getThread(){
+    return thread;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 420610a..7107a69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -251,31 +251,36 @@ public class InputFile extends Input {
    */
   @Override
   void start() throws Exception {
+
     if (logPathFiles == null || logPathFiles.length == 0) {
       return;
     }
-
-    if (isTail()) {
-      // Just process the first file
-      processFile(logPathFiles[0]);
-    } else {
-      for (File file : logPathFiles) {
-        try {
-          processFile(file);
-          if (isClosed() || isDrain()) {
-            logger.info("isClosed or isDrain. Now breaking loop.");
-            break;
+    boolean isProcessFile = getBooleanValue("process_file", true);
+    if (isProcessFile) {
+      if (isTail()) {
+        // Just process the first file
+        processFile(logPathFiles[0]);
+      } else {
+        for (File file : logPathFiles) {
+          try {
+            processFile(file);
+            if (isClosed() || isDrain()) {
+              logger.info("isClosed or isDrain. Now breaking loop.");
+              break;
+            }
+          } catch (Throwable t) {
+            logger.error("Error processing file=" + file.getAbsolutePath(), t);
           }
-        } catch (Throwable t) {
-          logger.error(
-            "Error processing file=" + file.getAbsolutePath(),
-            t);
         }
       }
+      // Call the close for the input. Which should flush to the filters and
+      // output
+      close();
+    }else{
+      //copy files
+      copyFiles(logPathFiles);
     }
-    // Call the close for the input. Which should flush to the filters and
-    // output
-    close();
+    
   }
 
   @Override
@@ -559,4 +564,24 @@ public class InputFile extends Input {
       + (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0]
       .getAbsolutePath() : getStringValue("path"));
   }
+  
+  
+  public void copyFiles(File[] files) {
+    boolean isCopyFile = getBooleanValue("copy_file", false);
+    if (isCopyFile && files != null) {
+      for (File file : files) {
+        try {
+          InputMarker marker = new InputMarker();
+          marker.input = this;
+          outputMgr.copyFile(file, marker);
+          if (isClosed() || isDrain()) {
+            logger.info("isClosed or isDrain. Now breaking loop.");
+            break;
+          }
+        } catch (Throwable t) {
+          logger.error("Error processing file=" + file.getAbsolutePath(), t);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
new file mode 100644
index 0000000..d68ab96
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputS3File extends Input {
+  static private Logger logger = Logger.getLogger(InputS3File.class);
+
+  String logPath = null;
+  boolean isStartFromBegining = true;
+
+  boolean isReady = false;
+  String[] s3LogPathFiles = null;
+  Object fileKey = null;
+  String base64FileKey = null;
+
+  private boolean isRolledOver = false;
+  boolean addWildCard = false;
+
+  long lastCheckPointTimeMS = 0;
+  int checkPointIntervalMS = 5 * 1000; // 5 seconds
+  RandomAccessFile checkPointWriter = null;
+  Map<String, Object> jsonCheckPoint = null;
+
+  File checkPointFile = null;
+
+  private InputMarker lastCheckPointInputMarker = null;
+
+  private String checkPointExtension = ".cp";
+
+
+  @Override
+  public void init() throws Exception {
+    logger.info("init() called");
+    statMetric.metricsName = "input.files.read_lines";
+    readBytesMetric.metricsName = "input.files.read_bytes";
+    checkPointExtension = LogFeederUtil.getStringProperty(
+        "logfeeder.checkpoint.extension", checkPointExtension);
+
+    // Let's close the file and set it to true after we start monitoring it
+    setClosed(true);
+    logPath = getStringValue("path");
+    tail = getBooleanValue("tail", tail);
+    addWildCard = getBooleanValue("add_wild_card", addWildCard);
+    checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
+        checkPointIntervalMS);
+    if (logPath == null || logPath.isEmpty()) {
+      logger.error("path is empty for file input. " + getShortDescription());
+      return;
+    }
+
+    String startPosition = getStringValue("start_position");
+    if (StringUtils.isEmpty(startPosition)
+        || startPosition.equalsIgnoreCase("beginning")
+        || startPosition.equalsIgnoreCase("begining")) {
+      isStartFromBegining = true;
+    }
+
+    if (!tail) {
+      // start position end doesn't apply if we are not tailing
+      isStartFromBegining = true;
+    }
+
+    setFilePath(logPath);
+    boolean isFileReady = isReady();
+
+    logger.info("File to monitor " + logPath + ", tail=" + tail
+        + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
+
+    super.init();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.input.Input#isReady()
+   */
+  @Override
+  public boolean isReady() {
+    if (!isReady) {
+      // Let's try to check whether the file is available
+      s3LogPathFiles = getActualFiles(logPath);
+      if (s3LogPathFiles != null && s3LogPathFiles.length > 0) {
+        if (isTail() && s3LogPathFiles.length > 1) {
+          logger.warn("Found multiple files (" + s3LogPathFiles.length
+              + ") for the file filter " + filePath
+              + ". Will use only the first one. Using " + s3LogPathFiles[0]);
+        }
+        logger.info("File filter " + filePath + " expanded to "
+            + s3LogPathFiles[0]);
+        isReady = true;
+      } else {
+        logger.debug(logPath + " file doesn't exist. Ignoring for now");
+      }
+    }
+    return isReady;
+  }
+
+  private String[] getActualFiles(String searchPath) {
+    // TODO search file on s3
+    return new String[] { searchPath };
+  }
+
+  @Override
+  synchronized public void checkIn(InputMarker inputMarker) {
+    super.checkIn(inputMarker);
+    if (checkPointWriter != null) {
+      try {
+        int lineNumber = LogFeederUtil.objectToInt(
+            jsonCheckPoint.get("line_number"), 0, "line_number");
+        if (lineNumber > inputMarker.lineNumber) {
+          // Already wrote higher line number for this input
+          return;
+        }
+        // If interval is greater than last checkPoint time, then write
+        long currMS = System.currentTimeMillis();
+        if (!isClosed()
+            && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+          // Let's save this one so we can update the check point file
+          // on flush
+          lastCheckPointInputMarker = inputMarker;
+          return;
+        }
+        lastCheckPointTimeMS = currMS;
+
+        jsonCheckPoint.put("line_number", ""
+            + new Integer(inputMarker.lineNumber));
+        jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+        jsonCheckPoint.put("last_write_time_date", new Date());
+
+        String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+
+        // Let's rewind
+        checkPointWriter.seek(0);
+        checkPointWriter.writeInt(jsonStr.length());
+        checkPointWriter.write(jsonStr.getBytes());
+
+        if (isClosed()) {
+          final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+              + "_FINAL_CHECKIN";
+          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+              "Wrote final checkPoint, input=" + getShortDescription()
+                  + ", checkPointFile=" + checkPointFile.getAbsolutePath()
+                  + ", checkPoint=" + jsonStr, null, logger, Level.INFO);
+        }
+      } catch (Throwable t) {
+        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+            + "_CHECKIN_EXCEPTION";
+        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+            "Caught exception checkIn. , input=" + getShortDescription(), t,
+            logger, Level.ERROR);
+      }
+    }
+
+  }
+
+  @Override
+  public void checkIn() {
+    super.checkIn();
+    if (lastCheckPointInputMarker != null) {
+      checkIn(lastCheckPointInputMarker);
+    }
+  }
+
+  @Override
+  public void rollOver() {
+    logger.info("Marking this input file for rollover. "
+        + getShortDescription());
+    isRolledOver = true;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.input.Input#monitor()
+   */
+  @Override
+  void start() throws Exception {
+    if (s3LogPathFiles == null || s3LogPathFiles.length == 0) {
+      return;
+    }
+
+    if (isTail()) {
+      // Just process the first file
+      processFile(s3LogPathFiles[0]);
+    } else {
+      for (String s3FilePath : s3LogPathFiles) {
+        try {
+          processFile(s3FilePath);
+          if (isClosed() || isDrain()) {
+            logger.info("isClosed or isDrain. Now breaking loop.");
+            break;
+          }
+        } catch (Throwable t) {
+          logger.error("Error processing file=" + s3FilePath, t);
+        }
+      }
+    }
+    // Call the close for the input. Which should flush to the filters and
+    // output
+    close();
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    logger.info("close() calling checkPoint checkIn(). "
+        + getShortDescription());
+    checkIn();
+  }
+
+  private void processFile(String logPathFile) throws FileNotFoundException,
+      IOException {
+    logger.info("Monitoring logPath=" + logPath + ", logPathFile="
+        + logPathFile);
+    BufferedReader br = null;
+    checkPointFile = null;
+    checkPointWriter = null;
+    jsonCheckPoint = null;
+    int resumeFromLineNumber = 0;
+
+    int lineCount = 0;
+    try {
+      setFilePath(logPathFile);
+      String s3AccessKey = getStringValue("s3_access_key");
+      String s3SecretKey = getStringValue("s3_secret_key");
+      br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
+      if(br==null){
+        //log err
+        return;
+      }
+      
+      // Whether to send to output from the beginning.
+      boolean resume = isStartFromBegining;
+
+      // Seems FileWatch is not reliable, so let's only use file key
+      // comparison
+      // inputMgr.monitorSystemFileChanges(this);
+      fileKey = getFileKey(logPathFile);
+      base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+      logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". "
+          + getShortDescription());
+
+      if (isTail()) {
+        try {
+          // Let's see if there is a checkpoint for this file
+          logger.info("Checking existing checkpoint file. "
+              + getShortDescription());
+
+          String fileBase64 = Base64.byteArrayToBase64(fileKey.toString()
+              .getBytes());
+          String checkPointFileName = fileBase64 + checkPointExtension;
+          File checkPointFolder = inputMgr.getCheckPointFolderFile();
+          checkPointFile = new File(checkPointFolder, checkPointFileName);
+          checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
+
+          try {
+            int contentSize = checkPointWriter.readInt();
+            byte b[] = new byte[contentSize];
+            int readSize = checkPointWriter.read(b, 0, contentSize);
+            if (readSize != contentSize) {
+              logger
+                  .error("Couldn't read expected number of bytes from checkpoint file. expected="
+                      + contentSize
+                      + ", read="
+                      + readSize
+                      + ", checkPointFile="
+                      + checkPointFile
+                      + ", input="
+                      + getShortDescription());
+            } else {
+              // Create JSON string
+              String jsonCheckPointStr = new String(b, 0, readSize);
+              jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+              resumeFromLineNumber = LogFeederUtil.objectToInt(
+                  jsonCheckPoint.get("line_number"), 0, "line_number");
+
+              if (resumeFromLineNumber > 0) {
+                // Let's read from last line read
+                resume = false;
+              }
+              logger.info("CheckPoint. checkPointFile=" + checkPointFile
+                  + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber="
+                  + resumeFromLineNumber + ", resume=" + resume);
+            }
+          } catch (EOFException eofEx) {
+            logger.info("EOFException. Will reset checkpoint file "
+                + checkPointFile.getAbsolutePath() + " for "
+                + getShortDescription());
+          }
+          if (jsonCheckPoint == null) {
+            // This seems to be first time, so creating the initial
+            // checkPoint object
+            jsonCheckPoint = new HashMap<String, Object>();
+            jsonCheckPoint.put("file_path", filePath);
+            jsonCheckPoint.put("file_key", fileBase64);
+          }
+
+        } catch (Throwable t) {
+          logger.error(
+              "Error while configuring checkpoint file. Will reset file. checkPointFile="
+                  + checkPointFile, t);
+        }
+      }
+
+      setClosed(false);
+      int sleepStep = 2;
+      int sleepIteration = 0;
+      while (true) {
+        try {
+          if (isDrain()) {
+            break;
+          }
+
+          String line = br.readLine();
+          if (line == null) {
+            if (!resume) {
+              resume = true;
+            }
+            sleepIteration++;
+            try {
+              // Since FileWatch service is not reliable, we will
+              // check
+              // file inode every n seconds after no write
+              if (sleepIteration > 4) {
+                Object newFileKey = getFileKey(logPathFile);
+                if (newFileKey != null) {
+                  if (fileKey == null || !newFileKey.equals(fileKey)) {
+                    logger
+                        .info("File key is different. Calling rollover. oldKey="
+                            + fileKey
+                            + ", newKey="
+                            + newFileKey
+                            + ". "
+                            + getShortDescription());
+                    // File has rotated.
+                    rollOver();
+                  }
+                }
+              }
+              // Flush on the second iteration
+              if (!tail && sleepIteration >= 2) {
+                logger.info("End of file. Done with filePath=" + logPathFile
+                    + ", lineCount=" + lineCount);
+                flush();
+                break;
+              } else if (sleepIteration == 2) {
+                flush();
+              } else if (sleepIteration >= 2) {
+                if (isRolledOver) {
+                  isRolledOver = false;
+                  // Close existing file
+                  try {
+                    logger
+                        .info("File is rolled over. Closing current open file."
+                            + getShortDescription() + ", lineCount="
+                            + lineCount);
+                    br.close();
+                  } catch (Exception ex) {
+                    logger.error("Error closing file" + getShortDescription());
+                    break;
+                  }
+                  try {
+                    // Open new file
+                    logger.info("Opening new rolled over file."
+                        + getShortDescription());
+                    br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
+                    lineCount = 0;
+                    fileKey = getFileKey(logPathFile);
+                    base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
+                        .getBytes());
+                    logger.info("fileKey=" + fileKey + ", base64="
+                        + base64FileKey + ", " + getShortDescription());
+                  } catch (Exception ex) {
+                    logger.error("Error opening rolled over file. "
+                        + getShortDescription());
+                    // Let's add this to monitoring and exit
+                    // this
+                    // thread
+                    logger.info("Added input to not ready list."
+                        + getShortDescription());
+                    isReady = false;
+                    inputMgr.addToNotReady(this);
+                    break;
+                  }
+                  logger.info("File is successfully rolled over. "
+                      + getShortDescription());
+                  continue;
+                }
+              }
+              Thread.sleep(sleepStep * 1000);
+              sleepStep = (sleepStep * 2);
+              sleepStep = sleepStep > 10 ? 10 : sleepStep;
+            } catch (InterruptedException e) {
+              logger.info("Thread interrupted." + getShortDescription());
+            }
+          } else {
+            lineCount++;
+            sleepStep = 1;
+            sleepIteration = 0;
+
+            if (!resume && lineCount > resumeFromLineNumber) {
+              logger.info("Resuming to read from last line. lineCount="
+                  + lineCount + ", input=" + getShortDescription());
+              resume = true;
+            }
+            if (resume) {
+              InputMarker marker = new InputMarker();
+              marker.fileKey = fileKey;
+              marker.base64FileKey = base64FileKey;
+              marker.filePath = filePath;
+              marker.input = this;
+              marker.lineNumber = lineCount;
+              outputLine(line, marker);
+            }
+          }
+        } catch (Throwable t) {
+          final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+              + "_READ_LOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+              "Caught exception in read loop. lineNumber=" + lineCount
+                  + ", input=" + getShortDescription(), t, logger, Level.ERROR);
+
+        }
+      }
+    } finally {
+      if (br != null) {
+        logger.info("Closing reader." + getShortDescription() + ", lineCount="
+            + lineCount);
+        try {
+          br.close();
+        } catch (Throwable t) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * @param s3FilePath
+   * @return
+   */
+  static public Object getFileKey(String s3FilePath) {
+    return s3FilePath.toString();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.input.Input#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    return "input:source="
+        + getStringValue("source")
+        + ", path="
+        + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0]
+            : getStringValue("path"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index c067680..99a2909 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -19,6 +19,7 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import java.io.File;
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
@@ -27,7 +28,6 @@ import java.util.Map.Entry;
 import org.apache.ambari.logfeeder.ConfigBlock;
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.log4j.Logger;
 
@@ -57,9 +57,11 @@ public abstract class Output extends ConfigBlock {
     return super.getNameForThread();
   }
 
-  public void write(String block, InputMarker inputMarker) throws Exception {
-    // No-op. Please implement in sub classes
-  }
+  public abstract void write(String block, InputMarker inputMarker)
+      throws Exception;
+  
+  public abstract void copyFile(File inputFile, InputMarker inputMarker)
+      throws UnsupportedOperationException;
 
   /**
    * @param jsonObj

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index b6188cb..7cfcb98 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ambari.logfeeder.output;
 
+import java.io.File;
+
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.log4j.Logger;
 
@@ -30,8 +32,14 @@ public class OutputDevNull extends Output {
   private static Logger logger = Logger.getLogger(OutputDevNull.class);
 
   @Override
-  public void write(String block, InputMarker inputMarker) throws Exception {
+  public void write(String block, InputMarker inputMarker){
     // just ignore the logs
     logger.trace("Ignore log block: " + block);
   }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) {
+    throw new UnsupportedOperationException(
+        "copyFile method is not yet supported for output=dev_null");
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index b6e36d6..4327f6f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -135,4 +135,11 @@ public class OutputFile extends Output {
     return "output:destination=file,path=" + filePath;
   }
 
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker)
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "copyFile method is not yet supported for output=file");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index efbc366..120d071 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,6 +19,7 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -283,4 +284,11 @@ public class OutputKafka extends Output {
       }
     }
   }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker)
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "copyFile method is not yet supported for output=kafka");     
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
new file mode 100644
index 0000000..4cdf82d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * 
+ * Write log file into s3 bucket
+ */
+public class OutputS3File extends Output {
+
+
+  private static boolean uploadedGlobalConfig = false;
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) {
+    String bucketName = getStringValue("s3_bucket");
+    String s3LogDir = getStringValue("s3_log_dir");
+    HashMap<String, String> contextParam = buildContextParam();
+    s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam);
+    String s3AccessKey = getStringValue("s3_access_key");
+    String s3SecretKey = getStringValue("s3_secret_key");
+    String compressionAlgo = getStringValue("compression_algo");
+    String fileName = inputFile.getName();
+    // create tmp compressed File
+    String tmpDir = LogFeederUtil.getLogfeederTempDir();
+    File outputFile = new File(tmpDir + fileName + "_"
+        + new Date().getTime() + "." + compressionAlgo);
+    outputFile = CompressionUtil.compressFile(inputFile, outputFile,
+        compressionAlgo);
+    String type = inputMarker.input.getStringValue("type");
+    String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type
+        + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "."
+        + compressionAlgo;
+    S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey,
+        s3SecretKey);
+    // delete local compressed file
+    outputFile.deleteOnExit();
+    ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>();
+    addFilters(filters, inputMarker.input.getFirstFilter());
+    Map<String, Object> inputConfig = new HashMap<String, Object>();
+    inputConfig.putAll(inputMarker.input.getConfigs());
+    String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName
+        + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path;
+    inputConfig.put("path", s3CompletePath);
+
+    ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+    inputConfigList.add(inputConfig);
+    // set source s3_file
+    // remove global config from filter config
+    removeGlobalConfig(inputConfigList);
+    removeGlobalConfig(filters);
+    // write config into s3 file
+    String s3Key = getComponentConfigFileName(type);
+    Map<String, Object> config = new HashMap<String, Object>();
+    config.put("filter", filters);
+    config.put("input", inputConfigList);
+    writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam,
+        s3Key);
+    // write global config
+    writeGlobalConfig();
+
+  }
+
+  /**
+   * 
+   * @param filters
+   * @param filter
+   */
+  public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
+    if (filter != null) {
+      Map<String, Object> filterConfig = new HashMap<String, Object>();
+      filterConfig.putAll(filter.getConfigs());
+      filters.add(filterConfig);
+      if (filter.getNextFilter() != null) {
+        addFilters(filters, filter.getNextFilter());
+      }
+    }
+  }
+
+  /**
+   * 
+   * @param filters
+   * @param inputConfig
+   * @param bucketName
+   * @param componentName
+   */
+  public void writeConfigToS3(Map<String, Object> config, String bucketName,
+      String accessKey, String secretKey, HashMap<String, String> contextParam,
+      String s3Key) {
+    String s3ConfigDir = getStringValue("s3_config_dir");
+    s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
+    Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    String configJson = gson.toJson(config);
+    // write json to s3 file
+    s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
+    S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
+        secretKey);
+  }
+
+  /**
+   * 
+   * @param componentName
+   * @return String
+   */
+  public String getComponentConfigFileName(String componentName) {
+    String fileName = "input.config-" + componentName + ".json";
+    return fileName;
+  }
+
+  public HashMap<String, String> buildContextParam() {
+    HashMap<String, String> contextParam = new HashMap<String, String>();
+    contextParam.put("host", LogFeederUtil.hostName);
+    contextParam.put("ip", LogFeederUtil.ipAddress);
+    String cluster = getNVList("add_fields").get("cluster");
+    contextParam.put("cluster", cluster);
+    return contextParam;
+  }
+
+  
+  private Map<String, Object> getGlobalConfig() {
+    Map<String, Object> globalConfig = LogFeeder.globalMap;
+    if (globalConfig == null) {
+      globalConfig = new HashMap<String, Object>();
+    }
+    return globalConfig;
+  }
+
+  private void removeGlobalConfig(List<Map<String, Object>> configList) {
+    Map<String, Object> globalConfig = getGlobalConfig();
+    if (configList != null && globalConfig != null) {
+      for (Entry<String, Object> globalConfigEntry : globalConfig.entrySet()) {
+        if (globalConfigEntry != null) {
+          String globalKey = globalConfigEntry.getKey();
+          if (globalKey != null && !globalKey.trim().isEmpty()) {
+            for (Map<String, Object> config : configList) {
+              if (config != null) {
+                config.remove(globalKey);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * write global config in s3 file Invoke only once
+   */
+  @SuppressWarnings("unchecked")
+  private synchronized void writeGlobalConfig() {
+    if (!uploadedGlobalConfig) {
+      Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
+      //updating global config before write to s3
+      globalConfig.put("source", "s3_file");
+      globalConfig.put("copy_file", false);
+      globalConfig.put("process_file", true);
+      globalConfig.put("tail", false);
+      Map<String, Object> addFields = (Map<String, Object>) globalConfig
+          .get("add_fields");
+      if (addFields == null) {
+        addFields = new HashMap<String, Object>();
+      }
+      addFields.put("ip", LogFeederUtil.ipAddress);
+      addFields.put("host", LogFeederUtil.hostName);
+      // add bundle id same as cluster if its not there
+      String bundle_id = (String) addFields.get("bundle_id");
+      if (bundle_id == null || bundle_id.isEmpty()) {
+        String cluster = (String) addFields.get("cluster");
+        if (cluster != null && !cluster.isEmpty()) {
+          addFields.put("bundle_id", bundle_id);
+        }
+      }
+      globalConfig.put("add_fields", addFields);
+      Map<String, Object> config = new HashMap<String, Object>();
+      config.put("global", globalConfig);
+      String s3AccessKey = getStringValue("s3_access_key");
+      String s3SecretKey = getStringValue("s3_secret_key");
+      String bucketName = getStringValue("s3_bucket");
+      String s3Key = "global.config.json";
+      HashMap<String, String> contextParam = buildContextParam();
+      writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey,
+          contextParam, s3Key);
+      uploadedGlobalConfig = true;
+    }
+  }
+
+  @Override
+  public void write(String block, InputMarker inputMarker) throws Exception {
+    throw new UnsupportedOperationException(
+        "write method is not yet supported for output=s3_file");    
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 14b2093..6e3248b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,6 +19,7 @@
 
 package org.apache.ambari.logfeeder.output;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
@@ -454,4 +455,17 @@ public class OutputSolr extends Output {
       return localBuffer.isEmpty();
     }
   }
+
+  @Override
+  public void write(String block, InputMarker inputMarker) throws Exception {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker)
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "copyFile method is not yet supported for output=solr");     
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
new file mode 100644
index 0000000..050b69b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+
+public enum AWSUtil {
+  INSTANCE;
+  private static final Logger LOG = Logger.getLogger(AWSUtil.class);
+
+  /**
+   * Get aws username
+   * 
+   * @param accessKey
+   * @param secretKey
+   * @return String
+   */
+  public String getAwsUserName(String accessKey, String secretKey) {
+    String username = null;
+    AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
+    AmazonIdentityManagementClient amazonIdentityManagementClient;
+    if (awsCredentials != null) {
+      amazonIdentityManagementClient = new AmazonIdentityManagementClient(
+          awsCredentials);
+    } else {
+      // create default client
+      amazonIdentityManagementClient = new AmazonIdentityManagementClient();
+    }
+    try {
+      username = amazonIdentityManagementClient.getUser().getUser()
+          .getUserName();
+    } catch (AmazonServiceException e) {
+      if (e.getErrorCode().compareTo("AccessDenied") == 0) {
+        String arn = null;
+        String msg = e.getMessage();
+        int arnIdx = msg.indexOf("arn:aws");
+        if (arnIdx != -1) {
+          int arnSpace = msg.indexOf(" ", arnIdx);
+          // should be similar to "arn:aws:iam::111111111111:user/username"
+          arn = msg.substring(arnIdx, arnSpace);
+        }
+        if (arn != null) {
+          String[] arnParts = arn.split(":");
+          if (arnParts != null && arnParts.length > 5) {
+            username = arnParts[5];
+            if (username != null) {
+              username = username.replace("user/", "");
+            }
+          }
+        }
+      }
+    } catch (Exception exception) {
+      LOG.error(
+          "Error in getting username :" + exception.getLocalizedMessage(),
+          exception.getCause());
+    }
+    return username;
+  }
+
+  public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+    if (accessKey != null && secretKey != null) {
+      LOG.debug("Creating aws client as per new accesskey and secretkey");
+      AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
+          secretKey);
+      return awsCredentials;
+    } else {
+      // retrun null
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
new file mode 100644
index 0000000..f49837c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.services.s3.transfer.model.UploadResult;
+
+/**
+ * Utility to connect to s3
+ *
+ */
+public enum S3Util {
+  INSTANCE;
+
+  private static final Logger LOG = Logger.getLogger(S3Util.class);
+
+  public final String S3_PATH_START_WITH = "s3://";
+  public final String S3_PATH_SEPARATOR = "/";
+
+  /**
+   * get s3 client
+   * 
+   * @return AmazonS3
+   */
+  public AmazonS3 getS3Client(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+        accessKey, secretKey);
+    AmazonS3 s3client;
+    if (awsCredentials != null) {
+      s3client = new AmazonS3Client(awsCredentials);
+    } else {
+      s3client = new AmazonS3Client();
+    }
+    return s3client;
+  }
+
+  /**
+   * 
+   * @return TransferManager
+   */
+  public TransferManager getTransferManager(String accessKey, String secretKey) {
+    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+        accessKey, secretKey);
+    TransferManager transferManager;
+    if (awsCredentials != null) {
+      transferManager = new TransferManager(awsCredentials);
+    } else {
+      transferManager = new TransferManager();
+    }
+    return transferManager;
+  }
+
+  /**
+   * shutdown s3 transfer manager
+   */
+  public void shutdownTransferManager(TransferManager transferManager) {
+    if (transferManager != null) {
+      transferManager.shutdownNow();
+    }
+  }
+
+  /**
+   * Extract bucket name from s3 file complete path
+   * 
+   * @param s3Path
+   * @return String
+   */
+  public String getBucketName(String s3Path) {
+    String bucketName = null;
+    // s3path
+    if (s3Path != null) {
+      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+          S3_PATH_SEPARATOR);
+      bucketName = s3PathParts[0];
+    }
+    return bucketName;
+  }
+
+  /**
+   * get s3 key from s3Path after removing bucketname
+   * 
+   * @param s3Path
+   * @return String
+   */
+  public String getS3Key(String s3Path) {
+    StringBuilder s3Key = new StringBuilder();
+    // s3path
+    if (s3Path != null) {
+      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+          S3_PATH_SEPARATOR);
+      ArrayList<String> s3PathList = new ArrayList<String>(
+          Arrays.asList(s3PathParts));
+      s3PathList.remove(0);// remove bucketName
+      for (int index = 0; index < s3PathList.size(); index++) {
+        if (index > 0) {
+          s3Key.append(S3_PATH_SEPARATOR);
+        }
+        s3Key.append(s3PathList.get(index));
+      }
+    }
+    return s3Key.toString();
+  }
+
+  /**
+   * 
+   * @param bucketName
+   * @param s3Key
+   * @param localFile
+   */
+  public void uploadFileTos3(String bucketName, String s3Key, File localFile,
+      String accessKey, String secretKey) {
+    TransferManager transferManager = getTransferManager(accessKey, secretKey);
+    try {
+      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+      UploadResult uploadResult = upload.waitForUploadResult();
+    } catch (AmazonClientException | InterruptedException e) {
+      LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
+          e);
+    } finally {
+      shutdownTransferManager(transferManager);
+    }
+  }
+
+  /**
+   * Get the buffer reader to read s3 file as a stream
+   * 
+   * @param s3Path
+   * @return BufferedReader
+   * @throws IOException
+   */
+  public BufferedReader getReader(String s3Path, String accessKey,
+      String secretKey) throws IOException {
+    // TODO error handling
+    // Compression support
+    // read header and decide the compression(auto detection)
+    // For now hard-code GZIP compression
+    String s3Bucket = getBucketName(s3Path);
+    String s3Key = getS3Key(s3Path);
+    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
+        new GetObjectRequest(s3Bucket, s3Key));
+    GZIPInputStream objectInputStream;
+    try {
+      objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
+          objectInputStream));
+      return bufferedReader;
+    } catch (IOException e) {
+      LOG.error("Error in creating stream reader for s3 file :" + s3Path,
+          e.getCause());
+      throw e;
+    }
+  }
+
+  /**
+   * 
+   * @param data
+   * @param bucketName
+   * @param s3Key
+   */
+  public void writeIntoS3File(String data, String bucketName, String s3Key,
+      String accessKey, String secretKey) {
+    InputStream in = null;
+    try {
+      in = IOUtils.toInputStream(data, "UTF-8");
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    if (in != null) {
+      TransferManager transferManager = getTransferManager(accessKey, secretKey);
+      try {
+        if (transferManager != null) {
+          UploadResult uploadResult = transferManager
+              .upload(
+                  new PutObjectRequest(bucketName, s3Key, in,
+                      new ObjectMetadata())).waitForUploadResult();
+          LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
+              + bucketName);
+        }
+      } catch (AmazonClientException | InterruptedException e) {
+        LOG.error(e);
+      } finally {
+        try {
+          shutdownTransferManager(transferManager);
+          in.close();
+        } catch (IOException e) {
+          // ignore
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
new file mode 100644
index 0000000..54008ec
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.log4j.Logger;
+
+public class CompressionUtil {
+
+  private static final Logger LOG = Logger.getLogger(CompressionUtil.class);
+
+  /**
+   * Compress file
+   * 
+   * @param inputFile
+   * @param outputFile
+   * @param algoName 
+   */
+  public static File compressFile(File inputFile, File outputFile, String algoName) {
+    CompressorOutputStream cos = null;
+    FileInputStream ios = null;
+    try {
+      if (!inputFile.exists()) {
+        throw new IllegalArgumentException("Input File:"
+            + inputFile.getAbsolutePath() + " is not exist.");
+      }
+      if (inputFile.isDirectory()) {
+        throw new IllegalArgumentException("Input File:"
+            + inputFile.getAbsolutePath() + " is a directory.");
+      }
+      File parent = outputFile.getParentFile();
+      if (parent != null && !parent.exists()) {
+        boolean isParentCreated = parent.mkdirs();
+        if (!isParentCreated) {
+          throw new IllegalAccessException(
+              "User does not have permission to create parent directory :"
+                  + parent.getAbsolutePath());
+        }
+      }
+      final OutputStream out = new FileOutputStream(outputFile);
+      cos = new CompressorStreamFactory().createCompressorOutputStream(
+          algoName, out);
+      ios = new FileInputStream(inputFile);
+      IOUtils.copy(ios, cos);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      // Close the stream
+      if (cos != null) {
+        try {
+          cos.close();
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+      if (ios != null) {
+        try {
+          ios.close();
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+      }
+    }
+    return outputFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
new file mode 100644
index 0000000..ec26a88
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+public class FileUtil {
+  private static final Logger logger = Logger.getLogger(FileUtil.class);
+
+  public static List<File> getAllFileFromDir(File directory,
+      String[] searchFileWithExtensions, boolean checkInSubDir) {
+    if (!directory.exists()) {
+      logger.error(directory.getAbsolutePath() + " is not exists ");
+    } else if (directory.isDirectory()) {
+      return (List<File>) FileUtils.listFiles(directory,
+          searchFileWithExtensions, checkInSubDir);
+    } else {
+      logger.error(directory.getAbsolutePath() + " is not Directory ");
+    }
+    return new ArrayList<File>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
new file mode 100644
index 0000000..9be85ee
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class PlaceholderUtil {
+
+  private static Logger LOG = Logger.getLogger(PlaceholderUtil.class);
+
+  private static Pattern placeHolderPattern;
+  static {
+//    placeHolderPattern = Pattern.compile("\\{(.*?)\\}");
+    placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
+  }
+
+  /**
+   * 
+   * @param inputStr
+   * @param contextParam
+   * @return String
+   */
+  public static String replaceVariables(String inputStr,
+      HashMap<String, String> contextParam) {
+    Matcher m = placeHolderPattern.matcher(inputStr);
+    String placeholder;
+    String replacement;
+    String output = new String(inputStr);
+    while (m.find()) {
+      placeholder = m.group();
+      if (placeholder != null && !placeholder.isEmpty()) {
+        String key = placeholder.replace("$","").toLowerCase();// remove
+                                                                   // brace
+        replacement = getFromContext(contextParam, placeholder, key);
+        output = output.replace(placeholder, replacement);
+      }
+    }
+    return output;
+  }
+
+  /**
+   * 
+   * @param contextParam
+   * @param defaultValue
+   * @param key
+   * @return String
+   */
+  private static String getFromContext(HashMap<String, String> contextParam,
+      String defaultValue, String key) {
+    String returnValue = defaultValue;// by default set default value as a
+                                      // return
+    if (contextParam != null) {
+      String value = contextParam.get(key);
+      if (value != null && !value.trim().isEmpty()) {
+        returnValue = value;
+      }
+    }
+    return returnValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index a55b348..58bcdae 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -2,6 +2,9 @@
 	"input": {
 		"file": {
 			"klass": "org.apache.ambari.logfeeder.input.InputFile"
+		},
+		"s3_file": {
+			"klass": "org.apache.ambari.logfeeder.input.InputS3File"
 		}
 
 	},
@@ -40,6 +43,9 @@
 		},
 		"dev_null": {
 			"klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
-		}
+		},
+		"s3_file": {
+			"klass": "org.apache.ambari.logfeeder.output.OutputS3File"
+ 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 076c09c..fc72ce0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -22,3 +22,6 @@ logfeeder.solr.config.interval=5
 logfeeder.solr.core.history=history
 logfeeder.solr.zkhosts=
 logfeeder.solr.url=
+
+#logfeeder tmp dir 
+logfeeder.tmp.dir=/tmp/$username/logfeeder/

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
new file mode 100644
index 0000000..1e2be37
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+public class AWSUtilTest {
+//  @Test
+  public void testAWSUtil_getAwsUserName() throws Exception {
+    String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+    String S3_SECRET_KEY = "S3_SECRET_KEY";
+    String expectedUsername = "";
+    String username = AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY,
+        S3_SECRET_KEY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/450eb3e4/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
new file mode 100644
index 0000000..d07ae2b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class S3UtilTest {
+  private static final Logger LOG = Logger.getLogger(S3UtilTest.class);
+
+  // @Test
+  public void testS3Util_pathToBucketName() throws Exception {
+    String s3Path = "s3://bucket_name/path/file.txt";
+    String expectedBucketName = "bucket_name";
+    String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
+    assertEquals(expectedBucketName, actualBucketName);
+  }
+
+  // @Test
+  public void testS3Util_pathToS3Key() throws Exception {
+    String s3Path = "s3://bucket_name/path/file.txt";
+    String expectedS3key = "path/file.txt";
+    String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
+    assertEquals(expectedS3key, actualS3key);
+  }
+
+}