You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:29 UTC

[02/52] [abbrv] ambari git commit: AMBARI-18236. Fix package structure in Logfeeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
deleted file mode 100644
index c22b512..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.exception;
-
-public class LogfeederException extends Exception {
-
-  public LogfeederException(String message, Throwable throwable) {
-    super(message, throwable);
-  }
-
-  public LogfeederException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index 01d4f79..ab371f1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -24,17 +24,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.AliasUtil;
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7aa649d..372c208 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -34,10 +34,10 @@ import java.util.regex.Pattern;
 import oi.thekraken.grok.api.Grok;
 import oi.thekraken.grok.api.exception.GrokException;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index f375374..2954106 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -20,9 +20,9 @@ package org.apache.ambari.logfeeder.filter;
 
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public class FilterJSON extends Filter {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index 1b8b3a3..7adb468 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -23,10 +23,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 76af16c..5feb9c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -26,13 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.InputMgr;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
 import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.output.OutputMgr;
 import org.apache.log4j.Logger;
 
 public abstract class Input extends ConfigBlock implements Runnable {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 9d3545e..c9f5ded 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -33,8 +33,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
new file mode 100644
index 0000000..b18c9b0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputMgr {
+  private static final Logger logger = Logger.getLogger(InputMgr.class);
+
+  private List<Input> inputList = new ArrayList<Input>();
+  private Set<Input> notReadyList = new HashSet<Input>();
+
+  private boolean isDrain = false;
+  private boolean isAnyInputTail = false;
+
+  private String checkPointSubFolderName = "logfeeder_checkpoints";
+  private File checkPointFolderFile = null;
+
+  private MetricCount filesCountMetric = new MetricCount();
+
+  private String checkPointExtension = ".cp";
+  
+  private Thread inputIsReadyMonitor = null;
+
+  public List<Input> getInputList() {
+    return inputList;
+  }
+
+  public void add(Input input) {
+    inputList.add(input);
+  }
+
+  public void removeInput(Input input) {
+    logger.info("Trying to remove from inputList. "
+      + input.getShortDescription());
+    Iterator<Input> iter = inputList.iterator();
+    while (iter.hasNext()) {
+      Input iterInput = iter.next();
+      if (iterInput.equals(input)) {
+        logger.info("Removing Input from inputList. "
+          + input.getShortDescription());
+        iter.remove();
+      }
+    }
+  }
+
+  public int getActiveFilesCount() {
+    int count = 0;
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public void init() {
+    filesCountMetric.metricsName = "input.files.count";
+    filesCountMetric.isPointInTime = true;
+
+    checkPointExtension = LogFeederUtil.getStringProperty(
+      "logfeeder.checkpoint.extension", checkPointExtension);
+    for (Input input : inputList) {
+      try {
+        input.init();
+        if (input.isTail()) {
+          isAnyInputTail = true;
+        }
+      } catch (Exception e) {
+        logger.error(
+          "Error initializing input. "
+            + input.getShortDescription(), e);
+      }
+    }
+
+    if (isAnyInputTail) {
+      logger.info("Determining valid checkpoint folder");
+      boolean isCheckPointFolderValid = false;
+      // We need to keep track of the files we are reading.
+      String checkPointFolder = LogFeederUtil
+        .getStringProperty("logfeeder.checkpoint.folder");
+      if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
+        checkPointFolderFile = new File(checkPointFolder);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's try home folder
+        String userHome = LogFeederUtil.getStringProperty("user.home");
+        if (userHome != null) {
+          checkPointFolderFile = new File(userHome,
+            checkPointSubFolderName);
+          logger.info("Checking if home folder can be used for checkpoints. Folder="
+            + checkPointFolderFile);
+          isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        }
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's use tmp folder
+        String tmpFolder = LogFeederUtil
+          .getStringProperty("java.io.tmpdir");
+        if (tmpFolder == null) {
+          tmpFolder = "/tmp";
+        }
+        checkPointFolderFile = new File(tmpFolder,
+          checkPointSubFolderName);
+        logger.info("Checking if tmps folder can be used for checkpoints. Folder="
+          + checkPointFolderFile);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        if (isCheckPointFolderValid) {
+          logger.warn("Using tmp folder "
+            + checkPointFolderFile
+            + " to store check points. This is not recommended."
+            + "Please set logfeeder.checkpoint.folder property");
+        }
+      }
+
+      if (isCheckPointFolderValid) {
+        logger.info("Using folder " + checkPointFolderFile
+          + " for storing checkpoints");
+      }
+    }
+
+  }
+
+  public File getCheckPointFolderFile() {
+    return checkPointFolderFile;
+  }
+
+  private boolean verifyCheckPointFolder(File folderPathFile) {
+    if (!folderPathFile.exists()) {
+      // Create the folder
+      try {
+        if (!folderPathFile.mkdir()) {
+          logger.warn("Error creating folder for check point. folder="
+            + folderPathFile);
+        }
+      } catch (Throwable t) {
+        logger.warn("Error creating folder for check point. folder="
+          + folderPathFile, t);
+      }
+    }
+
+    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+      // Let's check whether we can create a file
+      File testFile = new File(folderPathFile, UUID.randomUUID()
+        .toString());
+      try {
+        testFile.createNewFile();
+        return testFile.delete();
+      } catch (IOException e) {
+        logger.warn(
+          "Couldn't create test file in "
+            + folderPathFile.getAbsolutePath()
+            + " for checkPoint", e);
+      }
+    }
+    return false;
+  }
+
+  public void monitor() {
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        input.monitor();
+      } else {
+        if (input.isTail()) {
+          logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
+            + input.getShortDescription());
+          notReadyList.add(input);
+        } else {
+          logger.info("Input is not ready, so going to ignore it "
+            + input.getShortDescription());
+        }
+      }
+    }
+    // Start the monitoring thread if any file is in tail mode
+    if (isAnyInputTail) {
+       inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+        @Override
+        public void run() {
+          logger.info("Going to monitor for these missing files: "
+            + notReadyList.toString());
+          while (true) {
+            if (isDrain) {
+              logger.info("Exiting missing file monitor.");
+              break;
+            }
+            try {
+              Iterator<Input> iter = notReadyList.iterator();
+              while (iter.hasNext()) {
+                Input input = iter.next();
+                try {
+                  if (input.isReady()) {
+                    input.monitor();
+                    iter.remove();
+                  }
+                } catch (Throwable t) {
+                  logger.error("Error while enabling monitoring for input. "
+                    + input.getShortDescription());
+                }
+              }
+              Thread.sleep(30 * 1000);
+            } catch (Throwable t) {
+              // Ignore
+            }
+          }
+        }
+      };
+      inputIsReadyMonitor.start();
+    }
+  }
+
+  public void addToNotReady(Input notReadyInput) {
+    notReadyList.add(notReadyInput);
+  }
+
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    for (Input input : inputList) {
+      input.addMetricsContainers(metricsList);
+    }
+    filesCountMetric.count = getActiveFilesCount();
+    metricsList.add(filesCountMetric);
+  }
+
+  public void logStats() {
+    for (Input input : inputList) {
+      input.logStat();
+    }
+
+    filesCountMetric.count = getActiveFilesCount();
+    LogFeederUtil.logStatForMetric(filesCountMetric,
+      "Stat: Files Monitored Count", null);
+  }
+
+  public void close() {
+    for (Input input : inputList) {
+      try {
+        input.setDrain(true);
+      } catch (Throwable t) {
+        logger.error(
+          "Error while draining. input="
+            + input.getShortDescription(), t);
+      }
+    }
+    isDrain = true;
+
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    int i = 0;
+    boolean allClosed = true;
+    for (i = 0; i < iterations; i++) {
+      allClosed = true;
+      for (Input input : inputList) {
+        if (!input.isClosed()) {
+          try {
+            allClosed = false;
+            logger.warn("Waiting for input to close. "
+              + input.getShortDescription() + ", "
+              + (iterations - i) + " more seconds");
+            Thread.sleep(waitTimeMS);
+          } catch (Throwable t) {
+            // Ignore
+          }
+        }
+      }
+      if (allClosed) {
+        break;
+      }
+    }
+    if (!allClosed) {
+      logger.warn("Some inputs were not closed. Iterations=" + i);
+      for (Input input : inputList) {
+        if (!input.isClosed()) {
+          logger.warn("Input not closed. Will ignore it."
+            + input.getShortDescription());
+        }
+      }
+    } else {
+      logger.info("All inputs are closed. Iterations=" + i);
+    }
+
+  }
+
+  public void checkInAll() {
+    for (Input input : inputList) {
+      input.checkIn();
+    }
+  }
+
+  public void cleanCheckPointFiles() {
+
+    if (checkPointFolderFile == null) {
+      logger.info("Will not clean checkPoint files. checkPointFolderFile="
+        + checkPointFolderFile);
+      return;
+    }
+    logger.info("Cleaning checkPoint files. checkPointFolderFile="
+      + checkPointFolderFile.getAbsolutePath());
+    try {
+      // Loop over the check point files and if filePath is not present, then move to closed
+      String searchPath = "*" + checkPointExtension;
+      FileFilter fileFilter = new WildcardFileFilter(searchPath);
+      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+      int totalCheckFilesDeleted = 0;
+      for (File checkPointFile : checkPointFiles) {
+        RandomAccessFile checkPointReader = null;
+        try {
+          checkPointReader = new RandomAccessFile(checkPointFile, "r");
+
+          int contentSize = checkPointReader.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointReader.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+              + contentSize
+              + ", read="
+              + readSize
+              + ", checkPointFile=" + checkPointFile);
+          } else {
+            // Create JSON string
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            Map<String, Object> jsonCheckPoint = LogFeederUtil
+              .toJSONObject(jsonCheckPointStr);
+
+            String logFilePath = (String) jsonCheckPoint
+              .get("file_path");
+            String logFileKey = (String) jsonCheckPoint
+              .get("file_key");
+            if (logFilePath != null && logFileKey != null) {
+              boolean deleteCheckPointFile = false;
+              File logFile = new File(logFilePath);
+              if (logFile.exists()) {
+                Object fileKeyObj = InputFile
+                  .getFileKey(logFile);
+                String fileBase64 = Base64
+                  .byteArrayToBase64(fileKeyObj
+                    .toString().getBytes());
+                if (!logFileKey.equals(fileBase64)) {
+                  deleteCheckPointFile = true;
+                  logger.info("CheckPoint clean: File key has changed. old="
+                    + logFileKey
+                    + ", new="
+                    + fileBase64
+                    + ", filePath="
+                    + logFilePath
+                    + ", checkPointFile="
+                    + checkPointFile.getAbsolutePath());
+                }
+              } else {
+                logger.info("CheckPoint clean: Log file doesn't exist. filePath="
+                  + logFilePath
+                  + ", checkPointFile="
+                  + checkPointFile.getAbsolutePath());
+                deleteCheckPointFile = true;
+              }
+              if (deleteCheckPointFile) {
+                logger.info("Deleting CheckPoint file="
+                  + checkPointFile.getAbsolutePath()
+                  + ", logFile=" + logFilePath);
+                checkPointFile.delete();
+                totalCheckFilesDeleted++;
+              }
+            }
+          }
+        } catch (EOFException eof) {
+          logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
+            + checkPointFile);
+        } catch (Throwable t) {
+          logger.error("Error while checking checkPoint file. "
+            + checkPointFile, t);
+        } finally {
+          if (checkPointReader != null) {
+            try {
+              checkPointReader.close();
+            } catch (Throwable t) {
+              logger.error("Error closing checkPoint file. "
+                + checkPointFile, t);
+            }
+          }
+        }
+      }
+      logger.info("Deleted " + totalCheckFilesDeleted
+        + " checkPoint file(s). checkPointFolderFile="
+        + checkPointFolderFile.getAbsolutePath());
+
+    } catch (Throwable t) {
+      logger.error("Error while cleaning checkPointFiles", t);
+    }
+  }
+
+  public void waitOnAllInputs() {
+    //wait on inputs
+    if (inputList != null) {
+      for (Input input : inputList) {
+        if (input != null) {
+          Thread inputThread = input.getThread();
+          if (inputThread != null) {
+            try {
+              inputThread.join();
+            } catch (InterruptedException e) {
+              // ignore
+            }
+          }
+        }
+      }
+    }
+    // wait on monitor
+    if (inputIsReadyMonitor != null) {
+      try {
+        this.close();
+        inputIsReadyMonitor.join();
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 12a512f..c9d28bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -28,8 +28,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 48ad7ac..5ba56a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -27,9 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.filter.FilterJSON;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.common.util.Base64;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index 872460b..ae0cfc0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.TimeZone;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.SolrUtil;
 import org.apache.ambari.logfeeder.view.VLogfeederFilter;
 import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
index 128c5c4..bc807193 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
@@ -22,7 +22,7 @@ package org.apache.ambari.logfeeder.logconfig;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public enum LogfeederScheduler {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
index 8691a19..b5e4eb3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
@@ -22,9 +22,9 @@ package org.apache.ambari.logfeeder.logconfig.filter;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
 import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.view.VLogfeederFilter;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
index bf33f93..3a8eae9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
@@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.logconfig.filter;
 
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 45ccc70..9aa0b23 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index e1f8f97..c692a9d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 7e530f5..e618261 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
 
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
new file mode 100644
index 0000000..c99a091
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
+  private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+
+  private String collectorHosts = null;
+
+  public LogFeederAMSClient() {
+    collectorHosts = LogFeederUtil
+      .getStringProperty("logfeeder.metrics.collector.hosts");
+    if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+      collectorHosts = null;
+    }
+    if (collectorHosts != null) {
+      collectorHosts = collectorHosts.trim();
+    }
+    logger.info("AMS collector URL=" + collectorHosts);
+  }
+
+  @Override
+  public String getCollectorUri() {
+    return collectorHosts;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    // TODO: Hard coded timeout
+    return 10;
+  }
+
+  @Override
+  protected boolean emitMetrics(TimelineMetrics metrics) {
+    return super.emitMetrics(metrics);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
new file mode 100644
index 0000000..abb84c7
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+public class MetricCount {
+  public String metricsName = null;
+  public boolean isPointInTime = false;
+
+  public long count = 0;
+  public long prevLogCount = 0;
+  public long prevLogMS = System.currentTimeMillis();
+  public long prevPublishCount = 0;
+  public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
new file mode 100644
index 0000000..eff9d0d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class MetricsMgr {
+  private static final Logger logger = Logger.getLogger(MetricsMgr.class);
+
+  private boolean isMetricsEnabled = false;
+  private String nodeHostName = null;
+  private String appId = "logfeeder";
+
+  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
+  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
+
+  private int publishIntervalMS = 60 * 1000;
+  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
+  // the metrics in memory forever
+  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+  private LogFeederAMSClient amsClient = null;
+
+  public void init() {
+    logger.info("Initializing MetricsMgr()");
+    amsClient = new LogFeederAMSClient();
+
+    if (amsClient.getCollectorUri() != null) {
+      nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+      if (nodeHostName == null) {
+        try {
+          nodeHostName = InetAddress.getLocalHost().getHostName();
+        } catch (Throwable e) {
+          logger.warn(
+            "Error getting hostname using InetAddress.getLocalHost().getHostName()",
+            e);
+        }
+        if (nodeHostName == null) {
+          try {
+            nodeHostName = InetAddress.getLocalHost()
+              .getCanonicalHostName();
+          } catch (Throwable e) {
+            logger.warn(
+              "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
+              e);
+          }
+        }
+      }
+      if (nodeHostName == null) {
+        isMetricsEnabled = false;
+        logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+      } else {
+        isMetricsEnabled = true;
+        logger.info("LogFeeder Metrics is enabled. Metrics host="
+          + amsClient.getCollectorUri());
+      }
+    } else {
+      logger.info("LogFeeder Metrics publish is disabled");
+    }
+  }
+
+  public boolean isMetricsEnabled() {
+    return isMetricsEnabled;
+  }
+
+  synchronized public void useMetrics(List<MetricCount> metricsList) {
+    if (!isMetricsEnabled) {
+      return;
+    }
+    logger.info("useMetrics() metrics.size=" + metricsList.size());
+    long currMS = System.currentTimeMillis();
+    Long currMSLong = new Long(currMS);
+    for (MetricCount metric : metricsList) {
+      if (metric.metricsName == null) {
+        logger.debug("metric.metricsName is null");
+        // Metrics is not meant to be published
+        continue;
+      }
+      long currCount = metric.count;
+      if (!metric.isPointInTime && metric.publishCount > 0
+        && currCount <= metric.prevPublishCount) {
+        // No new data added, so let's ignore it
+        logger.debug("Nothing changed. " + metric.metricsName
+          + ", currCount=" + currCount + ", prevPublishCount="
+          + metric.prevPublishCount);
+        continue;
+      }
+      metric.publishCount++;
+
+      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+      if (timelineMetric == null) {
+        logger.debug("Creating new metric obbject for "
+          + metric.metricsName);
+        // First time for this metric
+        timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metric.metricsName);
+        timelineMetric.setHostName(nodeHostName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setStartTime(currMS);
+        timelineMetric.setType("Long");
+        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+
+        metricsMap.put(metric.metricsName, timelineMetric);
+      }
+      logger.debug("Adding metrics=" + metric.metricsName);
+      if (metric.isPointInTime) {
+        timelineMetric.getMetricValues().put(currMSLong,
+          new Double(currCount));
+      } else {
+        Double value = timelineMetric.getMetricValues().get(currMSLong);
+        if (value == null) {
+          value = new Double(0);
+        }
+        value += (currCount - metric.prevPublishCount);
+        timelineMetric.getMetricValues().put(currMSLong, value);
+        metric.prevPublishCount = currCount;
+      }
+    }
+
+    if (metricsMap.size() > 0
+      && currMS - lastPublishTimeMS > publishIntervalMS) {
+      try {
+        // Time to publish
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
+        timeLineMetricList.addAll(metricsMap.values());
+        timelineMetrics.setMetrics(timeLineMetricList);
+        amsClient.emitMetrics(timelineMetrics);
+        logger.info("Published " + timeLineMetricList.size()
+          + " metrics to AMS");
+        metricsMap.clear();
+        timeLineMetricList.clear();
+        lastPublishTimeMS = currMS;
+      } catch (Throwable t) {
+        logger.warn("Error sending metrics to AMS.", t);
+        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+          logger.error("AMS was not sent for last "
+            + maxMetricsBuffer
+            / 1000
+            + " seconds. Purging it and will start rebuilding it again");
+          metricsMap.clear();
+          lastFailedPublishTimeMS = currMS;
+        }
+      }
+    } else {
+      logger.info("Not publishing metrics. metrics.size()="
+        + metricsMap.size() + ", lastPublished="
+        + (currMS - lastPublishTimeMS) / 1000
+        + " seconds ago, intervalConfigured=" + publishIntervalMS
+        / 1000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index a4e0eda..6f84251 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public abstract class Output extends ConfigBlock {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index aef8dc5..18a5a54 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index f711a5f..a360215 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -19,12 +19,12 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
 import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
 import org.apache.commons.lang3.StringUtils;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index a7f2321..2595d87 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -25,8 +25,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedTransferQueue;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
new file mode 100644
index 0000000..0a6b7fa
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputMgr {
+  private static final Logger logger = Logger.getLogger(OutputMgr.class);
+
+  private Collection<Output> outputList = new ArrayList<Output>();
+
+  private boolean addMessageMD5 = true;
+
+  private int MAX_OUTPUT_SIZE = 32765; // 32766-1
+  private static long doc_counter = 0;
+  private MetricCount messageTruncateMetric = new MetricCount();
+
+  
+  public Collection<Output> getOutputList() {
+    return outputList;
+  }
+
+  public void setOutputList(Collection<Output> outputList) {
+    this.outputList = outputList;
+  }
+
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : input.getContextFields()
+      .entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null) {
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // TODO: Ideally most of the overrides should be configurable
+
+    // Add the input type
+    if (jsonObj.get("type") == null) {
+      jsonObj.put("type", input.getStringValue("type"));
+    }
+    if (jsonObj.get("path") == null && input.getFilePath() != null) {
+      jsonObj.put("path", input.getFilePath());
+    }
+    if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+      jsonObj.put("path", input.getStringValue("path"));
+    }
+
+    // Add host if required
+    if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+      jsonObj.put("host", LogFeederUtil.hostName);
+    }
+    // Add IP if required
+    if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+      jsonObj.put("ip", LogFeederUtil.ipAddress);
+    }
+    
+    //Add level
+    if (jsonObj.get("level") == null) {
+      jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+    }
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
+        .toJson(jsonObj));
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + eventMD5.toString());
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + eventMD5.toString());
+      }
+    }
+
+    // jsonObj.put("@timestamp", new Date());
+    jsonObj.put("seq_num", new Long(doc_counter++));
+    if (jsonObj.get("id") == null) {
+      jsonObj.put("id", UUID.randomUUID().toString());
+    }
+    if (jsonObj.get("event_count") == null) {
+      jsonObj.put("event_count", new Integer(1));
+    }
+    if (inputMarker.lineNumber > 0) {
+      jsonObj.put("logfile_line_number", new Integer(
+        inputMarker.lineNumber));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      if (logMessage != null
+        && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+        messageTruncateMetric.count++;
+        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+          + "_MESSAGESIZE";
+        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+          "Message is too big. size="
+            + logMessage.getBytes().length + ", input="
+            + input.getShortDescription()
+            + ". Truncating to " + MAX_OUTPUT_SIZE
+            + ", first upto 100 characters="
+            + LogFeederUtil.subString(logMessage, 100),
+          null, logger, Level.WARN);
+        logMessage = new String(logMessage.getBytes(), 0,
+          MAX_OUTPUT_SIZE);
+        jsonObj.put("log_message", logMessage);
+        // Add error tags
+        @SuppressWarnings("unchecked")
+        List<String> tagsList = (List<String>) jsonObj.get("tags");
+        if (tagsList == null) {
+          tagsList = new ArrayList<String>();
+          jsonObj.put("tags", tagsList);
+        }
+        tagsList.add("error_message_truncated");
+
+      }
+      if (addMessageMD5) {
+        jsonObj.put("message_md5",
+          "" + LogFeederUtil.genHash(logMessage));
+      }
+    }
+    //check log is allowed to send output
+    if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+      for (Output output : input.getOutputList()) {
+        try {
+          output.write(jsonObj, inputMarker);
+        } catch (Exception e) {
+          logger.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  public void write(String jsonBlock, InputMarker inputMarker) {
+    //check log is allowed to send output
+    if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+      for (Output output : inputMarker.input.getOutputList()) {
+        try {
+          output.write(jsonBlock, inputMarker);
+        } catch (Exception e) {
+          logger.error("Error writing. to " + output.getShortDescription(), e);
+        }
+      }
+    }
+  }
+
+  public void close() {
+    logger.info("Close called for outputs ...");
+    for (Output output : outputList) {
+      try {
+        output.setDrain(true);
+        output.close();
+      } catch (Exception e) {
+        // Ignore
+      }
+    }
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    int i;
+    boolean allClosed = true;
+    for (i = 0; i < iterations; i++) {
+      allClosed = true;
+      for (Output output : outputList) {
+        if (!output.isClosed()) {
+          try {
+            allClosed = false;
+            logger.warn("Waiting for output to close. "
+              + output.getShortDescription() + ", "
+              + (iterations - i) + " more seconds");
+            Thread.sleep(waitTimeMS);
+          } catch (Throwable t) {
+            // Ignore
+          }
+        }
+      }
+      if (allClosed) {
+        break;
+      }
+    }
+
+    if (!allClosed) {
+      logger.warn("Some outpus were not closed. Iterations=" + i);
+      for (Output output : outputList) {
+        if (!output.isClosed()) {
+          logger.warn("Output not closed. Will ignore it."
+            + output.getShortDescription() + ", pendingCound="
+            + output.getPendingCount());
+        }
+      }
+    } else {
+      logger.info("All outputs are closed. Iterations=" + i);
+    }
+  }
+
+  public void logStats() {
+    for (Output output : outputList) {
+      output.logStat();
+    }
+    LogFeederUtil.logStatForMetric(messageTruncateMetric,
+      "Stat: Messages Truncated", null);
+  }
+
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    metricsList.add(messageTruncateMetric);
+    for (Output output : outputList) {
+      output.addMetricsContainers(metricsList);
+    }
+  }
+
+  
+  public void copyFile(File inputFile, InputMarker inputMarker) {
+    Input input = inputMarker.input;
+    for (Output output : input.getOutputList()) {
+      try {
+        output.copyFile(inputFile, inputMarker);
+      }catch (Exception e) {
+        logger.error("Error coyping file . to " + output.getShortDescription(),
+            e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index cbc1045..e95f8df 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -22,14 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.spool.LogSpooler;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
 import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.log4j.Logger;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index b4dac72..cd9ce4d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -33,9 +33,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
index 1bbf33e..58282e0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -18,9 +18,9 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index fb597d3..485b0d4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -18,11 +18,11 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import org.apache.ambari.logfeeder.ConfigBlock;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+
 /**
  * Holds all configuration relevant for S3 upload.
  */

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index dec685f..fd59c51 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -19,9 +19,9 @@
 package org.apache.ambari.logfeeder.output;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+
 import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
 import org.apache.log4j.Logger;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
deleted file mode 100644
index d0fbb6c..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import org.apache.log4j.Logger;
-
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
-
-public enum AWSUtil {
-  INSTANCE;
-  private static final Logger LOG = Logger.getLogger(AWSUtil.class);
-
-  public String getAwsUserName(String accessKey, String secretKey) {
-    String username = null;
-    AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
-    AmazonIdentityManagementClient amazonIdentityManagementClient;
-    if (awsCredentials != null) {
-      amazonIdentityManagementClient = new AmazonIdentityManagementClient(
-          awsCredentials);
-    } else {
-      // create default client
-      amazonIdentityManagementClient = new AmazonIdentityManagementClient();
-    }
-    try {
-      username = amazonIdentityManagementClient.getUser().getUser()
-          .getUserName();
-    } catch (AmazonServiceException e) {
-      if (e.getErrorCode().compareTo("AccessDenied") == 0) {
-        String arn = null;
-        String msg = e.getMessage();
-        int arnIdx = msg.indexOf("arn:aws");
-        if (arnIdx != -1) {
-          int arnSpace = msg.indexOf(" ", arnIdx);
-          // should be similar to "arn:aws:iam::111111111111:user/username"
-          arn = msg.substring(arnIdx, arnSpace);
-        }
-        if (arn != null) {
-          String[] arnParts = arn.split(":");
-          if (arnParts != null && arnParts.length > 5) {
-            username = arnParts[5];
-            if (username != null) {
-              username = username.replace("user/", "");
-            }
-          }
-        }
-      }
-    } catch (Exception exception) {
-      LOG.error(
-          "Error in getting username :" + exception.getLocalizedMessage(),
-          exception.getCause());
-    }
-    return username;
-  }
-
-  public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
-    if (accessKey != null && secretKey != null) {
-      LOG.debug("Creating aws client as per new accesskey and secretkey");
-      AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
-          secretKey);
-      return awsCredentials;
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
deleted file mode 100644
index db187be..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
-
-/**
- * Utility to connect to s3
- */
-public class S3Util {
-  public static final S3Util INSTANCE = new S3Util();
-
-  private static final Logger LOG = Logger.getLogger(S3Util.class);
-
-  public static final String S3_PATH_START_WITH = "s3://";
-  public static final String S3_PATH_SEPARATOR = "/";
-
-  public AmazonS3 getS3Client(String accessKey, String secretKey) {
-    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
-        accessKey, secretKey);
-    AmazonS3 s3client;
-    if (awsCredentials != null) {
-      s3client = new AmazonS3Client(awsCredentials);
-    } else {
-      s3client = new AmazonS3Client();
-    }
-    return s3client;
-  }
-
-  public TransferManager getTransferManager(String accessKey, String secretKey) {
-    AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
-        accessKey, secretKey);
-    TransferManager transferManager;
-    if (awsCredentials != null) {
-      transferManager = new TransferManager(awsCredentials);
-    } else {
-      transferManager = new TransferManager();
-    }
-    return transferManager;
-  }
-
-  public void shutdownTransferManager(TransferManager transferManager) {
-    if (transferManager != null) {
-      transferManager.shutdownNow();
-    }
-  }
-
-  public String getBucketName(String s3Path) {
-    String bucketName = null;
-    // s3path
-    if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
-          S3_PATH_SEPARATOR);
-      bucketName = s3PathParts[0];
-    }
-    return bucketName;
-  }
-
-  public String getS3Key(String s3Path) {
-    StringBuilder s3Key = new StringBuilder();
-    // s3path
-    if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
-          S3_PATH_SEPARATOR);
-      ArrayList<String> s3PathList = new ArrayList<String>(
-          Arrays.asList(s3PathParts));
-      s3PathList.remove(0);// remove bucketName
-      for (int index = 0; index < s3PathList.size(); index++) {
-        if (index > 0) {
-          s3Key.append(S3_PATH_SEPARATOR);
-        }
-        s3Key.append(s3PathList.get(index));
-      }
-    }
-    return s3Key.toString();
-  }
-
-  public void uploadFileTos3(String bucketName, String s3Key, File localFile,
-      String accessKey, String secretKey) {
-    TransferManager transferManager = getTransferManager(accessKey, secretKey);
-    try {
-      Upload upload = transferManager.upload(bucketName, s3Key, localFile);
-      upload.waitForUploadResult();
-    } catch (AmazonClientException | InterruptedException e) {
-      LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
-          e);
-    } finally {
-      shutdownTransferManager(transferManager);
-    }
-  }
-
-  /**
-   * Get the buffer reader to read s3 file as a stream
-   */
-  public BufferedReader getReader(String s3Path, String accessKey,
-      String secretKey) throws IOException {
-    // TODO error handling
-    // Compression support
-    // read header and decide the compression(auto detection)
-    // For now hard-code GZIP compression
-    String s3Bucket = getBucketName(s3Path);
-    String s3Key = getS3Key(s3Path);
-    S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
-        new GetObjectRequest(s3Bucket, s3Key));
-    GZIPInputStream objectInputStream;
-    try {
-      objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
-      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
-          objectInputStream));
-      return bufferedReader;
-    } catch (IOException e) {
-      LOG.error("Error in creating stream reader for s3 file :" + s3Path,
-          e.getCause());
-      throw e;
-    }
-  }
-
-  public void writeIntoS3File(String data, String bucketName, String s3Key,
-      String accessKey, String secretKey) {
-    InputStream in = null;
-    try {
-      in = IOUtils.toInputStream(data, "UTF-8");
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-    if (in != null) {
-      TransferManager transferManager = getTransferManager(accessKey, secretKey);
-      try {
-        if (transferManager != null) {
-          transferManager.upload(
-                  new PutObjectRequest(bucketName, s3Key, in,
-                  new ObjectMetadata())).waitForUploadResult();
-          LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
-              + bucketName);
-        }
-      } catch (AmazonClientException | InterruptedException e) {
-        LOG.error(e);
-      } finally {
-        try {
-          shutdownTransferManager(transferManager);
-          in.close();
-        } catch (IOException e) {
-          // ignore
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
new file mode 100644
index 0000000..15f7594
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+
+public enum AWSUtil {
+  INSTANCE;
+  private static final Logger LOG = Logger.getLogger(AWSUtil.class);
+
+  public String getAwsUserName(String accessKey, String secretKey) {
+    String username = null;
+    AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
+    AmazonIdentityManagementClient amazonIdentityManagementClient;
+    if (awsCredentials != null) {
+      amazonIdentityManagementClient = new AmazonIdentityManagementClient(
+          awsCredentials);
+    } else {
+      // create default client
+      amazonIdentityManagementClient = new AmazonIdentityManagementClient();
+    }
+    try {
+      username = amazonIdentityManagementClient.getUser().getUser()
+          .getUserName();
+    } catch (AmazonServiceException e) {
+      if (e.getErrorCode().compareTo("AccessDenied") == 0) {
+        String arn = null;
+        String msg = e.getMessage();
+        int arnIdx = msg.indexOf("arn:aws");
+        if (arnIdx != -1) {
+          int arnSpace = msg.indexOf(" ", arnIdx);
+          // should be similar to "arn:aws:iam::111111111111:user/username"
+          arn = msg.substring(arnIdx, arnSpace);
+        }
+        if (arn != null) {
+          String[] arnParts = arn.split(":");
+          if (arnParts != null && arnParts.length > 5) {
+            username = arnParts[5];
+            if (username != null) {
+              username = username.replace("user/", "");
+            }
+          }
+        }
+      }
+    } catch (Exception exception) {
+      LOG.error(
+          "Error in getting username :" + exception.getLocalizedMessage(),
+          exception.getCause());
+    }
+    return username;
+  }
+
+  public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+    if (accessKey != null && secretKey != null) {
+      LOG.debug("Creating aws client as per new accesskey and secretkey");
+      AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
+          secretKey);
+      return awsCredentials;
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
new file mode 100644
index 0000000..a92ba29
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+public class AliasUtil {
+
+  private static Logger logger = Logger.getLogger(AliasUtil.class);
+
+  private static AliasUtil instance = null;
+
+  private static String aliasConfigJson = "alias_config.json";
+
+  private HashMap<String, Object> aliasMap = null;
+
+  public static enum ALIAS_TYPE {
+    INPUT, FILTER, MAPPER, OUTPUT
+  }
+
+  public static enum ALIAS_PARAM {
+    KLASS
+  }
+
+  private AliasUtil() {
+    init();
+  }
+
+  public static AliasUtil getInstance() {
+    if (instance == null) {
+      synchronized (AliasUtil.class) {
+        if (instance == null) {
+          instance = new AliasUtil();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /**
+   */
+  private void init() {
+    File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson);
+    if (jsonFile != null) {
+      this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile);
+    }
+
+  }
+
+
+  public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
+    String result = key;// key as a default value;
+    HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
+    String value = aliasInfo.get(aliasParam.name().toLowerCase());
+    if (value != null && !value.isEmpty()) {
+      result = value;
+      logger.debug("Alias found for key :" + key + ",  param :" + aliasParam.name().toLowerCase() + ", value :"
+        + value + " aliastype:" + aliastype.name());
+    } else {
+      logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase());
+    }
+    return result;
+  }
+
+  @SuppressWarnings("unchecked")
+  private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) {
+    HashMap<String, String> aliasInfo = null;
+    if (aliasMap != null) {
+      String typeKey = aliastype.name().toLowerCase();
+      HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
+      if (typeJson != null) {
+        aliasInfo = (HashMap<String, String>) typeJson.get(key);
+      }
+    }
+    if (aliasInfo == null) {
+      aliasInfo = new HashMap<String, String>();
+    }
+    return aliasInfo;
+  }
+}