You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:42:00 UTC

[33/52] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0a3cdccd
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0a3cdccd
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0a3cdccd

Branch: refs/heads/logsearch-ga
Commit: 0a3cdccda2b05f6c19046fd73bf2381e9a474a6d
Parents: 57ef91f
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Sep 7 22:24:51 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Wed Sep 28 11:26:42 2016 +0200

----------------------------------------------------------------------
 .../org/apache/ambari/logfeeder/LogFeeder.java  | 594 +++++++------------
 .../ambari/logfeeder/common/ConfigBlock.java    |  71 +--
 .../logfeeder/common/LogFeederConstants.java    |  39 ++
 .../apache/ambari/logfeeder/filter/Filter.java  |  55 +-
 .../ambari/logfeeder/filter/FilterGrok.java     |  94 +--
 .../ambari/logfeeder/filter/FilterJSON.java     |   8 +-
 .../ambari/logfeeder/filter/FilterKeyValue.java |  44 +-
 .../logfeeder/input/AbstractInputFile.java      | 319 ++++++++++
 .../apache/ambari/logfeeder/input/Input.java    | 313 +++++-----
 .../ambari/logfeeder/input/InputFile.java       | 503 ++--------------
 .../ambari/logfeeder/input/InputManager.java    | 379 ++++++++++++
 .../ambari/logfeeder/input/InputMarker.java     |  17 +-
 .../apache/ambari/logfeeder/input/InputMgr.java | 451 --------------
 .../ambari/logfeeder/input/InputS3File.java     | 424 +------------
 .../ambari/logfeeder/input/InputSimulate.java   |  40 +-
 .../logfeeder/input/reader/GZIPReader.java      |  23 +-
 .../input/reader/LogsearchReaderFactory.java    |   8 +-
 .../logconfig/FetchConfigFromSolr.java          | 194 ------
 .../logfeeder/logconfig/FilterLogData.java      |  83 +++
 .../logfeeder/logconfig/LogConfigFetcher.java   | 168 ++++++
 .../logfeeder/logconfig/LogConfigHandler.java   | 189 ++++++
 .../logfeeder/logconfig/LogFeederConstants.java |  34 --
 .../logfeeder/logconfig/LogFeederFilter.java    |  90 +++
 .../logconfig/LogFeederFilterWrapper.java       |  55 ++
 .../logfeeder/logconfig/LogfeederScheduler.java |  59 --
 .../logconfig/filter/ApplyLogFilter.java        |  62 --
 .../logconfig/filter/DefaultDataFilter.java     |  49 --
 .../logconfig/filter/FilterLogData.java         |  53 --
 .../apache/ambari/logfeeder/mapper/Mapper.java  |  14 +-
 .../ambari/logfeeder/mapper/MapperDate.java     |  32 +-
 .../logfeeder/mapper/MapperFieldName.java       |  20 +-
 .../logfeeder/mapper/MapperFieldValue.java      |  31 +-
 .../logfeeder/metrics/LogFeederAMSClient.java   |  10 +-
 .../ambari/logfeeder/metrics/MetricCount.java   |  31 -
 .../ambari/logfeeder/metrics/MetricData.java    |  46 ++
 .../logfeeder/metrics/MetricsManager.java       | 178 ++++++
 .../ambari/logfeeder/metrics/MetricsMgr.java    | 178 ------
 .../apache/ambari/logfeeder/output/Output.java  |  13 +-
 .../ambari/logfeeder/output/OutputData.java     |   8 +-
 .../ambari/logfeeder/output/OutputDevNull.java  |   7 +-
 .../ambari/logfeeder/output/OutputFile.java     |  42 +-
 .../ambari/logfeeder/output/OutputHDFSFile.java |  70 +--
 .../ambari/logfeeder/output/OutputKafka.java    |  58 +-
 .../ambari/logfeeder/output/OutputManager.java  | 250 ++++++++
 .../ambari/logfeeder/output/OutputMgr.java      | 263 --------
 .../ambari/logfeeder/output/OutputS3File.java   |  41 +-
 .../ambari/logfeeder/output/OutputSolr.java     |  62 +-
 .../logfeeder/output/S3LogPathResolver.java     |   6 +-
 .../logfeeder/output/S3OutputConfiguration.java |   5 +-
 .../ambari/logfeeder/output/S3Uploader.java     |  64 +-
 .../logfeeder/output/spool/LogSpooler.java      |  23 +-
 .../output/spool/LogSpoolerContext.java         |   2 +-
 .../output/spool/LogSpoolerException.java       |   2 +-
 .../output/spool/RolloverCondition.java         |   2 +-
 .../logfeeder/output/spool/RolloverHandler.java |   2 +-
 .../apache/ambari/logfeeder/util/AWSUtil.java   |  52 +-
 .../apache/ambari/logfeeder/util/AliasUtil.java | 103 ++--
 .../ambari/logfeeder/util/CompressionUtil.java  |  15 +-
 .../apache/ambari/logfeeder/util/DateUtil.java  |  39 +-
 .../apache/ambari/logfeeder/util/FileUtil.java  |  66 ++-
 .../ambari/logfeeder/util/LogFeederUtil.java    | 511 +++++-----------
 .../logfeeder/util/LogfeederHDFSUtil.java       |  58 +-
 .../ambari/logfeeder/util/PlaceholderUtil.java  |  32 +-
 .../apache/ambari/logfeeder/util/S3Util.java    |  81 +--
 .../apache/ambari/logfeeder/util/SolrUtil.java  | 186 ------
 .../ambari/logfeeder/view/VLogfeederFilter.java |  90 ---
 .../logfeeder/view/VLogfeederFilterWrapper.java |  55 --
 .../org/apache/ambari/logfeeder/AppTest.java    | 116 ----
 .../ambari/logfeeder/filter/FilterGrokTest.java |  55 +-
 .../ambari/logfeeder/filter/FilterJSONTest.java |  41 +-
 .../logfeeder/filter/FilterKeyValueTest.java    |  34 +-
 .../ambari/logfeeder/input/InputFileTest.java   |  24 +-
 .../logfeeder/input/InputManagerTest.java       | 241 ++++++++
 .../logconfig/LogConfigHandlerTest.java         | 117 ++++
 .../ambari/logfeeder/mapper/MapperDateTest.java |  17 +-
 .../logfeeder/mapper/MapperFieldNameTest.java   |   2 +-
 .../logfeeder/mapper/MapperFieldValueTest.java  |   2 +-
 .../logfeeder/metrics/MetrcisManagerTest.java   | 128 ++++
 .../logfeeder/output/OutputKafkaTest.java       |   5 +-
 .../logfeeder/output/OutputManagerTest.java     | 256 ++++++++
 .../logfeeder/output/OutputS3FileTest.java      |  17 +-
 .../ambari/logfeeder/output/OutputSolrTest.java |   5 +-
 .../logfeeder/output/S3LogPathResolverTest.java |   2 +-
 .../ambari/logfeeder/output/S3UploaderTest.java |  42 +-
 .../logfeeder/output/spool/LogSpoolerTest.java  |   2 +-
 .../ambari/logfeeder/util/AWSUtilTest.java      |  29 -
 .../logfeeder/util/PlaceholderUtilTest.java     |  20 +-
 .../ambari/logfeeder/util/S3UtilTest.java       |   4 +-
 .../src/test/resources/logfeeder.properties     |  20 +
 89 files changed, 3864 insertions(+), 4481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 373d743..6d0f22c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -20,14 +20,10 @@
 package org.apache.ambari.logfeeder;
 
 import java.io.BufferedInputStream;
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
@@ -39,18 +35,21 @@ import java.util.Set;
 
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMgr;
+import org.apache.ambari.logfeeder.input.InputManager;
 import org.apache.ambari.logfeeder.input.InputSimulate;
-import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.metrics.MetricsMgr;
+import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.AliasUtil;
 import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -58,171 +57,142 @@ import org.apache.log4j.Logger;
 import com.google.gson.reflect.TypeToken;
 
 public class LogFeeder {
-  private static final Logger logger = Logger.getLogger(LogFeeder.class);
+  private static final Logger LOG = Logger.getLogger(LogFeeder.class);
 
   private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
+  private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
 
-  private Collection<Output> outputList = new ArrayList<Output>();
+  private OutputManager outputManager = new OutputManager();
+  private InputManager inputManager = new InputManager();
+  private MetricsManager metricsManager = new MetricsManager();
 
-  private OutputMgr outMgr = new OutputMgr();
-  private InputMgr inputMgr = new InputMgr();
-  private MetricsMgr metricsMgr = new MetricsMgr();
+  public static Map<String, Object> globalConfigs = new HashMap<>();
 
-  public static Map<String, Object> globalMap = null;
-  private String[] inputParams;
-
-  private List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
-  private List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
-  private List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
-  private List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
+  private List<Map<String, Object>> inputConfigList = new ArrayList<>();
+  private List<Map<String, Object>> filterConfigList = new ArrayList<>();
+  private List<Map<String, Object>> outputConfigList = new ArrayList<>();
   
-  private int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours
   private long lastCheckPointCleanedMS = 0;
-  
-  private static boolean isLogfeederCompleted = false;
-  
+  private boolean isLogfeederCompleted = false;
   private Thread statLoggerThread = null;
 
-  private LogFeeder(String[] args) {
-    inputParams = args;
+  private LogFeeder() {}
+
+  public void run() {
+    try {
+      init();
+      monitor();
+      waitOnAllDaemonThreads();
+    } catch (Throwable t) {
+      LOG.fatal("Caught exception in main.", t);
+      System.exit(1);
+    }
   }
 
   private void init() throws Throwable {
+    Date startTime = new Date();
 
-    LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
-
-    String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
-    logger.info("logfeeder.config.files=" + configFiles);
+    loadConfigFiles();
+    addSimulatedInputs();
+    mergeAllConfigs();
     
-    String[] configFileList = null;
-    if (configFiles != null) {
-      configFileList = configFiles.split(",");
-    }
-    //list of config those are there in cmd line config dir , end with .json
-    String[] cmdLineConfigs = getConfigFromCmdLine();
-    //merge both config
-    String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList,
-        cmdLineConfigs);
-    //mergedConfigList is null then set default conifg 
-    if (mergedConfigList == null || mergedConfigList.length == 0) {
-      mergedConfigList = LogFeederUtil.getStringProperty("config.file",
-          "config.json").split(",");
-    }
-    for (String configFileName : mergedConfigList) {
-      logger.info("Going to load config file:" + configFileName);
-      //escape space from config file path
-      configFileName= configFileName.replace("\\ ", "%20");
+    LogConfigHandler.handleConfig();
+    
+    outputManager.init();
+    inputManager.init();
+    metricsManager.init();
+    
+    LOG.debug("==============");
+    
+    Date endTime = new Date();
+    LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
+  }
+
+  private void loadConfigFiles() throws Exception {
+    List<String> configFiles = getConfigFiles();
+    for (String configFileName : configFiles) {
+      LOG.info("Going to load config file:" + configFileName);
+      configFileName = configFileName.replace("\\ ", "%20");
       File configFile = new File(configFileName);
       if (configFile.exists() && configFile.isFile()) {
-        logger.info("Config file exists in path."
-          + configFile.getAbsolutePath());
+        LOG.info("Config file exists in path." + configFile.getAbsolutePath());
         loadConfigsUsingFile(configFile);
       } else {
-        // Let's try to load it from class loader
-        logger.info("Trying to load config file from classloader: "
-          + configFileName);
+        LOG.info("Trying to load config file from classloader: " + configFileName);
         loadConfigsUsingClassLoader(configFileName);
-        logger.info("Loaded config file from classloader: "
-          + configFileName);
+        LOG.info("Loaded config file from classloader: " + configFileName);
       }
     }
-    
-    addSimulatedInputs();
-    
-    mergeAllConfigs();
-    
-    LogfeederScheduler.INSTANCE.start();
-    
-    outMgr.setOutputList(outputList);
-    for (Output output : outputList) {
-      output.init();
-    }
-    inputMgr.init();
-    metricsMgr.init();
-    logger.debug("==============");
   }
 
-  private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
-    BufferedInputStream fileInputStream = null;
-    BufferedReader br = null;
-    try {
-      fileInputStream = (BufferedInputStream) this
-        .getClass().getClassLoader()
-        .getResourceAsStream(configFileName);
-      if (fileInputStream != null) {
-        br = new BufferedReader(new InputStreamReader(
-          fileInputStream));
-        String configData = readFile(br);
-        loadConfigs(configData);
-      } else {
-        throw new Exception("Can't find configFile=" + configFileName);
-      }
-    } finally {
-      if (br != null) {
-        try {
-          br.close();
-        } catch (IOException e) {
-        }
-      }
+  private List<String> getConfigFiles() {
+    List<String> configFiles = new ArrayList<>();
+    
+    String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
+    LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
+    if (logfeederConfigFilesProperty != null) {
+      configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
+    }
 
-      if (fileInputStream != null) {
-        try {
-          fileInputStream.close();
-        } catch (IOException e) {
-        }
+    String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+    if (StringUtils.isNotEmpty(inputConfigDir)) {
+      File configDirFile = new File(inputConfigDir);
+      List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
+      for (File inputConfigFile : inputConfigFiles) {
+        configFiles.add(inputConfigFile.getAbsolutePath());
       }
     }
+    
+    if (CollectionUtils.isEmpty(configFiles)) {
+      String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
+      configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
+    }
+    
+    return configFiles;
   }
 
-  /**
-   * This method loads the configurations from the given file.
-   */
   private void loadConfigsUsingFile(File configFile) throws Exception {
-    FileInputStream fileInputStream = null;
     try {
-      fileInputStream = new FileInputStream(configFile);
-      BufferedReader br = new BufferedReader(new InputStreamReader(
-        fileInputStream));
-      String configData = readFile(br);
+      String configData = FileUtils.readFileToString(configFile);
       loadConfigs(configData);
     } catch (Exception t) {
-      logger.error("Error opening config file. configFilePath="
-        + configFile.getAbsolutePath());
+      LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
       throw t;
-    } finally {
-      if (fileInputStream != null) {
-        try {
-          fileInputStream.close();
-        } catch (Throwable t) {
-          // ignore
-        }
-      }
+    }
+  }
+
+  private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
+    try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
+      String configData = IOUtils.toString(fis);
+      loadConfigs(configData);
     }
   }
 
   @SuppressWarnings("unchecked")
   private void loadConfigs(String configData) throws Exception {
-    Type type = new TypeToken<Map<String, Object>>() {
-    }.getType();
-    Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(
-      configData, type);
+    Type type = new TypeToken<Map<String, Object>>() {}.getType();
+    Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
 
     // Get the globals
     for (String key : configMap.keySet()) {
-      if (key.equalsIgnoreCase("global")) {
-        globalConfigList.add((Map<String, Object>) configMap.get(key));
-      } else if (key.equalsIgnoreCase("input")) {
-        List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
-          .get(key);
-        inputConfigList.addAll(mapList);
-      } else if (key.equalsIgnoreCase("filter")) {
-        List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
-          .get(key);
-        filterConfigList.addAll(mapList);
-      } else if (key.equalsIgnoreCase("output")) {
-        List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
-          .get(key);
-        outputConfigList.addAll(mapList);
+      switch (key) {
+        case "global" :
+          globalConfigs.putAll((Map<String, Object>) configMap.get(key));
+          break;
+        case "input" :
+          List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
+          inputConfigList.addAll(inputConfig);
+          break;
+        case "filter" :
+          List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
+          filterConfigList.addAll(filterConfig);
+          break;
+        case "output" :
+          List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
+          outputConfigList.addAll(outputConfig);
+          break;
+        default :
+          LOG.warn("Unknown config key: " + key);
       }
     }
   }
@@ -244,231 +214,175 @@ public class LogFeeder {
   }
 
   private void mergeAllConfigs() {
-    globalMap = mergeConfigs(globalConfigList);
+    loadOutputs();
+    loadInputs();
+    loadFilters();
+    
+    assignOutputsToInputs();
+  }
 
-    sortBlocks(filterConfigList);
-    // First loop for output
+  private void loadOutputs() {
     for (Map<String, Object> map : outputConfigList) {
       if (map == null) {
         continue;
       }
-      mergeBlocks(globalMap, map);
+      mergeBlocks(globalConfigs, map);
 
       String value = (String) map.get("destination");
-      Output output;
-      if (value == null || value.isEmpty()) {
-        logger.error("Output block doesn't have destination element");
-        continue;
-      }
-      String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
-      if (classFullName == null || classFullName.isEmpty()) {
-        logger.error("Destination block doesn't have output element");
+      if (StringUtils.isEmpty(value)) {
+        LOG.error("Output block doesn't have destination element");
         continue;
       }
-      output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT);
-
+      Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
       if (output == null) {
-        logger.error("Destination Object is null");
+        LOG.error("Output object could not be found");
         continue;
       }
-
       output.setDestination(value);
       output.loadConfig(map);
 
-      // We will only check for is_enabled out here. Down below we will
-      // check whether this output is enabled for the input
-      boolean isEnabled = output.getBooleanValue("is_enabled", true);
-      if (isEnabled) {
-        outputList.add(output);
+      // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
+      if (output.getBooleanValue("is_enabled", true)) {
         output.logConfgs(Level.INFO);
+        outputManager.add(output);
       } else {
-        logger.info("Output is disabled. So ignoring it. "
-          + output.getShortDescription());
+        LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
       }
     }
+  }
 
-    // Second loop for input
+  private void loadInputs() {
     for (Map<String, Object> map : inputConfigList) {
       if (map == null) {
         continue;
       }
-      mergeBlocks(globalMap, map);
+      mergeBlocks(globalConfigs, map);
 
       String value = (String) map.get("source");
-      Input input;
-      if (value == null || value.isEmpty()) {
-        logger.error("Input block doesn't have source element");
-        continue;
-      }
-      String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
-      if (classFullName == null || classFullName.isEmpty()) {
-        logger.error("Source block doesn't have source element");
+      if (StringUtils.isEmpty(value)) {
+        LOG.error("Input block doesn't have source element");
         continue;
       }
-      input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT);
-
+      Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
       if (input == null) {
-        logger.error("Source Object is null");
+        LOG.error("Input object could not be found");
         continue;
       }
-
       input.setType(value);
       input.loadConfig(map);
 
       if (input.isEnabled()) {
-        input.setOutputMgr(outMgr);
-        input.setInputMgr(inputMgr);
-        inputMgr.add(input);
+        input.setOutputManager(outputManager);
+        input.setInputManager(inputManager);
+        inputManager.add(input);
         input.logConfgs(Level.INFO);
       } else {
-        logger.info("Input is disabled. So ignoring it. "
-          + input.getShortDescription());
+        LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
       }
     }
+  }
+
+  private void loadFilters() {
+    sortFilters();
 
-    // Third loop is for filter, but we will have to create a filter
-    // instance for each input, so it can maintain the state per input
     List<Input> toRemoveInputList = new ArrayList<Input>();
-    for (Input input : inputMgr.getInputList()) {
-      Filter prevFilter = null;
+    for (Input input : inputManager.getInputList()) {
       for (Map<String, Object> map : filterConfigList) {
         if (map == null) {
           continue;
         }
-        mergeBlocks(globalMap, map);
+        mergeBlocks(globalConfigs, map);
 
         String value = (String) map.get("filter");
-        Filter filter;
-        if (value == null || value.isEmpty()) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-
-        String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
-        if (classFullName == null || classFullName.isEmpty()) {
-          logger.error("Filter block doesn't have filter element");
+        if (StringUtils.isEmpty(value)) {
+          LOG.error("Filter block doesn't have filter element");
           continue;
         }
-        filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER);
-
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
         if (filter == null) {
-          logger.error("Filter Object is null");
+          LOG.error("Filter object could not be found");
           continue;
         }
         filter.loadConfig(map);
         filter.setInput(input);
 
         if (filter.isEnabled()) {
-          filter.setOutputMgr(outMgr);
-          if (prevFilter == null) {
-            input.setFirstFilter(filter);
-          } else {
-            prevFilter.setNextFilter(filter);
-          }
-          prevFilter = filter;
+          filter.setOutputManager(outputManager);
+          input.addFilter(filter);
           filter.logConfgs(Level.INFO);
         } else {
-          logger.debug("Ignoring filter "
-            + filter.getShortDescription() + " for input "
-            + input.getShortDescription());
+          LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
         }
       }
+      
       if (input.getFirstFilter() == null) {
         toRemoveInputList.add(input);
       }
     }
 
-    // Fourth loop is for associating valid outputs to input
-    Set<Output> usedOutputSet = new HashSet<Output>();
-    for (Input input : inputMgr.getInputList()) {
-      for (Output output : outputList) {
-        boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
-          input.getConfigs());
-        if (ret) {
-          usedOutputSet.add(output);
-          input.addOutput(output);
-        }
-      }
-    }
-    outputList = usedOutputSet;
-
     for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. "
-        + toRemoveInput.getShortDescription());
-      inputMgr.removeInput(toRemoveInput);
+      LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
+      inputManager.removeInput(toRemoveInput);
     }
   }
 
-  private void sortBlocks(List<Map<String, Object>> blockList) {
-
-    Collections.sort(blockList, new Comparator<Map<String, Object>>() {
+  private void sortFilters() {
+    Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
 
       @Override
       public int compare(Map<String, Object> o1, Map<String, Object> o2) {
         Object o1Sort = o1.get("sort_order");
         Object o2Sort = o2.get("sort_order");
-        if (o1Sort == null) {
-          return 0;
-        }
-        if (o2Sort == null) {
+        if (o1Sort == null || o2Sort == null) {
           return 0;
         }
-        int o1Value = 0;
-        if (!(o1Sort instanceof Number)) {
-          try {
-            o1Value = (new Double(Double.parseDouble(o1Sort
-              .toString()))).intValue();
-          } catch (Throwable t) {
-            logger.error("Value is not of type Number. class="
-              + o1Sort.getClass().getName() + ", value="
-              + o1Sort.toString() + ", map=" + o1.toString());
-          }
-        } else {
-          o1Value = ((Number) o1Sort).intValue();
-        }
-        int o2Value = 0;
-        if (!(o2Sort instanceof Integer)) {
+        
+        int o1Value = parseSort(o1, o1Sort);
+        int o2Value = parseSort(o2, o2Sort);
+        
+        return o1Value - o2Value;
+      }
+
+      private int parseSort(Map<String, Object> map, Object o) {
+        if (!(o instanceof Number)) {
           try {
-            o2Value = (new Double(Double.parseDouble(o2Sort
-              .toString()))).intValue();
+            return (new Double(Double.parseDouble(o.toString()))).intValue();
           } catch (Throwable t) {
-            logger.error("Value is not of type Number. class="
-              + o2Sort.getClass().getName() + ", value="
-              + o2Sort.toString() + ", map=" + o2.toString());
+            LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
+              + ", map=" + map.toString());
+            return 0;
           }
         } else {
-
+          return ((Number) o).intValue();
         }
-        return o1Value - o2Value;
       }
     });
   }
 
-  private Map<String, Object> mergeConfigs(
-    List<Map<String, Object>> configList) {
-    Map<String, Object> mergedConfig = new HashMap<String, Object>();
-    for (Map<String, Object> config : configList) {
-      mergeBlocks(config, mergedConfig);
+  private void assignOutputsToInputs() {
+    Set<Output> usedOutputSet = new HashSet<Output>();
+    for (Input input : inputManager.getInputList()) {
+      for (Output output : outputManager.getOutputs()) {
+        if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+          usedOutputSet.add(output);
+          input.addOutput(output);
+        }
+      }
     }
-    return mergedConfig;
+    outputManager.retainUsedOutputs(usedOutputSet);
   }
 
-  private void mergeBlocks(Map<String, Object> fromMap,
-                           Map<String, Object> toMap) {
-    // Merge the non-string
+  @SuppressWarnings("unchecked")
+  private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
     for (String key : fromMap.keySet()) {
       Object objValue = fromMap.get(key);
       if (objValue == null) {
         continue;
       }
       if (objValue instanceof Map) {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> globalFields = LogFeederUtil
-          .cloneObject((Map<String, Object>) fromMap.get(key));
+        Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
 
-        @SuppressWarnings("unchecked")
-        Map<String, Object> localFields = (Map<String, Object>) toMap
-          .get(key);
+        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
         if (localFields == null) {
           localFields = new HashMap<String, Object>();
           toMap.put(key, localFields);
@@ -477,8 +391,7 @@ public class LogFeeder {
         if (globalFields != null) {
           for (String fieldKey : globalFields.keySet()) {
             if (!localFields.containsKey(fieldKey)) {
-              localFields.put(fieldKey,
-                globalFields.get(fieldKey));
+              localFields.put(fieldKey, globalFields.get(fieldKey));
             }
           }
         }
@@ -493,11 +406,29 @@ public class LogFeeder {
     }
   }
 
+  private class JVMShutdownHook extends Thread {
+
+    public void run() {
+      try {
+        LOG.info("Processing is shutting down.");
+
+        inputManager.close();
+        outputManager.close();
+        inputManager.checkInAll();
+
+        logStats();
+
+        LOG.info("LogSearch is exiting.");
+      } catch (Throwable t) {
+        // Ignore
+      }
+    }
+  }
+
   private void monitor() throws Exception {
-    inputMgr.monitor();
+    inputManager.monitor();
     JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
-    ShutdownHookManager.get().addShutdownHook(logfeederJVMHook,
-        LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
+    ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
     
     statLoggerThread = new Thread("statLogger") {
 
@@ -512,17 +443,14 @@ public class LogFeeder {
           try {
             logStats();
           } catch (Throwable t) {
-            logger.error(
-              "LogStats: Caught exception while logging stats.",
-              t);
+            LOG.error("LogStats: Caught exception while logging stats.", t);
           }
 
-          if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
+          if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
             lastCheckPointCleanedMS = System.currentTimeMillis();
-            inputMgr.cleanCheckPointFiles();
+            inputManager.cleanCheckPointFiles();
           }
 
-          // logfeeder is stopped then break the loop
           if (isLogfeederCompleted) {
             break;
           }
@@ -536,84 +464,20 @@ public class LogFeeder {
   }
 
   private void logStats() {
-    inputMgr.logStats();
-    outMgr.logStats();
-
-    if (metricsMgr.isMetricsEnabled()) {
-      List<MetricCount> metricsList = new ArrayList<MetricCount>();
-      inputMgr.addMetricsContainers(metricsList);
-      outMgr.addMetricsContainers(metricsList);
-      metricsMgr.useMetrics(metricsList);
-    }
-  }
-
-  private String readFile(BufferedReader br) throws Exception {
-    try {
-      StringBuilder sb = new StringBuilder();
-      String line = br.readLine();
-      while (line != null) {
-        sb.append(line);
-        line = br.readLine();
-      }
-      return sb.toString();
-    } catch (Exception t) {
-      logger.error("Error loading properties file.", t);
-      throw t;
-    }
-  }
-
-  public Collection<Output> getOutputList() {
-    return outputList;
-  }
-
-  public OutputMgr getOutMgr() {
-    return outMgr;
-  }
-
-  public static void main(String[] args) {
-    LogFeeder logFeeder = new LogFeeder(args);
-    logFeeder.run();
-  }
-
-  public void run() {
-    try {
-      Date startTime = new Date();
-      this.init();
-      Date endTime = new Date();
-      logger.info("Took " + (endTime.getTime() - startTime.getTime())
-        + " ms to initialize");
-      this.monitor();
-      //wait for all background thread before stop main thread
-      this.waitOnAllDaemonThreads();
-    } catch (Throwable t) {
-      logger.fatal("Caught exception in main.", t);
-      System.exit(1);
+    inputManager.logStats();
+    outputManager.logStats();
+
+    if (metricsManager.isMetricsEnabled()) {
+      List<MetricData> metricsList = new ArrayList<MetricData>();
+      inputManager.addMetricsContainers(metricsList);
+      outputManager.addMetricsContainers(metricsList);
+      metricsManager.useMetrics(metricsList);
     }
   }
 
-  private class JVMShutdownHook extends Thread {
-
-    public void run() {
-      try {
-        logger.info("Processing is shutting down.");
-
-        inputMgr.close();
-        outMgr.close();
-        inputMgr.checkInAll();
-
-        logStats();
-
-        logger.info("LogSearch is exiting.");
-      } catch (Throwable t) {
-        // Ignore
-      }
-    }
-  }
-  
   private void waitOnAllDaemonThreads() {
-    String foreground = LogFeederUtil.getStringProperty("foreground");
-    if (foreground != null && foreground.equalsIgnoreCase("true")) {
-      inputMgr.waitOnAllInputs();
+    if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) {
+      inputManager.waitOnAllInputs();
       isLogfeederCompleted = true;
       if (statLoggerThread != null) {
         try {
@@ -624,24 +488,16 @@ public class LogFeeder {
       }
     }
   }
-  
-  private String[] getConfigFromCmdLine() {
-    String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
-    if (inputConfigDir != null && !inputConfigDir.isEmpty()) {
-      String[] searchFileWithExtensions = new String[] { "json" };
-      File configDirFile = new File(inputConfigDir);
-      List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile,
-          searchFileWithExtensions, false);
-      if (configFiles != null && configFiles.size() > 0) {
-        String configPaths[] = new String[configFiles.size()];
-        for (int index = 0; index < configFiles.size(); index++) {
-          File configFile = configFiles.get(index);
-          String configFilePath = configFile.getAbsolutePath();
-          configPaths[index] = configFilePath;
-        }
-        return configPaths;
-      }
+
+  public static void main(String[] args) {
+    try {
+      LogFeederUtil.loadProperties("logfeeder.properties", args);
+    } catch (Throwable t) {
+      LOG.warn("Could not load logfeeder properites");
+      System.exit(1);
     }
-    return new String[0];
+
+    LogFeeder logFeeder = new LogFeeder();
+    logFeeder.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 287982f..47ddc51 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -23,27 +23,27 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
 
 public abstract class ConfigBlock {
-  static private Logger logger = Logger.getLogger(ConfigBlock.class);
+  private static final Logger LOG = Logger.getLogger(ConfigBlock.class);
 
   private boolean drain = false;
 
   protected Map<String, Object> configs;
   protected Map<String, String> contextFields = new HashMap<String, String>();
-  public MetricCount statMetric = new MetricCount();
-
-  /**
-   *
-   */
+  public MetricData statMetric = new MetricData(getStatMetricName(), false);
+  protected String getStatMetricName() {
+    return null;
+  }
+  
   public ConfigBlock() {
-    super();
   }
 
   /**
@@ -58,10 +58,7 @@ public abstract class ConfigBlock {
     return this.getClass().getSimpleName();
   }
 
-  /**
-   * @param metricsList
-   */
-  public void addMetricsContainers(List<MetricCount> metricsList) {
+  public void addMetricsContainers(List<MetricData> metricsList) {
     metricsList.add(statMetric);
   }
 
@@ -89,25 +86,21 @@ public abstract class ConfigBlock {
     boolean isEnabled = getBooleanValue("is_enabled", true);
     if (isEnabled) {
       // Let's check for static conditions
-      Map<String, Object> conditions = (Map<String, Object>) configs
-        .get("conditions");
+      Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
       boolean allow = true;
-      if (conditions != null && conditions.size() > 0) {
+      if (MapUtils.isNotEmpty(conditions)) {
         allow = false;
         for (String conditionType : conditions.keySet()) {
           if (conditionType.equalsIgnoreCase("fields")) {
-            Map<String, Object> fields = (Map<String, Object>) conditions
-              .get("fields");
+            Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
             for (String fieldName : fields.keySet()) {
               Object values = fields.get(fieldName);
               if (values instanceof String) {
-                allow = isFieldConditionMatch(fieldName,
-                  (String) values);
+                allow = isFieldConditionMatch(fieldName, (String) values);
               } else {
                 List<String> listValues = (List<String>) values;
                 for (String stringValue : listValues) {
-                  allow = isFieldConditionMatch(fieldName,
-                    stringValue);
+                  allow = isFieldConditionMatch(fieldName, stringValue);
                   if (allow) {
                     break;
                   }
@@ -135,8 +128,7 @@ public abstract class ConfigBlock {
       allow = true;
     } else {
       @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) configs
-        .get("add_fields");
+      Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
       if (addFields != null && addFields.get(fieldName) != null) {
         String addFieldValue = (String) addFields.get(fieldName);
         if (stringValue.equalsIgnoreCase(addFieldValue)) {
@@ -184,12 +176,7 @@ public abstract class ConfigBlock {
     String strValue = getStringValue(key);
     boolean retValue = defaultValue;
     if (!StringUtils.isEmpty(strValue)) {
-      if (strValue.equalsIgnoreCase("true")
-        || strValue.equalsIgnoreCase("yes")) {
-        retValue = true;
-      } else {
-        retValue = false;
-      }
+      retValue = (strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes"));
     }
     return retValue;
   }
@@ -201,8 +188,7 @@ public abstract class ConfigBlock {
       try {
         retValue = Integer.parseInt(strValue);
       } catch (Throwable t) {
-        logger.error("Error parsing integer value. key=" + key
-          + ", value=" + strValue);
+        LOG.error("Error parsing integer value. key=" + key + ", value=" + strValue);
       }
     }
     return retValue;
@@ -215,8 +201,7 @@ public abstract class ConfigBlock {
       try {
         retValue = Long.parseLong(strValue);
       } catch (Throwable t) {
-        logger.error("Error parsing long value. key=" + key + ", value="
-            + strValue);
+        LOG.error("Error parsing long value. key=" + key + ", value=" + strValue);
       }
     }
     return retValue;
@@ -227,29 +212,27 @@ public abstract class ConfigBlock {
   }
 
   public void incrementStat(int count) {
-    statMetric.count += count;
+    statMetric.value += count;
   }
 
-  public void logStatForMetric(MetricCount metric, String prefixStr) {
-    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key="
-      + getShortDescription());
+  public void logStatForMetric(MetricData metric, String prefixStr) {
+    LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
   }
 
-  synchronized public void logStat() {
+  public synchronized void logStat() {
     logStatForMetric(statMetric, "Stat");
   }
 
   public boolean logConfgs(Priority level) {
-    if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) {
+    if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
       return false;
     }
-    if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) {
+    if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
       return false;
     }
-    logger.log(level, "Printing configuration Block="
-      + getShortDescription());
-    logger.log(level, "configs=" + configs);
-    logger.log(level, "contextFields=" + contextFields);
+    LOG.log(level, "Printing configuration Block=" + getShortDescription());
+    LOG.log(level, "configs=" + configs);
+    LOG.log(level, "contextFields=" + contextFields);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
new file mode 100644
index 0000000..d1e7fba
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.common;
+
+public class LogFeederConstants {
+
+  public static final String ALL = "all";
+  public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
+  public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
+  
+  // solr fields
+  public static final String SOLR_LEVEL = "level";
+  public static final String SOLR_COMPONENT = "type";
+  public static final String SOLR_HOST = "host";
+
+  // UserConfig Constants History
+  public static final String VALUES = "jsons";
+  public static final String ROW_TYPE = "rowtype";
+  
+  // S3 Constants
+  public static final String S3_PATH_START_WITH = "s3://";
+  public static final String S3_PATH_SEPARATOR = "/";
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index ab371f1..684f3c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -29,21 +29,19 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Priority;
 
 public abstract class Filter extends ConfigBlock {
-  private static final Logger logger = Logger.getLogger(Filter.class);
+  private static final Logger LOG = Logger.getLogger(Filter.class);
 
   protected Input input;
   private Filter nextFilter = null;
-  private OutputMgr outputMgr;
+  private OutputManager outputManager;
 
   private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
 
@@ -74,15 +72,12 @@ public abstract class Filter extends ConfigBlock {
       }
       for (Map<String, Object> mapObject : mapList) {
         for (String mapClassCode : mapObject.keySet()) {
-          Mapper mapper = getMapper(mapClassCode);
+          Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
           if (mapper == null) {
             break;
           }
-          if (mapper.init(getInput().getShortDescription(),
-            fieldName, mapClassCode,
-            mapObject.get(mapClassCode))) {
-            List<Mapper> fieldMapList = postFieldValueMappers
-              .get(fieldName);
+          if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) {
+            List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
             if (fieldMapList == null) {
               fieldMapList = new ArrayList<Mapper>();
               postFieldValueMappers.put(fieldName, fieldMapList);
@@ -94,17 +89,8 @@ public abstract class Filter extends ConfigBlock {
     }
   }
 
-  private Mapper getMapper(String mapClassCode) {
-    String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS);
-    if (classFullName != null && !classFullName.isEmpty()) {
-      Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER);
-      return mapper;
-    }
-    return null;
-  }
-
-  public void setOutputMgr(OutputMgr outputMgr) {
-    this.outputMgr = outputMgr;
+  public void setOutputManager(OutputManager outputManager) {
+    this.outputManager = outputManager;
   }
 
   public Filter getNextFilter() {
@@ -131,25 +117,23 @@ public abstract class Filter extends ConfigBlock {
     if (nextFilter != null) {
       nextFilter.apply(inputStr, inputMarker);
     } else {
-      outputMgr.write(inputStr, inputMarker);
+      outputManager.write(inputStr, inputMarker);
     }
   }
 
   public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException {
-    if (postFieldValueMappers.size() > 0) {
-      for (String fieldName : postFieldValueMappers.keySet()) {
-        Object value = jsonObj.get(fieldName);
-        if (value != null) {
-          for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
-            value = mapper.apply(jsonObj, value);
-          }
+    for (String fieldName : postFieldValueMappers.keySet()) {
+      Object value = jsonObj.get(fieldName);
+      if (value != null) {
+        for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
+          value = mapper.apply(jsonObj, value);
         }
       }
     }
     if (nextFilter != null) {
       nextFilter.apply(jsonObj, inputMarker);
     } else {
-      outputMgr.write(jsonObj, inputMarker);
+      outputManager.write(jsonObj, inputMarker);
     }
   }
 
@@ -193,16 +177,15 @@ public abstract class Filter extends ConfigBlock {
     if (!super.logConfgs(level)) {
       return false;
     }
-    logger.log(level, "input=" + input.getShortDescription());
+    LOG.log(level, "input=" + input.getShortDescription());
     return true;
   }
 
   @Override
-  public void addMetricsContainers(List<MetricCount> metricsList) {
+  public void addMetricsContainers(List<MetricData> metricsList) {
     super.addMetricsContainers(metricsList);
     if (nextFilter != null) {
       nextFilter.addMetricsContainers(metricsList);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 372c208..7e2da70 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -36,7 +36,7 @@ import oi.thekraken.grok.api.exception.GrokException;
 
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
@@ -45,7 +45,7 @@ import org.apache.log4j.Logger;
 import com.google.gson.reflect.TypeToken;
 
 public class FilterGrok extends Filter {
-  static private Logger logger = Logger.getLogger(FilterGrok.class);
+  private static final Logger LOG = Logger.getLogger(FilterGrok.class);
 
   private static final String GROK_PATTERN_FILE = "grok-patterns";
 
@@ -68,25 +68,23 @@ public class FilterGrok extends Filter {
 
   private Type jsonType = new TypeToken<Map<String, String>>() {}.getType();
 
-  private MetricCount grokErrorMetric = new MetricCount();
+  private MetricData grokErrorMetric = new MetricData("filter.error.grok", false);
 
   @Override
   public void init() throws Exception {
     super.init();
 
     try {
-      grokErrorMetric.metricsName = "filter.error.grok";
       messagePattern = escapePattern(getStringValue("message_pattern"));
       multilinePattern = escapePattern(getStringValue("multiline_pattern"));
       sourceField = getStringValue("source_field");
       removeSourceField = getBooleanValue("remove_source_field",
         removeSourceField);
 
-      logger.info("init() done. grokPattern=" + messagePattern
-        + ", multilinePattern=" + multilinePattern + ", "
-        + getShortDescription());
+      LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
+      getShortDescription());
       if (StringUtils.isEmpty(messagePattern)) {
-        logger.error("message_pattern is not set for filter.");
+        LOG.error("message_pattern is not set for filter.");
         return;
       }
       extractNamedParams(messagePattern, namedParamList);
@@ -102,9 +100,7 @@ public class FilterGrok extends Filter {
         grokMultiline.compile(multilinePattern);
       }
     } catch (Throwable t) {
-      logger.fatal(
-        "Caught exception while initializing Grok. multilinePattern="
-          + multilinePattern + ", messagePattern="
+      LOG.fatal("Caught exception while initializing Grok. multilinePattern=" + multilinePattern + ", messagePattern="
           + messagePattern, t);
       grokMessage = null;
       grokMultiline = null;
@@ -123,9 +119,10 @@ public class FilterGrok extends Filter {
   }
 
   private void extractNamedParams(String patternStr, Set<String> paramList) {
-    String grokRegEx = "%\\{" + "(?<name>" + "(?<pattern>[A-z0-9]+)"
-      + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + "(?:=(?<definition>"
-      + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + "\\}";
+    String grokRegEx = "%\\{" +
+        "(?<name>" + "(?<pattern>[A-z0-9]+)" + "(?::(?<subname>[A-z0-9_:]+))?" + ")" +
+        "(?:=(?<definition>" + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" +
+        "\\}";
 
     Pattern pattern = Pattern.compile(grokRegEx);
     java.util.regex.Matcher matcher = pattern.matcher(patternStr);
@@ -139,28 +136,23 @@ public class FilterGrok extends Filter {
 
   private boolean loadPatterns(Grok grok) {
     InputStreamReader grokPatternsReader = null;
-    logger.info("Loading pattern file " + GROK_PATTERN_FILE);
+    LOG.info("Loading pattern file " + GROK_PATTERN_FILE);
     try {
-      BufferedInputStream fileInputStream = (BufferedInputStream) this
-        .getClass().getClassLoader()
-        .getResourceAsStream(GROK_PATTERN_FILE);
+      BufferedInputStream fileInputStream =
+          (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(GROK_PATTERN_FILE);
       if (fileInputStream == null) {
-        logger.fatal("Couldn't load grok-patterns file "
-          + GROK_PATTERN_FILE + ". Things will not work");
+        LOG.fatal("Couldn't load grok-patterns file " + GROK_PATTERN_FILE + ". Things will not work");
         return false;
       }
       grokPatternsReader = new InputStreamReader(fileInputStream);
     } catch (Throwable t) {
-      logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE
-        + " from classpath. Grok filtering will not work.", t);
+      LOG.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE + " from classpath. Grok filtering will not work.", t);
       return false;
     }
     try {
       grok.addPatternFromReader(grokPatternsReader);
     } catch (GrokException e) {
-      logger.fatal(
-        "Error loading patterns from grok-patterns reader for file "
-          + GROK_PATTERN_FILE, e);
+      LOG.fatal("Error loading patterns from grok-patterns reader for file " + GROK_PATTERN_FILE, e);
       return false;
     }
 
@@ -177,8 +169,7 @@ public class FilterGrok extends Filter {
       String jsonStr = grokMultiline.capture(inputStr);
       if (!"{}".equals(jsonStr)) {
         if (strBuff != null) {
-          Map<String, Object> jsonObj = Collections
-            .synchronizedMap(new HashMap<String, Object>());
+          Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
           try {
             applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
           } finally {
@@ -192,15 +183,13 @@ public class FilterGrok extends Filter {
       if (strBuff == null) {
         strBuff = new StringBuilder();
       } else {
-        strBuff.append('\r');
-        strBuff.append('\n');
+        strBuff.append("\r\n");
       }
       strBuff.append(inputStr);
       savedInputMarker = inputMarker;
     } else {
       savedInputMarker = inputMarker;
-      Map<String, Object> jsonObj = Collections
-        .synchronizedMap(new HashMap<String, Object>());
+      Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
       applyMessage(inputStr, jsonObj, null);
     }
   }
@@ -216,14 +205,8 @@ public class FilterGrok extends Filter {
     }
   }
 
-  /**
-   * @param inputStr
-   * @param jsonObj
-   * @throws LogfeederException 
-   */
-  private void applyMessage(String inputStr, Map<String, Object> jsonObj,
-                            String multilineJsonStr) throws LogfeederException {
-    String jsonStr = grokParse(inputStr);
+  private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogfeederException {
+    String jsonStr = grokMessage.capture(inputStr);
 
     boolean parseError = false;
     if ("{}".equals(jsonStr)) {
@@ -239,8 +222,7 @@ public class FilterGrok extends Filter {
     if (parseError) {
       jsonStr = multilineJsonStr;
     }
-    Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr,
-      jsonType);
+    Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr, jsonType);
     for (String namedParam : namedParamList) {
       if (jsonSrc.get(namedParam) != null) {
         jsonObj.put(namedParam, jsonSrc.get(namedParam));
@@ -260,37 +242,26 @@ public class FilterGrok extends Filter {
       }
     }
     super.apply(jsonObj, savedInputMarker);
-    statMetric.count++;
-  }
-
-  public String grokParse(String inputStr) {
-    String jsonStr = grokMessage.capture(inputStr);
-    return jsonStr;
+    statMetric.value++;
   }
 
   private void logParseError(String inputStr) {
-    grokErrorMetric.count++;
-    final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-      + "_PARSEERROR";
+    grokErrorMetric.value++;
+    String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
     int inputStrLength = inputStr != null ? inputStr.length() : 0;
-    LogFeederUtil.logErrorMessageByInterval(
-      LOG_MESSAGE_KEY,
-      "Error parsing string. length=" + inputStrLength
-        + ", input=" + input.getShortDescription()
-        + ". First upto 100 characters="
-        + LogFeederUtil.subString(inputStr, 100), null, logger,
-      Level.WARN);
+    LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStrLength + ", input=" +
+        input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
+        Level.WARN);
   }
 
   @Override
   public void flush() {
     if (strBuff != null) {
-      Map<String, Object> jsonObj = Collections
-        .synchronizedMap(new HashMap<String, Object>());
+      Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
       try {
         applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
       } catch (LogfeederException e) {
-        logger.error(e.getLocalizedMessage(), e.getCause());
+        LOG.error(e.getLocalizedMessage(), e.getCause());
       }
       strBuff = null;
       savedInputMarker = null;
@@ -304,7 +275,7 @@ public class FilterGrok extends Filter {
   }
 
   @Override
-  public void addMetricsContainers(List<MetricCount> metricsList) {
+  public void addMetricsContainers(List<MetricData> metricsList) {
     super.addMetricsContainers(metricsList);
     metricsList.add(grokErrorMetric);
   }
@@ -314,5 +285,4 @@ public class FilterGrok extends Filter {
     super.logStat();
     logStatForMetric(grokErrorMetric, "Stat: Grok Errors");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 2954106..ba63c61 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -22,12 +22,13 @@ import java.util.Map;
 
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public class FilterJSON extends Filter {
   
-  private static final Logger logger  = Logger.getLogger(FilterJSON.class);
+  private static final Logger LOG  = Logger.getLogger(FilterJSON.class);
 
   @Override
   public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
@@ -35,7 +36,7 @@ public class FilterJSON extends Filter {
     try {
       jsonMap = LogFeederUtil.toJSONObject(inputStr);
     } catch (Exception e) {
-      logger.error(e.getLocalizedMessage());
+      LOG.error(e.getLocalizedMessage());
       throw new LogfeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause());
     }
     Double lineNumberD = (Double) jsonMap.get("line_number");
@@ -45,10 +46,9 @@ public class FilterJSON extends Filter {
     }
     String timeStampStr = (String) jsonMap.get("logtime");
     if (timeStampStr != null && !timeStampStr.isEmpty()) {
-      String logtime = LogFeederUtil.getDate(timeStampStr);
+      String logtime = DateUtil.getDate(timeStampStr);
       jsonMap.put("logtime", logtime);
     }
     super.apply(jsonMap, inputMarker);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index 7adb468..c9c3f2c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -25,38 +25,35 @@ import java.util.StringTokenizer;
 
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 public class FilterKeyValue extends Filter {
-  private static final Logger logger = Logger.getLogger(FilterKeyValue.class);
+  private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
 
   private String sourceField = null;
   private String valueSplit = "=";
   private String fieldSplit = "\t";
 
-  private MetricCount errorMetric = new MetricCount();
+  private MetricData errorMetric = new MetricData("filter.error.keyvalue", false);
 
   @Override
   public void init() throws Exception {
     super.init();
-    errorMetric.metricsName = "filter.error.keyvalue";
 
     sourceField = getStringValue("source_field");
     valueSplit = getStringValue("value_split", valueSplit);
     fieldSplit = getStringValue("field_split", fieldSplit);
 
-    logger.info("init() done. source_field=" + sourceField
-      + ", value_split=" + valueSplit + ", " + ", field_split="
-      + fieldSplit + ", " + getShortDescription());
+    LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
+        fieldSplit + ", " + getShortDescription());
     if (StringUtils.isEmpty(sourceField)) {
-      logger.fatal("source_field is not set for filter. This filter will not be applied");
+      LOG.fatal("source_field is not set for filter. This filter will not be applied");
       return;
     }
-
   }
 
   @Override
@@ -71,40 +68,30 @@ public class FilterKeyValue extends Filter {
     }
     Object valueObj = jsonObj.get(sourceField);
     if (valueObj != null) {
-      StringTokenizer fieldTokenizer = new StringTokenizer(
-        valueObj.toString(), fieldSplit);
+      StringTokenizer fieldTokenizer = new StringTokenizer(valueObj.toString(), fieldSplit);
       while (fieldTokenizer.hasMoreTokens()) {
         String nv = fieldTokenizer.nextToken();
-        StringTokenizer nvTokenizer = new StringTokenizer(nv,
-          valueSplit);
+        StringTokenizer nvTokenizer = new StringTokenizer(nv, valueSplit);
         while (nvTokenizer.hasMoreTokens()) {
           String name = nvTokenizer.nextToken();
           if (nvTokenizer.hasMoreTokens()) {
             String value = nvTokenizer.nextToken();
             jsonObj.put(name, value);
           } else {
-            logParseError("name=" + name + ", pair=" + nv
-              + ", field=" + sourceField + ", field_value="
-              + valueObj);
+            logParseError("name=" + name + ", pair=" + nv + ", field=" + sourceField + ", field_value=" + valueObj);
           }
         }
       }
     }
     super.apply(jsonObj, inputMarker);
-    statMetric.count++;
+    statMetric.value++;
   }
 
   private void logParseError(String inputStr) {
-    errorMetric.count++;
-    final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-      + "_PARSEERROR";
-    LogFeederUtil
-      .logErrorMessageByInterval(
-        LOG_MESSAGE_KEY,
-        "Error parsing string. length=" + inputStr.length()
-          + ", input=" + input.getShortDescription()
-          + ". First upto 100 characters="
-          + LogFeederUtil.subString(inputStr, 100), null, logger,
+    errorMetric.value++;
+    String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
+    LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStr.length() + ", input=" +
+        input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
         Level.ERROR);
   }
 
@@ -114,9 +101,8 @@ public class FilterKeyValue extends Filter {
   }
 
   @Override
-  public void addMetricsContainers(List<MetricCount> metricsList) {
+  public void addMetricsContainers(List<MetricData> metricsList) {
     super.addMetricsContainers(metricsList);
     metricsList.add(errorMetric);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
new file mode 100644
index 0000000..41a1fa5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractInputFile extends Input {
+  protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class);
+
+  private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
+
+  protected File[] logFiles;
+  protected String logPath;
+  protected Object fileKey;
+  protected String base64FileKey;
+
+  protected boolean isReady;
+  private boolean isStartFromBegining = true;
+
+  private String checkPointExtension;
+  private File checkPointFile;
+  private RandomAccessFile checkPointWriter;
+  private long lastCheckPointTimeMS;
+  private int checkPointIntervalMS;
+  private Map<String, Object> jsonCheckPoint;
+  private InputMarker lastCheckPointInputMarker;
+
+  @Override
+  protected String getStatMetricName() {
+    return "input.files.read_lines";
+  }
+  
+  @Override
+  protected String getReadBytesMetricName() {
+    return "input.files.read_bytes";
+  }
+  
+  @Override
+  public void init() throws Exception {
+    LOG.info("init() called");
+    
+    checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", InputManager.DEFAULT_CHECKPOINT_EXTENSION);
+
+    // Let's close the file and set it to true after we start monitoring it
+    setClosed(true);
+    logPath = getStringValue("path");
+    tail = getBooleanValue("tail", tail);
+    checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+    if (StringUtils.isEmpty(logPath)) {
+      LOG.error("path is empty for file input. " + getShortDescription());
+      return;
+    }
+
+    String startPosition = getStringValue("start_position");
+    if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") ||
+        startPosition.equalsIgnoreCase("begining") || !tail) {
+      isStartFromBegining = true;
+    }
+
+    setFilePath(logPath);
+    boolean isFileReady = isReady();
+
+    LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
+
+    super.init();
+  }
+
+  protected void processFile(File logPathFile) throws FileNotFoundException, IOException {
+    LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
+    BufferedReader br = null;
+    checkPointFile = null;
+    checkPointWriter = null;
+    jsonCheckPoint = null;
+
+    int lineCount = 0;
+    try {
+      setFilePath(logPathFile.getAbsolutePath());
+      
+      br = openLogFile(logPathFile);
+
+      boolean resume = isStartFromBegining;
+      int resumeFromLineNumber = getResumeFromLineNumber();
+      if (resumeFromLineNumber > 0) {
+        resume = false;
+      }
+      
+      setClosed(false);
+      int sleepStep = 2;
+      int sleepIteration = 0;
+      while (true) {
+        try {
+          if (isDrain()) {
+            break;
+          }
+
+          String line = br.readLine();
+          if (line == null) {
+            if (!resume) {
+              resume = true;
+            }
+            sleepIteration++;
+            if (sleepIteration == 2) {
+              flush();
+              if (!tail) {
+                LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
+                break;
+              }
+            } else if (sleepIteration > 4) {
+              Object newFileKey = getFileKey(logPathFile);
+              if (newFileKey != null && (fileKey == null || !newFileKey.equals(fileKey))) {
+                LOG.info("File key is different. Marking this input file for rollover. oldKey=" + fileKey + ", newKey=" +
+                    newFileKey + ". " + getShortDescription());
+                
+                try {
+                  LOG.info("File is rolled over. Closing current open file." + getShortDescription() + ", lineCount=" +
+                      lineCount);
+                  br.close();
+                } catch (Exception ex) {
+                  LOG.error("Error closing file" + getShortDescription(), ex);
+                  break;
+                }
+                
+                try {
+                  LOG.info("Opening new rolled over file." + getShortDescription());
+                  br = openLogFile(logPathFile);
+                  lineCount = 0;
+                } catch (Exception ex) {
+                  LOG.error("Error opening rolled over file. " + getShortDescription(), ex);
+                  LOG.info("Added input to not ready list." + getShortDescription());
+                  isReady = false;
+                  inputManager.addToNotReady(this);
+                  break;
+                }
+                LOG.info("File is successfully rolled over. " + getShortDescription());
+                continue;
+              }
+            }
+            try {
+              Thread.sleep(sleepStep * 1000);
+              sleepStep = Math.min(sleepStep * 2, 10);
+            } catch (InterruptedException e) {
+              LOG.info("Thread interrupted." + getShortDescription());
+            }
+          } else {
+            lineCount++;
+            sleepStep = 1;
+            sleepIteration = 0;
+
+            if (!resume && lineCount > resumeFromLineNumber) {
+              LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + getShortDescription());
+              resume = true;
+            }
+            if (resume) {
+              InputMarker marker = new InputMarker(this, base64FileKey, lineCount);
+              outputLine(line, marker);
+            }
+          }
+        } catch (Throwable t) {
+          String logMessageKey = this.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
+              ", input=" + getShortDescription(), t, LOG, Level.ERROR);
+        }
+      }
+    } finally {
+      if (br != null) {
+        LOG.info("Closing reader." + getShortDescription() + ", lineCount=" + lineCount);
+        try {
+          br.close();
+        } catch (Throwable t) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  protected abstract BufferedReader openLogFile(File logFile) throws IOException;
+
+  protected abstract Object getFileKey(File logFile);
+  
+  private int getResumeFromLineNumber() {
+    int resumeFromLineNumber = 0;
+    
+    if (tail) {
+      try {
+        LOG.info("Checking existing checkpoint file. " + getShortDescription());
+
+        String checkPointFileName = base64FileKey + checkPointExtension;
+        File checkPointFolder = inputManager.getCheckPointFolderFile();
+        checkPointFile = new File(checkPointFolder, checkPointFileName);
+        checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
+
+        try {
+          int contentSize = checkPointWriter.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointWriter.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+                readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
+          } else {
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+
+            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+                ", resumeFromLineNumber=" + resumeFromLineNumber);
+          }
+        } catch (EOFException eofEx) {
+          LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+              getShortDescription());
+        }
+        if (jsonCheckPoint == null) {
+          // This seems to be first time, so creating the initial checkPoint object
+          jsonCheckPoint = new HashMap<String, Object>();
+          jsonCheckPoint.put("file_path", filePath);
+          jsonCheckPoint.put("file_key", base64FileKey);
+        }
+
+      } catch (Throwable t) {
+        LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
+      }
+    }
+    
+    return resumeFromLineNumber;
+  }
+
+  @Override
+  public synchronized void checkIn(InputMarker inputMarker) {
+    if (checkPointWriter != null) {
+      try {
+        int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+        if (lineNumber > inputMarker.lineNumber) {
+          // Already wrote higher line number for this input
+          return;
+        }
+        // If interval is greater than last checkPoint time, then write
+        long currMS = System.currentTimeMillis();
+        if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+          // Let's save this one so we can update the check point file on flush
+          lastCheckPointInputMarker = inputMarker;
+          return;
+        }
+        lastCheckPointTimeMS = currMS;
+
+        jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
+        jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+        jsonCheckPoint.put("last_write_time_date", new Date());
+
+        String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+
+        // Let's rewind
+        checkPointWriter.seek(0);
+        checkPointWriter.writeInt(jsonStr.length());
+        checkPointWriter.write(jsonStr.getBytes());
+
+        if (isClosed()) {
+          String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
+              ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+        }
+      } catch (Throwable t) {
+        String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
+            LOG, Level.ERROR);
+      }
+    }
+  }
+
+  @Override
+  public void lastCheckIn() {
+    if (lastCheckPointInputMarker != null) {
+      checkIn(lastCheckPointInputMarker);
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
+    lastCheckIn();
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "input:source=" + getStringValue("source") + ", path=" +
+        (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
+  }
+}