You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ab...@apache.org on 2017/07/07 14:58:08 UTC

[21/36] ambari git commit: AMBARI-21387 Log Feeder input config attribute "tail" should be clearer (mgergely)

AMBARI-21387 Log Feeder input config attribute "tail" should be clearer (mgergely)

Change-Id: I0ca164df6b5b91d237f1503bc4b9e45a0df4b685


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8d9fd451
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8d9fd451
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8d9fd451

Branch: refs/heads/branch-feature-logsearch-ui
Commit: 8d9fd451d5ad348d073d4adbe73970586ab71c37
Parents: 78ebbef
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Thu Jul 6 10:17:52 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Thu Jul 6 10:17:52 2017 +0200

----------------------------------------------------------------------
 .../config/zookeeper/LogSearchConfigZK.java     |  12 +-
 .../logfeeder/input/AbstractInputFile.java      | 105 +++++++++--------
 .../apache/ambari/logfeeder/input/Input.java    |   4 -
 .../ambari/logfeeder/input/InputFile.java       |  23 ++--
 .../ambari/logfeeder/input/InputManager.java    | 113 +++++++++++--------
 .../ambari/logfeeder/input/InputS3File.java     |  11 +-
 .../logfeeder/input/InputManagerTest.java       |   1 -
 7 files changed, 152 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 1926efa..827101c 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.config.zookeeper;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
@@ -53,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
@@ -175,10 +177,14 @@ public class LogSearchConfigZK implements LogSearchConfig {
       globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
     }
     
-    createGlobalConfigNode(globalConfigNode);
-    
     TreeCacheListener listener = new TreeCacheListener() {
+      private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED);
+      
       public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+        if (!nodeEvents.contains(event.getType())) {
+          return;
+        }
+        
         String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
         String nodeData = new String(event.getData().getData());
         Type eventType = event.getType();
@@ -265,6 +271,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
     };
     cache.getListenable().addListener(listener);
     cache.start();
+
+    createGlobalConfigNode(globalConfigNode);
   }
 
   private void createGlobalConfigNode(JsonArray globalConfigNode) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index ab50eb7..2359256 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
-import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -48,11 +47,12 @@ public abstract class AbstractInputFile extends Input {
   protected boolean isReady;
 
   private String checkPointExtension;
-  private File checkPointFile;
-  private long lastCheckPointTimeMS;
   private int checkPointIntervalMS;
-  private Map<String, Object> jsonCheckPoint;
-  private InputMarker lastCheckPointInputMarker;
+  
+  private Map<String, File> checkPointFiles = new HashMap<>();
+  private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
+  private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
+  private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>();
 
   @Override
   protected String getStatMetricName() {
@@ -73,7 +73,6 @@ public abstract class AbstractInputFile extends Input {
     // Let's close the file and set it to true after we start monitoring it
     setClosed(true);
     logPath = inputDescriptor.getPath();
-    tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail);
     checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
 
     if (StringUtils.isEmpty(logPath)) {
@@ -89,11 +88,9 @@ public abstract class AbstractInputFile extends Input {
     super.init();
   }
 
-  protected void processFile(File logPathFile) throws FileNotFoundException, IOException {
+  protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException {
     LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
     BufferedReader br = null;
-    checkPointFile = null;
-    jsonCheckPoint = null;
 
     int lineCount = 0;
     try {
@@ -125,7 +122,7 @@ public abstract class AbstractInputFile extends Input {
             sleepIteration++;
             if (sleepIteration == 2) {
               flush();
-              if (!tail) {
+              if (!follow) {
                 LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
                 break;
               }
@@ -204,47 +201,50 @@ public abstract class AbstractInputFile extends Input {
   private int getResumeFromLineNumber() {
     int resumeFromLineNumber = 0;
     
-    if (tail) {
-      try {
-        LOG.info("Checking existing checkpoint file. " + getShortDescription());
+    File checkPointFile = null;
+    try {
+      LOG.info("Checking existing checkpoint file. " + getShortDescription());
 
-        String checkPointFileName = base64FileKey + checkPointExtension;
-        File checkPointFolder = inputManager.getCheckPointFolderFile();
-        checkPointFile = new File(checkPointFolder, checkPointFileName);
-        if (!checkPointFile.exists()) {
-          LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
-        } else {
-          try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
-            int contentSize = checkPointWriter.readInt();
-            byte b[] = new byte[contentSize];
-            int readSize = checkPointWriter.read(b, 0, contentSize);
-            if (readSize != contentSize) {
-              LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
-                  readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
-            } else {
-              String jsonCheckPointStr = new String(b, 0, readSize);
-              jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+      String checkPointFileName = base64FileKey + checkPointExtension;
+      File checkPointFolder = inputManager.getCheckPointFolderFile();
+      checkPointFile = new File(checkPointFolder, checkPointFileName);
+      checkPointFiles.put(base64FileKey, checkPointFile);
+      Map<String, Object> jsonCheckPoint = null;
+      if (!checkPointFile.exists()) {
+        LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
+      } else {
+        try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
+          int contentSize = checkPointWriter.readInt();
+          byte b[] = new byte[contentSize];
+          int readSize = checkPointWriter.read(b, 0, contentSize);
+          if (readSize != contentSize) {
+            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+                readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
+          } else {
+            String jsonCheckPointStr = new String(b, 0, readSize);
+            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
 
-              resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
 
-              LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
-                  ", resumeFromLineNumber=" + resumeFromLineNumber);
-           }
-          } catch (EOFException eofEx) {
-            LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
-                getShortDescription(), eofEx);
-          }
-        }
-        if (jsonCheckPoint == null) {
-          // This seems to be first time, so creating the initial checkPoint object
-          jsonCheckPoint = new HashMap<String, Object>();
-          jsonCheckPoint.put("file_path", filePath);
-          jsonCheckPoint.put("file_key", base64FileKey);
+            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+                ", resumeFromLineNumber=" + resumeFromLineNumber);
+         }
+        } catch (EOFException eofEx) {
+          LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+              getShortDescription(), eofEx);
         }
-
-      } catch (Throwable t) {
-        LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
       }
+      if (jsonCheckPoint == null) {
+        // This seems to be first time, so creating the initial checkPoint object
+        jsonCheckPoint = new HashMap<String, Object>();
+        jsonCheckPoint.put("file_path", filePath);
+        jsonCheckPoint.put("file_key", base64FileKey);
+      }
+      
+      jsonCheckPoints.put(base64FileKey, jsonCheckPoint);
+
+    } catch (Throwable t) {
+      LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
     }
     
     return resumeFromLineNumber;
@@ -253,6 +253,9 @@ public abstract class AbstractInputFile extends Input {
   @Override
   public synchronized void checkIn(InputMarker inputMarker) {
     try {
+      Map<String, Object> jsonCheckPoint = jsonCheckPoints.get(inputMarker.base64FileKey);
+      File checkPointFile = checkPointFiles.get(inputMarker.base64FileKey);
+      
       int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
       if (lineNumber > inputMarker.lineNumber) {
         // Already wrote higher line number for this input
@@ -260,12 +263,14 @@ public abstract class AbstractInputFile extends Input {
       }
       // If interval is greater than last checkPoint time, then write
       long currMS = System.currentTimeMillis();
-      if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+      long lastCheckPointTimeMs = lastCheckPointTimeMSs.containsKey(inputMarker.base64FileKey) ?
+          lastCheckPointTimeMSs.get(inputMarker.base64FileKey) : 0;
+      if (!isClosed() && (currMS - lastCheckPointTimeMs < checkPointIntervalMS)) {
         // Let's save this one so we can update the check point file on flush
-        lastCheckPointInputMarker = inputMarker;
+        lastCheckPointInputMarkers.put(inputMarker.base64FileKey, inputMarker);
         return;
       }
-      lastCheckPointTimeMS = currMS;
+      lastCheckPointTimeMSs.put(inputMarker.base64FileKey, currMS);
 
       jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
       jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
@@ -299,7 +304,7 @@ public abstract class AbstractInputFile extends Input {
 
   @Override
   public void lastCheckIn() {
-    if (lastCheckPointInputMarker != null) {
+    for (InputMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) { // one deferred marker per file key; replay each through checkIn()
       checkIn(lastCheckPointInputMarker);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index c36f96b..49151e7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -270,10 +270,6 @@ public abstract class Input extends ConfigItem implements Runnable {
     }
   }
 
-  public boolean isTail() {
-    return tail;
-  }
-
   public boolean isUseEventMD5() {
     return useEventMD5;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index fc40ca4..e24a7aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -22,6 +22,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Comparator;
 
 import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
 import org.apache.ambari.logfeeder.util.FileUtil;
@@ -41,7 +43,7 @@ public class InputFile extends AbstractInputFile {
       if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
         if (tail && logFiles.length > 1) {
           LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
-              ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
+              ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
         }
         LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
         isReady = true;
@@ -58,7 +60,15 @@ public class InputFile extends AbstractInputFile {
       return new File[]{searchFile};
     } else {
       FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
-      return searchFile.getParentFile().listFiles(fileFilter);
+      File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
+      Arrays.sort(logFiles,
+          new Comparator<File>() {
+            @Override
+            public int compare(File o1, File o2) {
+              return o1.getName().compareTo(o2.getName());
+            }
+      });
+      return logFiles;
     }
   }
 
@@ -66,12 +76,11 @@ public class InputFile extends AbstractInputFile {
   void start() throws Exception {
     boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
     if (isProcessFile) {
-      if (tail) {
-        processFile(logFiles[0]);
-      } else {
-        for (File file : logFiles) {
+      for (int i = logFiles.length - 1; i >= 0; i--) {
+        File file = logFiles[i];
+        if (i == 0 || !tail) {
           try {
-            processFile(file);
+            processFile(file, i == 0);
             if (isClosed() || isDrain()) {
               LOG.info("isClosed or isDrain. Now breaking loop.");
               break;

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 19894ae..01a11ec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -197,13 +197,9 @@ public class InputManager {
         if (input.isReady()) {
           input.monitor();
         } else {
-          if (input.isTail()) {
-            LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
-                "So it might not be an issue. " + input.getShortDescription());
-            notReadyList.add(input);
-          } else {
-            LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription());
-          }
+          LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
+              "So it might not be an issue. " + input.getShortDescription());
+          notReadyList.add(input);
         }
       } catch (Exception e) {
         LOG.error("Error initializing input. " + input.getShortDescription(), e);
@@ -279,46 +275,8 @@ public class InputManager {
       File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
       int totalCheckFilesDeleted = 0;
       for (File checkPointFile : checkPointFiles) {
-        try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
-          int contentSize = checkPointReader.readInt();
-          byte b[] = new byte[contentSize];
-          int readSize = checkPointReader.read(b, 0, contentSize);
-          if (readSize != contentSize) {
-            LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
-              + readSize + ", checkPointFile=" + checkPointFile);
-          } else {
-            String jsonCheckPointStr = new String(b, 0, readSize);
-            Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
-            String logFilePath = (String) jsonCheckPoint.get("file_path");
-            String logFileKey = (String) jsonCheckPoint.get("file_key");
-            if (logFilePath != null && logFileKey != null) {
-              boolean deleteCheckPointFile = false;
-              File logFile = new File(logFilePath);
-              if (logFile.exists()) {
-                Object fileKeyObj = FileUtil.getFileKey(logFile);
-                String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
-                if (!logFileKey.equals(fileBase64)) {
-                  deleteCheckPointFile = true;
-                  LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
-                      logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
-                }
-              } else {
-                LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
-                    checkPointFile.getAbsolutePath());
-                deleteCheckPointFile = true;
-              }
-              if (deleteCheckPointFile) {
-                LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
-                checkPointFile.delete();
-                totalCheckFilesDeleted++;
-              }
-            }
-          }
-        } catch (EOFException eof) {
-          LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
-        } catch (Throwable t) {
-          LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+        if (checkCheckPointFile(checkPointFile)) {
+          totalCheckFilesDeleted++;
         }
       }
       LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
@@ -329,6 +287,67 @@ public class InputManager {
     }
   }
 
+  private boolean checkCheckPointFile(File checkPointFile) { // Inspects one checkpoint file; returns true when it was judged stale and delete() was called on it.
+    boolean deleted = false;
+    try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) { // read-only; closed automatically when the try block exits
+      int contentSize = checkPointReader.readInt(); // leading int is the byte length of the JSON payload read below
+      byte b[] = new byte[contentSize];
+      int readSize = checkPointReader.read(b, 0, contentSize);
+      if (readSize != contentSize) { // short read: log it and leave the checkpoint file in place
+        LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+          + readSize + ", checkPointFile=" + checkPointFile);
+      } else {
+        String jsonCheckPointStr = new String(b, 0, readSize); // NOTE(review): decodes with the platform default charset
+        Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+        String logFilePath = (String) jsonCheckPoint.get("file_path");
+        String logFileKey = (String) jsonCheckPoint.get("file_key");
+        if (logFilePath != null && logFileKey != null) { // checkpoints missing either field are left untouched
+          boolean deleteCheckPointFile = false;
+          File logFile = new File(logFilePath);
+          if (logFile.exists()) {
+            Object fileKeyObj = FileUtil.getFileKey(logFile);
+            String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+            if (!logFileKey.equals(fileBase64)) { // the file now at this path has a different key than the one recorded in the checkpoint
+              LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
+                  logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+              deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey); // keep the checkpoint if the recorded key is found on another file in the same folder
+            }
+          } else {
+            LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
+                checkPointFile.getAbsolutePath());
+            deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey); // path is gone; delete only if no file in the folder carries the recorded key
+          }
+          if (deleteCheckPointFile) {
+            LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+            checkPointFile.delete(); // NOTE(review): result of delete() is ignored, so 'deleted' can be true even if removal failed
+            deleted = true;
+          }
+        }
+      }
+    } catch (EOFException eof) { // readInt() reached end of file before 4 bytes were available; warn and keep the file
+      LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
+    } catch (Throwable t) { // any other failure, including one thrown from wasFileRenamed(), is logged and not rethrown
+      LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+    }
+    
+    return deleted;
+  }
+  
+  private boolean wasFileRenamed(File folder, String searchFileBase64) { // Returns true if some entry of 'folder' has a file key whose base64 form equals searchFileBase64.
+    for (File file : folder.listFiles()) { // NOTE(review): listFiles() returns null when 'folder' is not a directory, and 'folder' itself may be null; the NPE would land in the caller's catch (Throwable)
+      Object fileKeyObj = FileUtil.getFileKey(file);
+      String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes()); // same key-to-base64 encoding used when comparing against the checkpoint's file_key
+      if (searchFileBase64.equals(fileBase64)) {
+        // even though the file name in the checkpoint file is different from the one it was renamed to, checkpoint files are
+        // identified by their name, which is generated from the file key, which would be the same for the renamed file
+        LOG.info("CheckPoint clean: File key matches file " + file.getAbsolutePath() + ", it must have been renamed");
+        return true;
+      }
+    }
+    return false; // no entry in the folder carries the recorded key
+  }
+  
   public void waitOnAllInputs() {
     //wait on inputs
     for (List<Input> inputList : inputs.values()) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 2b19503..69d053a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -59,18 +59,17 @@ public class InputS3File extends AbstractInputFile {
       return;
     }
 
-    if (tail) {
-      processFile(logFiles[0]);
-    } else {
-      for (File s3FilePath : logFiles) {
+    for (int i = logFiles.length - 1; i >= 0; i--) {
+      File file = logFiles[i];
+      if (i == 0 || !tail) {
         try {
-          processFile(s3FilePath);
+          processFile(file, i == 0);
           if (isClosed() || isDrain()) {
             LOG.info("isClosed or isDrain. Now breaking loop.");
             break;
           }
         } catch (Throwable t) {
-          LOG.error("Error processing file=" + s3FilePath, t);
+          LOG.error("Error processing file=" + file.getAbsolutePath(), t);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
index e9bbe7e..625e362 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
@@ -73,7 +73,6 @@ public class InputManagerTest {
     
     expect(input1.monitor()).andReturn(false);
     expect(input2.monitor()).andReturn(false);
-    expect(input3.isTail()).andReturn(false);
     expect(input3.getShortDescription()).andReturn("").once();
     
     replay(input1, input2, input3);