You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 23:38:39 UTC

[49/50] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 5feb9c4..e13d9bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,88 +28,138 @@ import java.util.Map;
 import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
 import org.apache.log4j.Logger;
 
 public abstract class Input extends ConfigBlock implements Runnable {
-  static private Logger logger = Logger.getLogger(Input.class);
-
-  protected OutputMgr outputMgr;
-  protected InputMgr inputMgr;
+  private static final Logger LOG = Logger.getLogger(Input.class);
 
+  private static final boolean DEFAULT_TAIL = true;
+  private static final boolean DEFAULT_USE_EVENT_MD5 = false;
+  private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+  
+  protected InputManager inputManager;
+  protected OutputManager outputManager;
   private List<Output> outputList = new ArrayList<Output>();
 
-  private Filter firstFilter = null;
   private Thread thread;
-  private boolean isClosed = false;
-  protected String filePath = null;
-  private String type = null;
+  private String type;
+  protected String filePath;
+  private Filter firstFilter;
+  private boolean isClosed;
 
-  protected boolean tail = true;
-  private boolean useEventMD5 = false;
-  private boolean genEventMD5 = true;
+  protected boolean tail;
+  private boolean useEventMD5;
+  private boolean genEventMD5;
 
-  protected MetricCount readBytesMetric = new MetricCount();
+  protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
+  protected String getReadBytesMetricName() {
+    return null;
+  }
+  
+  @Override
+  public void loadConfig(Map<String, Object> map) {
+    super.loadConfig(map);
+    String typeValue = getStringValue("type");
+    if (typeValue != null) {
+      // Explicitly add type and value to field list
+      contextFields.put("type", typeValue);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
+      if (addFields == null) {
+        addFields = new HashMap<String, Object>();
+        map.put("add_fields", addFields);
+      }
+      addFields.put("type", typeValue);
+    }
+  }
 
-  /**
-   * This method will be called from the thread spawned for the output. This
-   * method should only exit after all data are read from the source or the
-   * process is exiting
-   */
-  abstract void start() throws Exception;
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public void setInputManager(InputManager inputManager) {
+    this.inputManager = inputManager;
+  }
+
+  public void setOutputManager(OutputManager outputManager) {
+    this.outputManager = outputManager;
+  }
+
+  public void addFilter(Filter filter) {
+    if (firstFilter == null) {
+      firstFilter = filter;
+    } else {
+      Filter f = firstFilter;
+      while (f.getNextFilter() != null) {
+        f = f.getNextFilter();
+      }
+      f.setNextFilter(filter);
+    }
+  }
+
+  public void addOutput(Output output) {
+    outputList.add(output);
+  }
 
   @Override
   public void init() throws Exception {
     super.init();
-    tail = getBooleanValue("tail", tail);
-    useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5);
-    genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5);
+    tail = getBooleanValue("tail", DEFAULT_TAIL);
+    useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
+    genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
 
     if (firstFilter != null) {
       firstFilter.init();
     }
   }
 
-  @Override
-  public String getNameForThread() {
-    if (filePath != null) {
-      try {
-        return (type + "=" + (new File(filePath)).getName());
-      } catch (Throwable ex) {
-        logger.warn("Couldn't get basename for filePath=" + filePath,
-          ex);
-      }
+  boolean monitor() {
+    if (isReady()) {
+      LOG.info("Starting thread. " + getShortDescription());
+      thread = new Thread(this, getNameForThread());
+      thread.start();
+      return true;
+    } else {
+      return false;
     }
-    return super.getNameForThread() + ":" + type;
   }
 
+  public abstract boolean isReady();
+
   @Override
   public void run() {
     try {
-      logger.info("Started to monitor. " + getShortDescription());
+      LOG.info("Started to monitor. " + getShortDescription());
       start();
     } catch (Exception e) {
-      logger.error("Error writing to output.", e);
+      LOG.error("Error writing to output.", e);
     }
-    logger.info("Exiting thread. " + getShortDescription());
+    LOG.info("Exiting thread. " + getShortDescription());
   }
 
+  /**
+   * This method will be called from the thread spawned for the output. This
+   * method should only exit after all data are read from the source or the
+   * process is exiting
+   */
+  abstract void start() throws Exception;
+
   protected void outputLine(String line, InputMarker marker) {
-    statMetric.count++;
-    readBytesMetric.count += (line.length());
+    statMetric.value++;
+    readBytesMetric.value += (line.length());
 
     if (firstFilter != null) {
       try {
         firstFilter.apply(line, marker);
       } catch (LogfeederException e) {
-        logger.error(e.getLocalizedMessage(),e);
+        LOG.error(e.getLocalizedMessage(), e);
       }
     } else {
-      // TODO: For now, let's make filter mandatory, so that no one
-      // accidently forgets to write filter
-      // outputMgr.write(line, this);
+      // TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter
+      // outputManager.write(line, this);
     }
   }
 
@@ -120,60 +169,10 @@ public abstract class Input extends ConfigBlock implements Runnable {
     }
   }
 
-  public boolean monitor() {
-    if (isReady()) {
-      logger.info("Starting thread. " + getShortDescription());
-      thread = new Thread(this, getNameForThread());
-      thread.start();
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public void checkIn(InputMarker inputMarker) {
-    // Default implementation is to ignore.
-  }
-
-  /**
-   * This is generally used by final checkin
-   */
-  public void checkIn() {
-  }
-
-  public boolean isReady() {
-    return true;
-  }
-
-  public boolean isTail() {
-    return tail;
-  }
-
-  public void setTail(boolean tail) {
-    this.tail = tail;
-  }
-
-  public boolean isUseEventMD5() {
-    return useEventMD5;
-  }
-
-  public void setUseEventMD5(boolean useEventMD5) {
-    this.useEventMD5 = useEventMD5;
-  }
-
-  public boolean isGenEventMD5() {
-    return genEventMD5;
-  }
-
-  public void setGenEventMD5(boolean genEventMD5) {
-    this.genEventMD5 = genEventMD5;
-  }
-
   @Override
   public void setDrain(boolean drain) {
-    logger.info("Request to drain. " + getShortDescription());
+    LOG.info("Request to drain. " + getShortDescription());
     super.setDrain(drain);
-    ;
     try {
       thread.interrupt();
     } catch (Throwable t) {
@@ -181,38 +180,36 @@ public abstract class Input extends ConfigBlock implements Runnable {
     }
   }
 
-  public Filter getFirstFilter() {
-    return firstFilter;
-  }
-
-  public void setFirstFilter(Filter filter) {
-    firstFilter = filter;
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    super.addMetricsContainers(metricsList);
+    if (firstFilter != null) {
+      firstFilter.addMetricsContainers(metricsList);
+    }
+    metricsList.add(readBytesMetric);
   }
 
-  public void setInputMgr(InputMgr inputMgr) {
-    this.inputMgr = inputMgr;
-  }
+  @Override
+  public void logStat() {
+    super.logStat();
+    logStatForMetric(readBytesMetric, "Stat: Bytes Read");
 
-  public void setOutputMgr(OutputMgr outputMgr) {
-    this.outputMgr = outputMgr;
+    if (firstFilter != null) {
+      firstFilter.logStat();
+    }
   }
 
-  public String getFilePath() {
-    return filePath;
-  }
+  public abstract void checkIn(InputMarker inputMarker);
 
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
+  public abstract void lastCheckIn();
 
   public void close() {
-    logger.info("Close called. " + getShortDescription());
+    LOG.info("Close called. " + getShortDescription());
 
     try {
       if (firstFilter != null) {
         firstFilter.close();
       } else {
-        outputMgr.close();
+        outputManager.close();
       }
     } catch (Throwable t) {
       // Ignore
@@ -220,86 +217,60 @@ public abstract class Input extends ConfigBlock implements Runnable {
     isClosed = true;
   }
 
-  public void setClosed(boolean isClosed) {
-    this.isClosed = isClosed;
-  }
-
-  public boolean isClosed() {
-    return isClosed;
-  }
-
-  @Override
-  public void loadConfig(Map<String, Object> map) {
-    super.loadConfig(map);
-    String typeValue = getStringValue("type");
-    if (typeValue != null) {
-      // Explicitly add type and value to field list
-      contextFields.put("type", typeValue);
-      @SuppressWarnings("unchecked")
-      Map<String, Object> addFields = (Map<String, Object>) map
-        .get("add_fields");
-      if (addFields == null) {
-        addFields = new HashMap<String, Object>();
-        map.put("add_fields", addFields);
-      }
-      addFields.put("type", typeValue);
-    }
+  public boolean isTail() {
+    return tail;
   }
 
-  @Override
-  public String getShortDescription() {
-    return null;
+  public boolean isUseEventMD5() {
+    return useEventMD5;
   }
 
-  @Override
-  public void logStat() {
-    super.logStat();
-    logStatForMetric(readBytesMetric, "Stat: Bytes Read");
-
-    if (firstFilter != null) {
-      firstFilter.logStat();
-    }
+  public boolean isGenEventMD5() {
+    return genEventMD5;
   }
 
-  @Override
-  public String toString() {
-    return getShortDescription();
+  public Filter getFirstFilter() {
+    return firstFilter;
   }
 
-  public void rollOver() {
-    // Only some inputs support it. E.g. InputFile
+  public String getFilePath() {
+    return filePath;
   }
 
-  public String getType() {
-    return type;
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
   }
 
-  public void setType(String type) {
-    this.type = type;
+  public void setClosed(boolean isClosed) {
+    this.isClosed = isClosed;
   }
 
-  public Date getEventTime() {
-    return null;
+  public boolean isClosed() {
+    return isClosed;
   }
 
   public List<Output> getOutputList() {
     return outputList;
   }
-
-  public void addOutput(Output output) {
-    outputList.add(output);
-  }
-
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    super.addMetricsContainers(metricsList);
-    if (firstFilter != null) {
-      firstFilter.addMetricsContainers(metricsList);
-    }
-    metricsList.add(readBytesMetric);
-  }
   
   public Thread getThread(){
     return thread;
   }
 
+  @Override
+  public String getNameForThread() {
+    if (filePath != null) {
+      try {
+        return (type + "=" + (new File(filePath)).getName());
+      } catch (Throwable ex) {
+        LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
+      }
+    }
+    return super.getNameForThread() + ":" + type;
+  }
+
+  @Override
+  public String toString() {
+    return getShortDescription();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index c9f5ded..3737839 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -19,528 +19,99 @@
 package org.apache.ambari.logfeeder.input;
 
 import java.io.BufferedReader;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
-public class InputFile extends Input {
-  private static final Logger logger = Logger.getLogger(InputFile.class);
-
-  private String logPath = null;
-  private boolean isStartFromBegining = true;
-
-  private boolean isReady = false;
-  private File[] logPathFiles = null;
-  private Object fileKey = null;
-  private String base64FileKey = null;
-
-  private boolean isRolledOver = false;
-  private boolean addWildCard = false;
-
-  private long lastCheckPointTimeMS = 0;
-  private int checkPointIntervalMS = 5 * 1000; // 5 seconds
-  private RandomAccessFile checkPointWriter = null;
-  private Map<String, Object> jsonCheckPoint = null;
-
-  private File checkPointFile = null;
-
-  private InputMarker lastCheckPointInputMarker = null;
-
-  private String checkPointExtension = ".cp";
-
-  @Override
-  public void init() throws Exception {
-    logger.info("init() called");
-    statMetric.metricsName = "input.files.read_lines";
-    readBytesMetric.metricsName = "input.files.read_bytes";
-    checkPointExtension = LogFeederUtil.getStringProperty(
-      "logfeeder.checkpoint.extension", checkPointExtension);
-
-    // Let's close the file and set it to true after we start monitoring it
-    setClosed(true);
-    logPath = getStringValue("path");
-    tail = getBooleanValue("tail", tail);
-    addWildCard = getBooleanValue("add_wild_card", addWildCard);
-    checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
-      checkPointIntervalMS);
-
-    if (logPath == null || logPath.isEmpty()) {
-      logger.error("path is empty for file input. "
-        + getShortDescription());
-      return;
-    }
-
-    String startPosition = getStringValue("start_position");
-    if (StringUtils.isEmpty(startPosition)
-      || startPosition.equalsIgnoreCase("beginning")
-      || startPosition.equalsIgnoreCase("begining")) {
-      isStartFromBegining = true;
-    }
-
-    if (!tail) {
-      // start position end doesn't apply if we are not tailing
-      isStartFromBegining = true;
-    }
-
-    setFilePath(logPath);
-    boolean isFileReady = isReady();
-
-    logger.info("File to monitor " + logPath + ", tail=" + tail
-      + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
-
-    super.init();
-  }
+public class InputFile extends AbstractInputFile {
 
   @Override
   public boolean isReady() {
     if (!isReady) {
       // Let's try to check whether the file is available
-      logPathFiles = getActualFiles(logPath);
-      if (logPathFiles != null && logPathFiles.length > 0
-        && logPathFiles[0].isFile()) {
-
-        if (isTail() && logPathFiles.length > 1) {
-          logger.warn("Found multiple files (" + logPathFiles.length
-            + ") for the file filter " + filePath
-            + ". Will use only the first one. Using "
-            + logPathFiles[0].getAbsolutePath());
+      logFiles = getActualFiles(logPath);
+      if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
+        if (tail && logFiles.length > 1) {
+          LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+              ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
         }
-        logger.info("File filter " + filePath + " expanded to "
-          + logPathFiles[0].getAbsolutePath());
+        LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
         isReady = true;
       } else {
-        logger.debug(logPath + " file doesn't exist. Ignoring for now");
+        LOG.debug(logPath + " file doesn't exist. Ignoring for now");
       }
     }
     return isReady;
   }
 
   private File[] getActualFiles(String searchPath) {
-    if (addWildCard) {
-      if (!searchPath.endsWith("*")) {
-        searchPath = searchPath + "*";
-      }
-    }
-    File checkFile = new File(searchPath);
-    if (checkFile.isFile()) {
-      return new File[]{checkFile};
+    File searchFile = new File(searchPath);
+    if (searchFile.isFile()) {
+      return new File[]{searchFile};
+    } else {
+      FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
+      return searchFile.getParentFile().listFiles(fileFilter);
     }
-    // Let's do wild card search
-    // First check current folder
-    File checkFiles[] = findFileForWildCard(searchPath, new File("."));
-    if (checkFiles == null || checkFiles.length == 0) {
-      // Let's check from the parent folder
-      File parentDir = (new File(searchPath)).getParentFile();
-      if (parentDir != null) {
-        String wildCard = (new File(searchPath)).getName();
-        checkFiles = findFileForWildCard(wildCard, parentDir);
-      }
-    }
-    return checkFiles;
-  }
-
-  private File[] findFileForWildCard(String searchPath, File dir) {
-    logger.debug("findFileForWildCard(). filePath=" + searchPath + ", dir="
-      + dir + ", dir.fullpath=" + dir.getAbsolutePath());
-    FileFilter fileFilter = new WildcardFileFilter(searchPath);
-    return dir.listFiles(fileFilter);
-  }
-
-  @Override
-  synchronized public void checkIn(InputMarker inputMarker) {
-    super.checkIn(inputMarker);
-    if (checkPointWriter != null) {
-      try {
-        int lineNumber = LogFeederUtil.objectToInt(
-          jsonCheckPoint.get("line_number"), 0, "line_number");
-        if (lineNumber > inputMarker.lineNumber) {
-          // Already wrote higher line number for this input
-          return;
-        }
-        // If interval is greater than last checkPoint time, then write
-        long currMS = System.currentTimeMillis();
-        if (!isClosed()
-          && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
-          // Let's save this one so we can update the check point file
-          // on flush
-          lastCheckPointInputMarker = inputMarker;
-          return;
-        }
-        lastCheckPointTimeMS = currMS;
-
-        jsonCheckPoint.put("line_number", ""
-          + new Integer(inputMarker.lineNumber));
-        jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
-        jsonCheckPoint.put("last_write_time_date", new Date());
-
-        String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
-        // Let's rewind
-        checkPointWriter.seek(0);
-        checkPointWriter.writeInt(jsonStr.length());
-        checkPointWriter.write(jsonStr.getBytes());
-
-        if (isClosed()) {
-          final String LOG_MESSAGE_KEY = this.getClass()
-            .getSimpleName() + "_FINAL_CHECKIN";
-          LogFeederUtil.logErrorMessageByInterval(
-            LOG_MESSAGE_KEY,
-            "Wrote final checkPoint, input="
-              + getShortDescription()
-              + ", checkPointFile="
-              + checkPointFile.getAbsolutePath()
-              + ", checkPoint=" + jsonStr, null, logger,
-            Level.INFO);
-        }
-      } catch (Throwable t) {
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-          + "_CHECKIN_EXCEPTION";
-        LogFeederUtil
-          .logErrorMessageByInterval(LOG_MESSAGE_KEY,
-            "Caught exception checkIn. , input="
-              + getShortDescription(), t, logger,
-            Level.ERROR);
-      }
-    }
-
-  }
-
-  @Override
-  public void checkIn() {
-    super.checkIn();
-    if (lastCheckPointInputMarker != null) {
-      checkIn(lastCheckPointInputMarker);
-    }
-  }
-
-  @Override
-  public void rollOver() {
-    logger.info("Marking this input file for rollover. "
-      + getShortDescription());
-    isRolledOver = true;
   }
 
   @Override
   void start() throws Exception {
-
-    if (logPathFiles == null || logPathFiles.length == 0) {
-      return;
-    }
     boolean isProcessFile = getBooleanValue("process_file", true);
     if (isProcessFile) {
-      if (isTail()) {
-        processFile(logPathFiles[0]);
+      if (tail) {
+        processFile(logFiles[0]);
       } else {
-        for (File file : logPathFiles) {
+        for (File file : logFiles) {
           try {
             processFile(file);
             if (isClosed() || isDrain()) {
-              logger.info("isClosed or isDrain. Now breaking loop.");
+              LOG.info("isClosed or isDrain. Now breaking loop.");
               break;
             }
           } catch (Throwable t) {
-            logger.error("Error processing file=" + file.getAbsolutePath(), t);
+            LOG.error("Error processing file=" + file.getAbsolutePath(), t);
           }
         }
       }
       close();
-    }else{
-      copyFiles(logPathFiles);
+    } else {
+      copyFiles(logFiles);
     }
-    
   }
 
   @Override
-  public void close() {
-    super.close();
-    logger.info("close() calling checkPoint checkIn(). "
-      + getShortDescription());
-    checkIn();
-  }
-
-  private void processFile(File logPathFile) throws FileNotFoundException,
-    IOException {
-    logger.info("Monitoring logPath=" + logPath + ", logPathFile="
-      + logPathFile);
-    BufferedReader br = null;
-    checkPointFile = null;
-    checkPointWriter = null;
-    jsonCheckPoint = null;
-    int resumeFromLineNumber = 0;
-
-    int lineCount = 0;
-    try {
-      setFilePath(logPathFile.getAbsolutePath());
-      br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile));
-
-      // Whether to send to output from the beginning.
-      boolean resume = isStartFromBegining;
-
-      // Seems FileWatch is not reliable, so let's only use file key comparison
-      fileKey = getFileKey(logPathFile);
-      base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
-        .getBytes());
-      logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey
-        + ". " + getShortDescription());
-
-      if (isTail()) {
-        try {
-          logger.info("Checking existing checkpoint file. "
-            + getShortDescription());
-
-          String fileBase64 = Base64.byteArrayToBase64(fileKey
-            .toString().getBytes());
-          String checkPointFileName = fileBase64
-            + checkPointExtension;
-          File checkPointFolder = inputMgr.getCheckPointFolderFile();
-          checkPointFile = new File(checkPointFolder,
-            checkPointFileName);
-          checkPointWriter = new RandomAccessFile(checkPointFile,
-            "rw");
-
-          try {
-            int contentSize = checkPointWriter.readInt();
-            byte b[] = new byte[contentSize];
-            int readSize = checkPointWriter.read(b, 0, contentSize);
-            if (readSize != contentSize) {
-              logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
-                + contentSize
-                + ", read="
-                + readSize
-                + ", checkPointFile="
-                + checkPointFile
-                + ", input=" + getShortDescription());
-            } else {
-              String jsonCheckPointStr = new String(b, 0, readSize);
-              jsonCheckPoint = LogFeederUtil
-                .toJSONObject(jsonCheckPointStr);
-
-              resumeFromLineNumber = LogFeederUtil.objectToInt(
-                jsonCheckPoint.get("line_number"), 0,
-                "line_number");
-
-              if (resumeFromLineNumber > 0) {
-                // Let's read from last line read
-                resume = false;
-              }
-              logger.info("CheckPoint. checkPointFile="
-                + checkPointFile + ", json="
-                + jsonCheckPointStr
-                + ", resumeFromLineNumber="
-                + resumeFromLineNumber + ", resume="
-                + resume);
-            }
-          } catch (EOFException eofEx) {
-            logger.info("EOFException. Will reset checkpoint file "
-              + checkPointFile.getAbsolutePath() + " for "
-              + getShortDescription());
-          }
-          if (jsonCheckPoint == null) {
-            // This seems to be first time, so creating the initial
-            // checkPoint object
-            jsonCheckPoint = new HashMap<String, Object>();
-            jsonCheckPoint.put("file_path", filePath);
-            jsonCheckPoint.put("file_key", fileBase64);
-          }
-
-        } catch (Throwable t) {
-          logger.error(
-            "Error while configuring checkpoint file. Will reset file. checkPointFile="
-              + checkPointFile, t);
-        }
-      }
-
-      setClosed(false);
-      int sleepStep = 2;
-      int sleepIteration = 0;
-      while (true) {
-        try {
-          if (isDrain()) {
-            break;
-          }
-
-          String line = br.readLine();
-          if (line == null) {
-            if (!resume) {
-              resume = true;
-            }
-            sleepIteration++;
-            try {
-              // Since FileWatch service is not reliable, we will check
-              // file inode every n seconds after no write
-              if (sleepIteration > 4) {
-                Object newFileKey = getFileKey(logPathFile);
-                if (newFileKey != null) {
-                  if (fileKey == null
-                    || !newFileKey.equals(fileKey)) {
-                    logger.info("File key is different. Calling rollover. oldKey="
-                      + fileKey
-                      + ", newKey="
-                      + newFileKey
-                      + ". "
-                      + getShortDescription());
-                    // File has rotated.
-                    rollOver();
-                  }
-                }
-              }
-              // Flush on the second iteration
-              if (!tail && sleepIteration >= 2) {
-                logger.info("End of file. Done with filePath="
-                  + logPathFile.getAbsolutePath()
-                  + ", lineCount=" + lineCount);
-                flush();
-                break;
-              } else if (sleepIteration == 2) {
-                flush();
-              } else if (sleepIteration >= 2) {
-                if (isRolledOver) {
-                  isRolledOver = false;
-                  // Close existing file
-                  try {
-                    logger.info("File is rolled over. Closing current open file."
-                      + getShortDescription()
-                      + ", lineCount=" + lineCount);
-                    br.close();
-                  } catch (Exception ex) {
-                    logger.error("Error closing file"
-                      + getShortDescription());
-                    break;
-                  }
-                  try {
-                    logger.info("Opening new rolled over file."
-                      + getShortDescription());
-                    br = new BufferedReader(LogsearchReaderFactory.
-                      INSTANCE.getReader(logPathFile));
-                    lineCount = 0;
-                    fileKey = getFileKey(logPathFile);
-                    base64FileKey = Base64
-                      .byteArrayToBase64(fileKey
-                        .toString().getBytes());
-                    logger.info("fileKey=" + fileKey
-                      + ", base64=" + base64FileKey
-                      + ", " + getShortDescription());
-                  } catch (Exception ex) {
-                    logger.error("Error opening rolled over file. "
-                      + getShortDescription());
-                    // Let's add this to monitoring and exit this thread
-                    logger.info("Added input to not ready list."
-                      + getShortDescription());
-                    isReady = false;
-                    inputMgr.addToNotReady(this);
-                    break;
-                  }
-                  logger.info("File is successfully rolled over. "
-                    + getShortDescription());
-                  continue;
-                }
-              }
-              Thread.sleep(sleepStep * 1000);
-              sleepStep = (sleepStep * 2);
-              sleepStep = sleepStep > 10 ? 10 : sleepStep;
-            } catch (InterruptedException e) {
-              logger.info("Thread interrupted."
-                + getShortDescription());
-            }
-          } else {
-            lineCount++;
-            sleepStep = 1;
-            sleepIteration = 0;
-
-            if (!resume && lineCount > resumeFromLineNumber) {
-              logger.info("Resuming to read from last line. lineCount="
-                + lineCount
-                + ", input="
-                + getShortDescription());
-              resume = true;
-            }
-            if (resume) {
-              InputMarker marker = new InputMarker();
-              marker.base64FileKey = base64FileKey;
-              marker.input = this;
-              marker.lineNumber = lineCount;
-              outputLine(line, marker);
-            }
-          }
-        } catch (Throwable t) {
-          final String LOG_MESSAGE_KEY = this.getClass()
-            .getSimpleName() + "_READ_LOOP_EXCEPTION";
-          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-            "Caught exception in read loop. lineNumber="
-              + lineCount + ", input="
-              + getShortDescription(), t, logger,
-            Level.ERROR);
-
-        }
-      }
-    } finally {
-      if (br != null) {
-        logger.info("Closing reader." + getShortDescription()
-          + ", lineCount=" + lineCount);
-        try {
-          br.close();
-        } catch (Throwable t) {
-          // ignore
-        }
-      }
-    }
-  }
-
-  static public Object getFileKey(File file) {
-    try {
-      Path fileFullPath = Paths.get(file.getAbsolutePath());
-      if (fileFullPath != null) {
-        BasicFileAttributes basicAttr = Files.readAttributes(
-          fileFullPath, BasicFileAttributes.class);
-        return basicAttr.fileKey();
-      }
-    } catch (Throwable ex) {
-      logger.error("Error getting file attributes for file=" + file, ex);
-    }
-    return file.toString();
+  protected BufferedReader openLogFile(File logFile) throws FileNotFoundException {
+    BufferedReader br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logFile));
+    fileKey = getFileKey(logFile);
+    base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+    LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
+    return br;
   }
 
   @Override
-  public String getShortDescription() {
-    return "input:source="
-      + getStringValue("source")
-      + ", path="
-      + (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0]
-      .getAbsolutePath() : getStringValue("path"));
+  protected Object getFileKey(File logFile) {
+    return FileUtil.getFileKey(logFile);
   }
-  
-  public void copyFiles(File[] files) {
+
+  private void copyFiles(File[] files) {
     boolean isCopyFile = getBooleanValue("copy_file", false);
     if (isCopyFile && files != null) {
       for (File file : files) {
         try {
-          InputMarker marker = new InputMarker();
-          marker.input = this;
-          outputMgr.copyFile(file, marker);
+          InputMarker marker = new InputMarker(this, null, 0);
+          outputManager.copyFile(file, marker);
           if (isClosed() || isDrain()) {
-            logger.info("isClosed or isDrain. Now breaking loop.");
+            LOG.info("isClosed or isDrain. Now breaking loop.");
             break;
           }
         } catch (Throwable t) {
-          logger.error("Error processing file=" + file.getAbsolutePath(), t);
+          LOG.error("Error processing file=" + file.getAbsolutePath(), t);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
new file mode 100644
index 0000000..8e70850
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputManager {
+  private static final Logger LOG = Logger.getLogger(InputManager.class);
+
+  private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
+  public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
+  
+  private List<Input> inputList = new ArrayList<Input>();
+  private Set<Input> notReadyList = new HashSet<Input>();
+
+  private boolean isDrain = false;
+  private boolean isAnyInputTail = false;
+
+  private File checkPointFolderFile = null;
+
+  private MetricData filesCountMetric = new MetricData("input.files.count", true);
+
+  private String checkPointExtension;
+  
+  private Thread inputIsReadyMonitor = null;
+
+  public List<Input> getInputList() {
+    return inputList;
+  }
+
+  public void add(Input input) {
+    inputList.add(input);
+  }
+
+  public void removeInput(Input input) {
+    LOG.info("Trying to remove from inputList. " + input.getShortDescription());
+    Iterator<Input> iter = inputList.iterator();
+    while (iter.hasNext()) {
+      Input iterInput = iter.next();
+      if (iterInput.equals(input)) {
+        LOG.info("Removing Input from inputList. " + input.getShortDescription());
+        iter.remove();
+      }
+    }
+  }
+
+  private int getActiveFilesCount() {
+    int count = 0;
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public void init() {
+    checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", DEFAULT_CHECKPOINT_EXTENSION);
+    for (Input input : inputList) {
+      try {
+        input.init();
+        if (input.isTail()) {
+          isAnyInputTail = true;
+        }
+      } catch (Exception e) {
+        LOG.error("Error initializing input. " + input.getShortDescription(), e);
+      }
+    }
+
+    if (isAnyInputTail) {
+      LOG.info("Determining valid checkpoint folder");
+      boolean isCheckPointFolderValid = false;
+      // We need to keep track of the files we are reading.
+      String checkPointFolder = LogFeederUtil.getStringProperty("logfeeder.checkpoint.folder");
+      if (!StringUtils.isEmpty(checkPointFolder)) {
+        checkPointFolderFile = new File(checkPointFolder);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's try home folder
+        String userHome = LogFeederUtil.getStringProperty("user.home");
+        if (userHome != null) {
+          checkPointFolderFile = new File(userHome, CHECKPOINT_SUBFOLDER_NAME);
+          LOG.info("Checking if home folder can be used for checkpoints. Folder=" + checkPointFolderFile);
+          isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        }
+      }
+      if (!isCheckPointFolderValid) {
+        // Let's use tmp folder
+        String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir");
+        if (tmpFolder == null) {
+          tmpFolder = "/tmp";
+        }
+        checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME);
+        LOG.info("Checking if tmps folder can be used for checkpoints. Folder=" + checkPointFolderFile);
+        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+        if (isCheckPointFolderValid) {
+          LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
+              "Please set logfeeder.checkpoint.folder property");
+        }
+      }
+
+      if (isCheckPointFolderValid) {
+        LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
+      }
+    }
+
+  }
+
+  private boolean verifyCheckPointFolder(File folderPathFile) {
+    if (!folderPathFile.exists()) {
+      try {
+        if (!folderPathFile.mkdir()) {
+          LOG.warn("Error creating folder for check point. folder=" + folderPathFile);
+        }
+      } catch (Throwable t) {
+        LOG.warn("Error creating folder for check point. folder=" + folderPathFile, t);
+      }
+    }
+
+    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+      // Let's check whether we can create a file
+      File testFile = new File(folderPathFile, UUID.randomUUID().toString());
+      try {
+        testFile.createNewFile();
+        return testFile.delete();
+      } catch (IOException e) {
+        LOG.warn("Couldn't create test file in " + folderPathFile.getAbsolutePath() + " for checkPoint", e);
+      }
+    }
+    return false;
+  }
+
+  public File getCheckPointFolderFile() {
+    return checkPointFolderFile;
+  }
+
+  public void monitor() {
+    for (Input input : inputList) {
+      if (input.isReady()) {
+        input.monitor();
+      } else {
+        if (input.isTail()) {
+          LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
+              "So it might not be an issue. " + input.getShortDescription());
+          notReadyList.add(input);
+        } else {
+          LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription());
+        }
+      }
+    }
+    // Start the monitoring thread if any file is in tail mode
+    if (isAnyInputTail) {
+       inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+        @Override
+        public void run() {
+          LOG.info("Going to monitor for these missing files: " + notReadyList.toString());
+          while (true) {
+            if (isDrain) {
+              LOG.info("Exiting missing file monitor.");
+              break;
+            }
+            try {
+              Iterator<Input> iter = notReadyList.iterator();
+              while (iter.hasNext()) {
+                Input input = iter.next();
+                try {
+                  if (input.isReady()) {
+                    input.monitor();
+                    iter.remove();
+                  }
+                } catch (Throwable t) {
+                  LOG.error("Error while enabling monitoring for input. " + input.getShortDescription());
+                }
+              }
+              Thread.sleep(30 * 1000);
+            } catch (Throwable t) {
+              // Ignore
+            }
+          }
+        }
+      };
+      inputIsReadyMonitor.start();
+    }
+  }
+
+  void addToNotReady(Input notReadyInput) {
+    notReadyList.add(notReadyInput);
+  }
+
+  public void addMetricsContainers(List<MetricData> metricsList) {
+    for (Input input : inputList) {
+      input.addMetricsContainers(metricsList);
+    }
+    filesCountMetric.value = getActiveFilesCount();
+    metricsList.add(filesCountMetric);
+  }
+
+  public void logStats() {
+    for (Input input : inputList) {
+      input.logStat();
+    }
+
+    filesCountMetric.value = getActiveFilesCount();
+    LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "");
+  }
+
+
+  public void cleanCheckPointFiles() {
+
+    if (checkPointFolderFile == null) {
+      LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + checkPointFolderFile);
+      return;
+    }
+    LOG.info("Cleaning checkPoint files. checkPointFolderFile=" + checkPointFolderFile.getAbsolutePath());
+    try {
+      // Loop over the check point files and if filePath is not present, then move to closed
+      String searchPath = "*" + checkPointExtension;
+      FileFilter fileFilter = new WildcardFileFilter(searchPath);
+      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+      int totalCheckFilesDeleted = 0;
+      for (File checkPointFile : checkPointFiles) {
+        try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+          int contentSize = checkPointReader.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointReader.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+              + readSize + ", checkPointFile=" + checkPointFile);
+          } else {
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+            String logFilePath = (String) jsonCheckPoint.get("file_path");
+            String logFileKey = (String) jsonCheckPoint.get("file_key");
+            if (logFilePath != null && logFileKey != null) {
+              boolean deleteCheckPointFile = false;
+              File logFile = new File(logFilePath);
+              if (logFile.exists()) {
+                Object fileKeyObj = FileUtil.getFileKey(logFile);
+                String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+                if (!logFileKey.equals(fileBase64)) {
+                  deleteCheckPointFile = true;
+                  LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
+                      logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+                }
+              } else {
+                LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
+                    checkPointFile.getAbsolutePath());
+                deleteCheckPointFile = true;
+              }
+              if (deleteCheckPointFile) {
+                LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+                checkPointFile.delete();
+                totalCheckFilesDeleted++;
+              }
+            }
+          }
+        } catch (EOFException eof) {
+          LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
+        } catch (Throwable t) {
+          LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+        }
+      }
+      LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
+          checkPointFolderFile.getAbsolutePath());
+
+    } catch (Throwable t) {
+      LOG.error("Error while cleaning checkPointFiles", t);
+    }
+  }
+
+  public void waitOnAllInputs() {
+    //wait on inputs
+    for (Input input : inputList) {
+      if (input != null) {
+        Thread inputThread = input.getThread();
+        if (inputThread != null) {
+          try {
+            inputThread.join();
+          } catch (InterruptedException e) {
+            // ignore
+          }
+        }
+      }
+    }
+    // wait on monitor
+    if (inputIsReadyMonitor != null) {
+      try {
+        this.close();
+        inputIsReadyMonitor.join();
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+  }
+
+  public void checkInAll() {
+    for (Input input : inputList) {
+      input.lastCheckIn();
+    }
+  }
+
+  public void close() {
+    for (Input input : inputList) {
+      try {
+        input.setDrain(true);
+      } catch (Throwable t) {
+        LOG.error("Error while draining. input=" + input.getShortDescription(), t);
+      }
+    }
+    isDrain = true;
+
+    // Need to get this value from property
+    int iterations = 30;
+    int waitTimeMS = 1000;
+    for (int i = 0; i < iterations; i++) {
+      boolean allClosed = true;
+      for (Input input : inputList) {
+        if (!input.isClosed()) {
+          try {
+            allClosed = false;
+            LOG.warn("Waiting for input to close. " + input.getShortDescription() + ", " + (iterations - i) + " more seconds");
+            Thread.sleep(waitTimeMS);
+          } catch (Throwable t) {
+            // Ignore
+          }
+        }
+      }
+      if (allClosed) {
+        LOG.info("All inputs are closed. Iterations=" + i);
+        return;
+      }
+    }
+    
+    LOG.warn("Some inputs were not closed after " + iterations + " iterations");
+    for (Input input : inputList) {
+      if (!input.isClosed()) {
+        LOG.warn("Input not closed. Will ignore it." + input.getShortDescription());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
index 48a7f1d..6767687 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -23,13 +23,18 @@ package org.apache.ambari.logfeeder.input;
  * This file contains the file inode, line number of the log currently been read
  */
 public class InputMarker {
-  public int lineNumber = 0;
-  public Input input;
-  public String base64FileKey = null;
-
+  public final Input input;
+  public final String base64FileKey;
+  public final int lineNumber;
+  
+  public InputMarker(Input input, String base64FileKey, int lineNumber) {
+    this.input = input;
+    this.base64FileKey = base64FileKey;
+    this.lineNumber = lineNumber;
+  }
+  
   @Override
   public String toString() {
-    return "InputMarker [lineNumber=" + lineNumber + ", input="
-      + input.getShortDescription() + "]";
+    return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
deleted file mode 100644
index b18c9b0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.input;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-public class InputMgr {
-  private static final Logger logger = Logger.getLogger(InputMgr.class);
-
-  private List<Input> inputList = new ArrayList<Input>();
-  private Set<Input> notReadyList = new HashSet<Input>();
-
-  private boolean isDrain = false;
-  private boolean isAnyInputTail = false;
-
-  private String checkPointSubFolderName = "logfeeder_checkpoints";
-  private File checkPointFolderFile = null;
-
-  private MetricCount filesCountMetric = new MetricCount();
-
-  private String checkPointExtension = ".cp";
-  
-  private Thread inputIsReadyMonitor = null;
-
-  public List<Input> getInputList() {
-    return inputList;
-  }
-
-  public void add(Input input) {
-    inputList.add(input);
-  }
-
-  public void removeInput(Input input) {
-    logger.info("Trying to remove from inputList. "
-      + input.getShortDescription());
-    Iterator<Input> iter = inputList.iterator();
-    while (iter.hasNext()) {
-      Input iterInput = iter.next();
-      if (iterInput.equals(input)) {
-        logger.info("Removing Input from inputList. "
-          + input.getShortDescription());
-        iter.remove();
-      }
-    }
-  }
-
-  public int getActiveFilesCount() {
-    int count = 0;
-    for (Input input : inputList) {
-      if (input.isReady()) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  public void init() {
-    filesCountMetric.metricsName = "input.files.count";
-    filesCountMetric.isPointInTime = true;
-
-    checkPointExtension = LogFeederUtil.getStringProperty(
-      "logfeeder.checkpoint.extension", checkPointExtension);
-    for (Input input : inputList) {
-      try {
-        input.init();
-        if (input.isTail()) {
-          isAnyInputTail = true;
-        }
-      } catch (Exception e) {
-        logger.error(
-          "Error initializing input. "
-            + input.getShortDescription(), e);
-      }
-    }
-
-    if (isAnyInputTail) {
-      logger.info("Determining valid checkpoint folder");
-      boolean isCheckPointFolderValid = false;
-      // We need to keep track of the files we are reading.
-      String checkPointFolder = LogFeederUtil
-        .getStringProperty("logfeeder.checkpoint.folder");
-      if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
-        checkPointFolderFile = new File(checkPointFolder);
-        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-      }
-      if (!isCheckPointFolderValid) {
-        // Let's try home folder
-        String userHome = LogFeederUtil.getStringProperty("user.home");
-        if (userHome != null) {
-          checkPointFolderFile = new File(userHome,
-            checkPointSubFolderName);
-          logger.info("Checking if home folder can be used for checkpoints. Folder="
-            + checkPointFolderFile);
-          isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-        }
-      }
-      if (!isCheckPointFolderValid) {
-        // Let's use tmp folder
-        String tmpFolder = LogFeederUtil
-          .getStringProperty("java.io.tmpdir");
-        if (tmpFolder == null) {
-          tmpFolder = "/tmp";
-        }
-        checkPointFolderFile = new File(tmpFolder,
-          checkPointSubFolderName);
-        logger.info("Checking if tmps folder can be used for checkpoints. Folder="
-          + checkPointFolderFile);
-        isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
-        if (isCheckPointFolderValid) {
-          logger.warn("Using tmp folder "
-            + checkPointFolderFile
-            + " to store check points. This is not recommended."
-            + "Please set logfeeder.checkpoint.folder property");
-        }
-      }
-
-      if (isCheckPointFolderValid) {
-        logger.info("Using folder " + checkPointFolderFile
-          + " for storing checkpoints");
-      }
-    }
-
-  }
-
-  public File getCheckPointFolderFile() {
-    return checkPointFolderFile;
-  }
-
-  private boolean verifyCheckPointFolder(File folderPathFile) {
-    if (!folderPathFile.exists()) {
-      // Create the folder
-      try {
-        if (!folderPathFile.mkdir()) {
-          logger.warn("Error creating folder for check point. folder="
-            + folderPathFile);
-        }
-      } catch (Throwable t) {
-        logger.warn("Error creating folder for check point. folder="
-          + folderPathFile, t);
-      }
-    }
-
-    if (folderPathFile.exists() && folderPathFile.isDirectory()) {
-      // Let's check whether we can create a file
-      File testFile = new File(folderPathFile, UUID.randomUUID()
-        .toString());
-      try {
-        testFile.createNewFile();
-        return testFile.delete();
-      } catch (IOException e) {
-        logger.warn(
-          "Couldn't create test file in "
-            + folderPathFile.getAbsolutePath()
-            + " for checkPoint", e);
-      }
-    }
-    return false;
-  }
-
-  public void monitor() {
-    for (Input input : inputList) {
-      if (input.isReady()) {
-        input.monitor();
-      } else {
-        if (input.isTail()) {
-          logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
-            + input.getShortDescription());
-          notReadyList.add(input);
-        } else {
-          logger.info("Input is not ready, so going to ignore it "
-            + input.getShortDescription());
-        }
-      }
-    }
-    // Start the monitoring thread if any file is in tail mode
-    if (isAnyInputTail) {
-       inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
-        @Override
-        public void run() {
-          logger.info("Going to monitor for these missing files: "
-            + notReadyList.toString());
-          while (true) {
-            if (isDrain) {
-              logger.info("Exiting missing file monitor.");
-              break;
-            }
-            try {
-              Iterator<Input> iter = notReadyList.iterator();
-              while (iter.hasNext()) {
-                Input input = iter.next();
-                try {
-                  if (input.isReady()) {
-                    input.monitor();
-                    iter.remove();
-                  }
-                } catch (Throwable t) {
-                  logger.error("Error while enabling monitoring for input. "
-                    + input.getShortDescription());
-                }
-              }
-              Thread.sleep(30 * 1000);
-            } catch (Throwable t) {
-              // Ignore
-            }
-          }
-        }
-      };
-      inputIsReadyMonitor.start();
-    }
-  }
-
-  public void addToNotReady(Input notReadyInput) {
-    notReadyList.add(notReadyInput);
-  }
-
-  public void addMetricsContainers(List<MetricCount> metricsList) {
-    for (Input input : inputList) {
-      input.addMetricsContainers(metricsList);
-    }
-    filesCountMetric.count = getActiveFilesCount();
-    metricsList.add(filesCountMetric);
-  }
-
-  public void logStats() {
-    for (Input input : inputList) {
-      input.logStat();
-    }
-
-    filesCountMetric.count = getActiveFilesCount();
-    LogFeederUtil.logStatForMetric(filesCountMetric,
-      "Stat: Files Monitored Count", null);
-  }
-
-  public void close() {
-    for (Input input : inputList) {
-      try {
-        input.setDrain(true);
-      } catch (Throwable t) {
-        logger.error(
-          "Error while draining. input="
-            + input.getShortDescription(), t);
-      }
-    }
-    isDrain = true;
-
-    // Need to get this value from property
-    int iterations = 30;
-    int waitTimeMS = 1000;
-    int i = 0;
-    boolean allClosed = true;
-    for (i = 0; i < iterations; i++) {
-      allClosed = true;
-      for (Input input : inputList) {
-        if (!input.isClosed()) {
-          try {
-            allClosed = false;
-            logger.warn("Waiting for input to close. "
-              + input.getShortDescription() + ", "
-              + (iterations - i) + " more seconds");
-            Thread.sleep(waitTimeMS);
-          } catch (Throwable t) {
-            // Ignore
-          }
-        }
-      }
-      if (allClosed) {
-        break;
-      }
-    }
-    if (!allClosed) {
-      logger.warn("Some inputs were not closed. Iterations=" + i);
-      for (Input input : inputList) {
-        if (!input.isClosed()) {
-          logger.warn("Input not closed. Will ignore it."
-            + input.getShortDescription());
-        }
-      }
-    } else {
-      logger.info("All inputs are closed. Iterations=" + i);
-    }
-
-  }
-
-  public void checkInAll() {
-    for (Input input : inputList) {
-      input.checkIn();
-    }
-  }
-
-  public void cleanCheckPointFiles() {
-
-    if (checkPointFolderFile == null) {
-      logger.info("Will not clean checkPoint files. checkPointFolderFile="
-        + checkPointFolderFile);
-      return;
-    }
-    logger.info("Cleaning checkPoint files. checkPointFolderFile="
-      + checkPointFolderFile.getAbsolutePath());
-    try {
-      // Loop over the check point files and if filePath is not present, then move to closed
-      String searchPath = "*" + checkPointExtension;
-      FileFilter fileFilter = new WildcardFileFilter(searchPath);
-      File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
-      int totalCheckFilesDeleted = 0;
-      for (File checkPointFile : checkPointFiles) {
-        RandomAccessFile checkPointReader = null;
-        try {
-          checkPointReader = new RandomAccessFile(checkPointFile, "r");
-
-          int contentSize = checkPointReader.readInt();
-          byte b[] = new byte[contentSize];
-          int readSize = checkPointReader.read(b, 0, contentSize);
-          if (readSize != contentSize) {
-            logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
-              + contentSize
-              + ", read="
-              + readSize
-              + ", checkPointFile=" + checkPointFile);
-          } else {
-            // Create JSON string
-            String jsonCheckPointStr = new String(b, 0, readSize);
-            Map<String, Object> jsonCheckPoint = LogFeederUtil
-              .toJSONObject(jsonCheckPointStr);
-
-            String logFilePath = (String) jsonCheckPoint
-              .get("file_path");
-            String logFileKey = (String) jsonCheckPoint
-              .get("file_key");
-            if (logFilePath != null && logFileKey != null) {
-              boolean deleteCheckPointFile = false;
-              File logFile = new File(logFilePath);
-              if (logFile.exists()) {
-                Object fileKeyObj = InputFile
-                  .getFileKey(logFile);
-                String fileBase64 = Base64
-                  .byteArrayToBase64(fileKeyObj
-                    .toString().getBytes());
-                if (!logFileKey.equals(fileBase64)) {
-                  deleteCheckPointFile = true;
-                  logger.info("CheckPoint clean: File key has changed. old="
-                    + logFileKey
-                    + ", new="
-                    + fileBase64
-                    + ", filePath="
-                    + logFilePath
-                    + ", checkPointFile="
-                    + checkPointFile.getAbsolutePath());
-                }
-              } else {
-                logger.info("CheckPoint clean: Log file doesn't exist. filePath="
-                  + logFilePath
-                  + ", checkPointFile="
-                  + checkPointFile.getAbsolutePath());
-                deleteCheckPointFile = true;
-              }
-              if (deleteCheckPointFile) {
-                logger.info("Deleting CheckPoint file="
-                  + checkPointFile.getAbsolutePath()
-                  + ", logFile=" + logFilePath);
-                checkPointFile.delete();
-                totalCheckFilesDeleted++;
-              }
-            }
-          }
-        } catch (EOFException eof) {
-          logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
-            + checkPointFile);
-        } catch (Throwable t) {
-          logger.error("Error while checking checkPoint file. "
-            + checkPointFile, t);
-        } finally {
-          if (checkPointReader != null) {
-            try {
-              checkPointReader.close();
-            } catch (Throwable t) {
-              logger.error("Error closing checkPoint file. "
-                + checkPointFile, t);
-            }
-          }
-        }
-      }
-      logger.info("Deleted " + totalCheckFilesDeleted
-        + " checkPoint file(s). checkPointFolderFile="
-        + checkPointFolderFile.getAbsolutePath());
-
-    } catch (Throwable t) {
-      logger.error("Error while cleaning checkPointFiles", t);
-    }
-  }
-
-  public void waitOnAllInputs() {
-    //wait on inputs
-    if (inputList != null) {
-      for (Input input : inputList) {
-        if (input != null) {
-          Thread inputThread = input.getThread();
-          if (inputThread != null) {
-            try {
-              inputThread.join();
-            } catch (InterruptedException e) {
-              // ignore
-            }
-          }
-        }
-      }
-    }
-    // wait on monitor
-    if (inputIsReadyMonitor != null) {
-      try {
-        this.close();
-        inputIsReadyMonitor.join();
-      } catch (InterruptedException e) {
-        // ignore
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index c9d28bd..f560379 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -19,201 +19,57 @@
 package org.apache.ambari.logfeeder.input;
 
 import java.io.BufferedReader;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.solr.common.util.Base64;
 
-public class InputS3File extends Input {
-  private static final Logger logger = Logger.getLogger(InputS3File.class);
-
-  private String logPath = null;
-  private boolean isStartFromBegining = true;
-
-  private boolean isReady = false;
-  private String[] s3LogPathFiles = null;
-  private Object fileKey = null;
-  private String base64FileKey = null;
-
-  private boolean isRolledOver = false;
-  private boolean addWildCard = false;
-
-  private long lastCheckPointTimeMS = 0;
-  private int checkPointIntervalMS = 5 * 1000; // 5 seconds
-  private RandomAccessFile checkPointWriter = null;
-  private Map<String, Object> jsonCheckPoint = null;
-
-  private File checkPointFile = null;
-
-  private InputMarker lastCheckPointInputMarker = null;
-
-  private String checkPointExtension = ".cp";
-
-
-  @Override
-  public void init() throws Exception {
-    logger.info("init() called");
-    statMetric.metricsName = "input.files.read_lines";
-    readBytesMetric.metricsName = "input.files.read_bytes";
-    checkPointExtension = LogFeederUtil.getStringProperty(
-        "logfeeder.checkpoint.extension", checkPointExtension);
-
-    // Let's close the file and set it to true after we start monitoring it
-    setClosed(true);
-    logPath = getStringValue("path");
-    tail = getBooleanValue("tail", tail);
-    addWildCard = getBooleanValue("add_wild_card", addWildCard);
-    checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
-        checkPointIntervalMS);
-    if (logPath == null || logPath.isEmpty()) {
-      logger.error("path is empty for file input. " + getShortDescription());
-      return;
-    }
-
-    String startPosition = getStringValue("start_position");
-    if (StringUtils.isEmpty(startPosition)
-        || startPosition.equalsIgnoreCase("beginning")
-        || startPosition.equalsIgnoreCase("begining")) {
-      isStartFromBegining = true;
-    }
-
-    if (!tail) {
-      // start position end doesn't apply if we are not tailing
-      isStartFromBegining = true;
-    }
-
-    setFilePath(logPath);
-    boolean isFileReady = isReady();
-
-    logger.info("File to monitor " + logPath + ", tail=" + tail
-        + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
-
-    super.init();
-  }
+public class InputS3File extends AbstractInputFile {
 
   @Override
   public boolean isReady() {
     if (!isReady) {
       // Let's try to check whether the file is available
-      s3LogPathFiles = getActualFiles(logPath);
-      if (s3LogPathFiles != null && s3LogPathFiles.length > 0) {
-        if (isTail() && s3LogPathFiles.length > 1) {
-          logger.warn("Found multiple files (" + s3LogPathFiles.length
-              + ") for the file filter " + filePath
-              + ". Will use only the first one. Using " + s3LogPathFiles[0]);
+      logFiles = getActualFiles(logPath);
+      if (!ArrayUtils.isEmpty(logFiles)) {
+        if (tail && logFiles.length > 1) {
+          LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+              ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
         }
-        logger.info("File filter " + filePath + " expanded to "
-            + s3LogPathFiles[0]);
+        LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
         isReady = true;
       } else {
-        logger.debug(logPath + " file doesn't exist. Ignoring for now");
+        LOG.debug(logPath + " file doesn't exist. Ignoring for now");
       }
     }
     return isReady;
   }
 
-  private String[] getActualFiles(String searchPath) {
+  private File[] getActualFiles(String searchPath) {
     // TODO search file on s3
-    return new String[] { searchPath };
-  }
-
-  @Override
-  synchronized public void checkIn(InputMarker inputMarker) {
-    super.checkIn(inputMarker);
-    if (checkPointWriter != null) {
-      try {
-        int lineNumber = LogFeederUtil.objectToInt(
-            jsonCheckPoint.get("line_number"), 0, "line_number");
-        if (lineNumber > inputMarker.lineNumber) {
-          // Already wrote higher line number for this input
-          return;
-        }
-        // If interval is greater than last checkPoint time, then write
-        long currMS = System.currentTimeMillis();
-        if (!isClosed()
-            && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
-          // Let's save this one so we can update the check point file
-          // on flush
-          lastCheckPointInputMarker = inputMarker;
-          return;
-        }
-        lastCheckPointTimeMS = currMS;
-
-        jsonCheckPoint.put("line_number", ""
-            + new Integer(inputMarker.lineNumber));
-        jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
-        jsonCheckPoint.put("last_write_time_date", new Date());
-
-        String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
-        // Let's rewind
-        checkPointWriter.seek(0);
-        checkPointWriter.writeInt(jsonStr.length());
-        checkPointWriter.write(jsonStr.getBytes());
-
-        if (isClosed()) {
-          final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-              + "_FINAL_CHECKIN";
-          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-              "Wrote final checkPoint, input=" + getShortDescription()
-                  + ", checkPointFile=" + checkPointFile.getAbsolutePath()
-                  + ", checkPoint=" + jsonStr, null, logger, Level.INFO);
-        }
-      } catch (Throwable t) {
-        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-            + "_CHECKIN_EXCEPTION";
-        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-            "Caught exception checkIn. , input=" + getShortDescription(), t,
-            logger, Level.ERROR);
-      }
-    }
-
-  }
-
-  @Override
-  public void checkIn() {
-    super.checkIn();
-    if (lastCheckPointInputMarker != null) {
-      checkIn(lastCheckPointInputMarker);
-    }
-  }
-
-  @Override
-  public void rollOver() {
-    logger.info("Marking this input file for rollover. "
-        + getShortDescription());
-    isRolledOver = true;
+    return new File[] { new File(searchPath) };
   }
 
   @Override
   void start() throws Exception {
-    if (s3LogPathFiles == null || s3LogPathFiles.length == 0) {
+    if (ArrayUtils.isEmpty(logFiles)) {
       return;
     }
 
-    if (isTail()) {
-      processFile(s3LogPathFiles[0]);
+    if (tail) {
+      processFile(logFiles[0]);
     } else {
-      for (String s3FilePath : s3LogPathFiles) {
+      for (File s3FilePath : logFiles) {
         try {
           processFile(s3FilePath);
           if (isClosed() || isDrain()) {
-            logger.info("isClosed or isDrain. Now breaking loop.");
+            LOG.info("isClosed or isDrain. Now breaking loop.");
             break;
           }
         } catch (Throwable t) {
-          logger.error("Error processing file=" + s3FilePath, t);
+          LOG.error("Error processing file=" + s3FilePath, t);
         }
       }
     }
@@ -221,244 +77,18 @@ public class InputS3File extends Input {
   }
 
   @Override
-  public void close() {
-    super.close();
-    logger.info("close() calling checkPoint checkIn(). "
-        + getShortDescription());
-    checkIn();
-  }
-
-  private void processFile(String logPathFile) throws FileNotFoundException,
-      IOException {
-    logger.info("Monitoring logPath=" + logPath + ", logPathFile="
-        + logPathFile);
-    BufferedReader br = null;
-    checkPointFile = null;
-    checkPointWriter = null;
-    jsonCheckPoint = null;
-    int resumeFromLineNumber = 0;
-
-    int lineCount = 0;
-    try {
-      setFilePath(logPathFile);
-      String s3AccessKey = getStringValue("s3_access_key");
-      String s3SecretKey = getStringValue("s3_secret_key");
-      br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
-      if(br==null){
-        //log err
-        return;
-      }
-      
-      // Whether to send to output from the beginning.
-      boolean resume = isStartFromBegining;
-
-      // Seems FileWatch is not reliable, so let's only use file key comparison
-      fileKey = getFileKey(logPathFile);
-      base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
-      logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". "
-          + getShortDescription());
-
-      if (isTail()) {
-        try {
-          // Let's see if there is a checkpoint for this file
-          logger.info("Checking existing checkpoint file. "
-              + getShortDescription());
-
-          String fileBase64 = Base64.byteArrayToBase64(fileKey.toString()
-              .getBytes());
-          String checkPointFileName = fileBase64 + checkPointExtension;
-          File checkPointFolder = inputMgr.getCheckPointFolderFile();
-          checkPointFile = new File(checkPointFolder, checkPointFileName);
-          checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
-
-          try {
-            int contentSize = checkPointWriter.readInt();
-            byte b[] = new byte[contentSize];
-            int readSize = checkPointWriter.read(b, 0, contentSize);
-            if (readSize != contentSize) {
-              logger
-                  .error("Couldn't read expected number of bytes from checkpoint file. expected="
-                      + contentSize
-                      + ", read="
-                      + readSize
-                      + ", checkPointFile="
-                      + checkPointFile
-                      + ", input="
-                      + getShortDescription());
-            } else {
-              String jsonCheckPointStr = new String(b, 0, readSize);
-              jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
-              resumeFromLineNumber = LogFeederUtil.objectToInt(
-                  jsonCheckPoint.get("line_number"), 0, "line_number");
-
-              if (resumeFromLineNumber > 0) {
-                // Let's read from last line read
-                resume = false;
-              }
-              logger.info("CheckPoint. checkPointFile=" + checkPointFile
-                  + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber="
-                  + resumeFromLineNumber + ", resume=" + resume);
-            }
-          } catch (EOFException eofEx) {
-            logger.info("EOFException. Will reset checkpoint file "
-                + checkPointFile.getAbsolutePath() + " for "
-                + getShortDescription());
-          }
-          if (jsonCheckPoint == null) {
-            // This seems to be first time, so creating the initial
-            // checkPoint object
-            jsonCheckPoint = new HashMap<String, Object>();
-            jsonCheckPoint.put("file_path", filePath);
-            jsonCheckPoint.put("file_key", fileBase64);
-          }
-
-        } catch (Throwable t) {
-          logger.error(
-              "Error while configuring checkpoint file. Will reset file. checkPointFile="
-                  + checkPointFile, t);
-        }
-      }
-
-      setClosed(false);
-      int sleepStep = 2;
-      int sleepIteration = 0;
-      while (true) {
-        try {
-          if (isDrain()) {
-            break;
-          }
-
-          String line = br.readLine();
-          if (line == null) {
-            if (!resume) {
-              resume = true;
-            }
-            sleepIteration++;
-            try {
-              // Since FileWatch service is not reliable, we will check
-              // file inode every n seconds after no write
-              if (sleepIteration > 4) {
-                Object newFileKey = getFileKey(logPathFile);
-                if (newFileKey != null) {
-                  if (fileKey == null || !newFileKey.equals(fileKey)) {
-                    logger
-                        .info("File key is different. Calling rollover. oldKey="
-                            + fileKey
-                            + ", newKey="
-                            + newFileKey
-                            + ". "
-                            + getShortDescription());
-                    // File has rotated.
-                    rollOver();
-                  }
-                }
-              }
-              // Flush on the second iteration
-              if (!tail && sleepIteration >= 2) {
-                logger.info("End of file. Done with filePath=" + logPathFile
-                    + ", lineCount=" + lineCount);
-                flush();
-                break;
-              } else if (sleepIteration == 2) {
-                flush();
-              } else if (sleepIteration >= 2) {
-                if (isRolledOver) {
-                  isRolledOver = false;
-                  // Close existing file
-                  try {
-                    logger
-                        .info("File is rolled over. Closing current open file."
-                            + getShortDescription() + ", lineCount="
-                            + lineCount);
-                    br.close();
-                  } catch (Exception ex) {
-                    logger.error("Error closing file" + getShortDescription());
-                    break;
-                  }
-                  try {
-                    // Open new file
-                    logger.info("Opening new rolled over file."
-                        + getShortDescription());
-                    br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
-                    lineCount = 0;
-                    fileKey = getFileKey(logPathFile);
-                    base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
-                        .getBytes());
-                    logger.info("fileKey=" + fileKey + ", base64="
-                        + base64FileKey + ", " + getShortDescription());
-                  } catch (Exception ex) {
-                    logger.error("Error opening rolled over file. "
-                        + getShortDescription());
-                    // Let's add this to monitoring and exit this thread
-                    logger.info("Added input to not ready list."
-                        + getShortDescription());
-                    isReady = false;
-                    inputMgr.addToNotReady(this);
-                    break;
-                  }
-                  logger.info("File is successfully rolled over. "
-                      + getShortDescription());
-                  continue;
-                }
-              }
-              Thread.sleep(sleepStep * 1000);
-              sleepStep = (sleepStep * 2);
-              sleepStep = sleepStep > 10 ? 10 : sleepStep;
-            } catch (InterruptedException e) {
-              logger.info("Thread interrupted." + getShortDescription());
-            }
-          } else {
-            lineCount++;
-            sleepStep = 1;
-            sleepIteration = 0;
-
-            if (!resume && lineCount > resumeFromLineNumber) {
-              logger.info("Resuming to read from last line. lineCount="
-                  + lineCount + ", input=" + getShortDescription());
-              resume = true;
-            }
-            if (resume) {
-              InputMarker marker = new InputMarker();
-              marker.base64FileKey = base64FileKey;
-              marker.input = this;
-              marker.lineNumber = lineCount;
-              outputLine(line, marker);
-            }
-          }
-        } catch (Throwable t) {
-          final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
-              + "_READ_LOOP_EXCEPTION";
-          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
-              "Caught exception in read loop. lineNumber=" + lineCount
-                  + ", input=" + getShortDescription(), t, logger, Level.ERROR);
-
-        }
-      }
-    } finally {
-      if (br != null) {
-        logger.info("Closing reader." + getShortDescription() + ", lineCount="
-            + lineCount);
-        try {
-          br.close();
-        } catch (Throwable t) {
-          // ignore
-        }
-      }
-    }
-  }
-
-  static public Object getFileKey(String s3FilePath) {
-    return s3FilePath.toString();
+  protected BufferedReader openLogFile(File logPathFile) throws IOException {
+    String s3AccessKey = getStringValue("s3_access_key");
+    String s3SecretKey = getStringValue("s3_secret_key");
+    BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
+    fileKey = getFileKey(logPathFile);
+    base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+    LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
+    return br;
   }
 
   @Override
-  public String getShortDescription() {
-    return "input:source="
-        + getStringValue("source")
-        + ", path="
-        + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0]
-            : getStringValue("path"));
+  protected Object getFileKey(File logFile) {
+    return logFile.getPath();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 5ba56a5..743be69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input;
 
-import java.net.Inet4Address;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -66,7 +66,7 @@ public class InputSimulate extends Input {
     
     Filter filter = new FilterJSON();
     filter.setInput(this);
-    setFirstFilter(filter);
+    addFilter(filter);
   }
   
   private List<String> getSimulatedLogTypes() {
@@ -88,23 +88,18 @@ public class InputSimulate extends Input {
     
     return LOG_TEXT_PATTERN.replaceAll("<LOG_MESSAGE_PATTERN>", logMessagePattern);
   }
-  
-  @Override
-  public String getNameForThread() {
-    return "Simulated input";
-  }
 
   @Override
-  public String getShortDescription() {
-    return "Simulated input";
+  public boolean isReady() {
+    return true;
   }
-  
+
   @Override
   void start() throws Exception {
     if (types.isEmpty())
       return;
     
-    getFirstFilter().setOutputMgr(outputMgr);
+    getFirstFilter().setOutputManager(outputManager);
     while (true) {
       String type = imitateRandomLogFile();
       
@@ -129,10 +124,7 @@ public class InputSimulate extends Input {
   }
 
   private InputMarker getInputMarker(String type) throws Exception {
-    InputMarker marker = new InputMarker();
-    marker.input = this;
-    marker.lineNumber = getLineNumber(type);
-    marker.base64FileKey = getBase64FileKey();
+    InputMarker marker = new InputMarker(this, getBase64FileKey(), getLineNumber(type));
     return marker;
   }
 
@@ -147,7 +139,7 @@ public class InputSimulate extends Input {
   }
 
   private String getBase64FileKey() throws Exception {
-    String fileKey = Inet4Address.getLocalHost().getHostAddress() + "|" + filePath;
+    String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + filePath;
     return Base64.byteArrayToBase64(fileKey.getBytes());
   }
 
@@ -155,4 +147,20 @@ public class InputSimulate extends Input {
     Date d = new Date();
     return String.format(logText, d.getTime(), level, marker.lineNumber);
   }
+
+  @Override
+  public void checkIn(InputMarker inputMarker) {}
+
+  @Override
+  public void lastCheckIn() {}
+  
+  @Override
+  public String getNameForThread() {
+    return "Simulated input";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Simulated input";
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
index a2a9db2..9ccc4f2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ambari.logfeeder.input.reader;
 
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,15 +29,11 @@ import org.apache.log4j.Logger;
 
 class GZIPReader extends InputStreamReader {
 
-  private static Logger logger = Logger.getLogger(GZIPReader.class);
+  private static final Logger LOG = Logger.getLogger(GZIPReader.class);
 
   GZIPReader(String fileName) throws FileNotFoundException {
     super(getStream(fileName));
-    logger.info("Created GZIPReader for file : " + fileName);
-  }
-
-  GZIPReader(File file) throws FileNotFoundException {
-    super(getStream(file.getName()));
+    LOG.info("Created GZIPReader for file : " + fileName);
   }
 
   private static InputStream getStream(String fileName) {
@@ -48,7 +43,7 @@ class GZIPReader extends InputStreamReader {
       fileStream = new FileInputStream(fileName);
       gzipStream = new GZIPInputStream(fileStream);
     } catch (Exception e) {
-      logger.error(e, e.getCause());
+      LOG.error(e, e.getCause());
     }
     return gzipStream;
   }
@@ -58,21 +53,13 @@ class GZIPReader extends InputStreamReader {
    */
   static boolean isValidFile(String fileName) {
     // TODO make it generic and put in factory itself
-    InputStream is = null;
-    try {
-      is = new FileInputStream(fileName);
+    
+    try (InputStream is = new FileInputStream(fileName)) {
       byte[] signature = new byte[2];
       int nread = is.read(signature); // read the gzip signature
       return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b;
     } catch (IOException e) {
       return false;
-    } finally {
-      if (is != null) {
-        try {
-          is.close();
-        } catch (IOException e) {
-        }
-      }
     }
   }
 }