You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/07/06 08:18:31 UTC
ambari git commit: AMBARI-21387 Log Feeder input config attribute "tail" should be clearer (mgergely)
Repository: ambari
Updated Branches:
refs/heads/trunk 78ebbef3d -> 8d9fd451d
AMBARI-21387 Log Feeder input config attribute "tail" should be clearer (mgergely)
Change-Id: I0ca164df6b5b91d237f1503bc4b9e45a0df4b685
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8d9fd451
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8d9fd451
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8d9fd451
Branch: refs/heads/trunk
Commit: 8d9fd451d5ad348d073d4adbe73970586ab71c37
Parents: 78ebbef
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Thu Jul 6 10:17:52 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Thu Jul 6 10:17:52 2017 +0200
----------------------------------------------------------------------
.../config/zookeeper/LogSearchConfigZK.java | 12 +-
.../logfeeder/input/AbstractInputFile.java | 105 +++++++++--------
.../apache/ambari/logfeeder/input/Input.java | 4 -
.../ambari/logfeeder/input/InputFile.java | 23 ++--
.../ambari/logfeeder/input/InputManager.java | 113 +++++++++++--------
.../ambari/logfeeder/input/InputS3File.java | 11 +-
.../logfeeder/input/InputManagerTest.java | 1 -
7 files changed, 152 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 1926efa..827101c 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.config.zookeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
@@ -53,6 +54,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
@@ -175,10 +177,14 @@ public class LogSearchConfigZK implements LogSearchConfig {
globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
}
- createGlobalConfigNode(globalConfigNode);
-
TreeCacheListener listener = new TreeCacheListener() {
+ private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED);
+
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (!nodeEvents.contains(event.getType())) {
+ return;
+ }
+
String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
String nodeData = new String(event.getData().getData());
Type eventType = event.getType();
@@ -265,6 +271,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
};
cache.getListenable().addListener(listener);
cache.start();
+
+ createGlobalConfigNode(globalConfigNode);
}
private void createGlobalConfigNode(JsonArray globalConfigNode) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index ab50eb7..2359256 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -31,7 +31,6 @@ import java.util.Map;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
-import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -48,11 +47,12 @@ public abstract class AbstractInputFile extends Input {
protected boolean isReady;
private String checkPointExtension;
- private File checkPointFile;
- private long lastCheckPointTimeMS;
private int checkPointIntervalMS;
- private Map<String, Object> jsonCheckPoint;
- private InputMarker lastCheckPointInputMarker;
+
+ private Map<String, File> checkPointFiles = new HashMap<>();
+ private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
+ private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
+ private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>();
@Override
protected String getStatMetricName() {
@@ -73,7 +73,6 @@ public abstract class AbstractInputFile extends Input {
// Let's close the file and set it to true after we start monitoring it
setClosed(true);
logPath = inputDescriptor.getPath();
- tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), tail);
checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
if (StringUtils.isEmpty(logPath)) {
@@ -89,11 +88,9 @@ public abstract class AbstractInputFile extends Input {
super.init();
}
- protected void processFile(File logPathFile) throws FileNotFoundException, IOException {
+ protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException {
LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
BufferedReader br = null;
- checkPointFile = null;
- jsonCheckPoint = null;
int lineCount = 0;
try {
@@ -125,7 +122,7 @@ public abstract class AbstractInputFile extends Input {
sleepIteration++;
if (sleepIteration == 2) {
flush();
- if (!tail) {
+ if (!follow) {
LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
break;
}
@@ -204,47 +201,50 @@ public abstract class AbstractInputFile extends Input {
private int getResumeFromLineNumber() {
int resumeFromLineNumber = 0;
- if (tail) {
- try {
- LOG.info("Checking existing checkpoint file. " + getShortDescription());
+ File checkPointFile = null;
+ try {
+ LOG.info("Checking existing checkpoint file. " + getShortDescription());
- String checkPointFileName = base64FileKey + checkPointExtension;
- File checkPointFolder = inputManager.getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder, checkPointFileName);
- if (!checkPointFile.exists()) {
- LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
- } else {
- try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
- int contentSize = checkPointWriter.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointWriter.read(b, 0, contentSize);
- if (readSize != contentSize) {
- LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
- readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+ String checkPointFileName = base64FileKey + checkPointExtension;
+ File checkPointFolder = inputManager.getCheckPointFolderFile();
+ checkPointFile = new File(checkPointFolder, checkPointFileName);
+ checkPointFiles.put(base64FileKey, checkPointFile);
+ Map<String, Object> jsonCheckPoint = null;
+ if (!checkPointFile.exists()) {
+ LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
+ } else {
+ try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
+ int contentSize = checkPointWriter.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointWriter.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+ readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
+ } else {
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
- resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+ resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
- LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
- ", resumeFromLineNumber=" + resumeFromLineNumber);
- }
- } catch (EOFException eofEx) {
- LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
- getShortDescription(), eofEx);
- }
- }
- if (jsonCheckPoint == null) {
- // This seems to be first time, so creating the initial checkPoint object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", filePath);
- jsonCheckPoint.put("file_key", base64FileKey);
+ LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+ ", resumeFromLineNumber=" + resumeFromLineNumber);
+ }
+ } catch (EOFException eofEx) {
+ LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+ getShortDescription(), eofEx);
}
-
- } catch (Throwable t) {
- LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
}
+ if (jsonCheckPoint == null) {
+ // This seems to be first time, so creating the initial checkPoint object
+ jsonCheckPoint = new HashMap<String, Object>();
+ jsonCheckPoint.put("file_path", filePath);
+ jsonCheckPoint.put("file_key", base64FileKey);
+ }
+
+ jsonCheckPoints.put(base64FileKey, jsonCheckPoint);
+
+ } catch (Throwable t) {
+ LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
}
return resumeFromLineNumber;
@@ -253,6 +253,9 @@ public abstract class AbstractInputFile extends Input {
@Override
public synchronized void checkIn(InputMarker inputMarker) {
try {
+ Map<String, Object> jsonCheckPoint = jsonCheckPoints.get(inputMarker.base64FileKey);
+ File checkPointFile = checkPointFiles.get(inputMarker.base64FileKey);
+
int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
if (lineNumber > inputMarker.lineNumber) {
// Already wrote higher line number for this input
@@ -260,12 +263,14 @@ public abstract class AbstractInputFile extends Input {
}
// If interval is greater than last checkPoint time, then write
long currMS = System.currentTimeMillis();
- if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+ long lastCheckPointTimeMs = lastCheckPointTimeMSs.containsKey(inputMarker.base64FileKey) ?
+ lastCheckPointTimeMSs.get(inputMarker.base64FileKey) : 0;
+ if (!isClosed() && (currMS - lastCheckPointTimeMs < checkPointIntervalMS)) {
// Let's save this one so we can update the check point file on flush
- lastCheckPointInputMarker = inputMarker;
+ lastCheckPointInputMarkers.put(inputMarker.base64FileKey, inputMarker);
return;
}
- lastCheckPointTimeMS = currMS;
+ lastCheckPointTimeMSs.put(inputMarker.base64FileKey, currMS);
jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
@@ -299,7 +304,7 @@ public abstract class AbstractInputFile extends Input {
@Override
public void lastCheckIn() {
- if (lastCheckPointInputMarker != null) {
+ for (InputMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) {
checkIn(lastCheckPointInputMarker);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index c36f96b..49151e7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -270,10 +270,6 @@ public abstract class Input extends ConfigItem implements Runnable {
}
}
- public boolean isTail() {
- return tail;
- }
-
public boolean isUseEventMD5() {
return useEventMD5;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index fc40ca4..e24a7aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -22,6 +22,8 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Comparator;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
import org.apache.ambari.logfeeder.util.FileUtil;
@@ -41,7 +43,7 @@ public class InputFile extends AbstractInputFile {
if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
if (tail && logFiles.length > 1) {
LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
- ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
+ ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
}
LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
isReady = true;
@@ -58,7 +60,15 @@ public class InputFile extends AbstractInputFile {
return new File[]{searchFile};
} else {
FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
- return searchFile.getParentFile().listFiles(fileFilter);
+ File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
+ Arrays.sort(logFiles,
+ new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ });
+ return logFiles;
}
}
@@ -66,12 +76,11 @@ public class InputFile extends AbstractInputFile {
void start() throws Exception {
boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
if (isProcessFile) {
- if (tail) {
- processFile(logFiles[0]);
- } else {
- for (File file : logFiles) {
+ for (int i = logFiles.length - 1; i >= 0; i--) {
+ File file = logFiles[i];
+ if (i == 0 || !tail) {
try {
- processFile(file);
+ processFile(file, i == 0);
if (isClosed() || isDrain()) {
LOG.info("isClosed or isDrain. Now breaking loop.");
break;
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 19894ae..01a11ec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -197,13 +197,9 @@ public class InputManager {
if (input.isReady()) {
input.monitor();
} else {
- if (input.isTail()) {
- LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
- "So it might not be an issue. " + input.getShortDescription());
- notReadyList.add(input);
- } else {
- LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription());
- }
+ LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
+ "So it might not be an issue. " + input.getShortDescription());
+ notReadyList.add(input);
}
} catch (Exception e) {
LOG.error("Error initializing input. " + input.getShortDescription(), e);
@@ -279,46 +275,8 @@ public class InputManager {
File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
int totalCheckFilesDeleted = 0;
for (File checkPointFile : checkPointFiles) {
- try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
- int contentSize = checkPointReader.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointReader.read(b, 0, contentSize);
- if (readSize != contentSize) {
- LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
- + readSize + ", checkPointFile=" + checkPointFile);
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
- String logFilePath = (String) jsonCheckPoint.get("file_path");
- String logFileKey = (String) jsonCheckPoint.get("file_key");
- if (logFilePath != null && logFileKey != null) {
- boolean deleteCheckPointFile = false;
- File logFile = new File(logFilePath);
- if (logFile.exists()) {
- Object fileKeyObj = FileUtil.getFileKey(logFile);
- String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
- if (!logFileKey.equals(fileBase64)) {
- deleteCheckPointFile = true;
- LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
- logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
- }
- } else {
- LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
- checkPointFile.getAbsolutePath());
- deleteCheckPointFile = true;
- }
- if (deleteCheckPointFile) {
- LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
- checkPointFile.delete();
- totalCheckFilesDeleted++;
- }
- }
- }
- } catch (EOFException eof) {
- LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
- } catch (Throwable t) {
- LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+ if (checkCheckPointFile(checkPointFile)) {
+ totalCheckFilesDeleted++;
}
}
LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
@@ -329,6 +287,67 @@ public class InputManager {
}
}
+ private boolean checkCheckPointFile(File checkPointFile) {
+ boolean deleted = false;
+ try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+ int contentSize = checkPointReader.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointReader.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+ + readSize + ", checkPointFile=" + checkPointFile);
+ } else {
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+ String logFilePath = (String) jsonCheckPoint.get("file_path");
+ String logFileKey = (String) jsonCheckPoint.get("file_key");
+ if (logFilePath != null && logFileKey != null) {
+ boolean deleteCheckPointFile = false;
+ File logFile = new File(logFilePath);
+ if (logFile.exists()) {
+ Object fileKeyObj = FileUtil.getFileKey(logFile);
+ String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+ if (!logFileKey.equals(fileBase64)) {
+ LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
+ logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+ deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
+ }
+ } else {
+ LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
+ checkPointFile.getAbsolutePath());
+ deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
+ }
+ if (deleteCheckPointFile) {
+ LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+ checkPointFile.delete();
+ deleted = true;
+ }
+ }
+ }
+ } catch (EOFException eof) {
+ LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
+ } catch (Throwable t) {
+ LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+ }
+
+ return deleted;
+ }
+
+ private boolean wasFileRenamed(File folder, String searchFileBase64) {
+ for (File file : folder.listFiles()) {
+ Object fileKeyObj = FileUtil.getFileKey(file);
+ String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+ if (searchFileBase64.equals(fileBase64)) {
+ // even though the file name in the checkpoint file is different from the one it was renamed to, checkpoint files are
+ // identified by their name, which is generated from the file key, which would be the same for the renamed file
+ LOG.info("CheckPoint clean: File key matches file " + file.getAbsolutePath() + ", it must have been renamed");
+ return true;
+ }
+ }
+ return false;
+ }
+
public void waitOnAllInputs() {
//wait on inputs
for (List<Input> inputList : inputs.values()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 2b19503..69d053a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -59,18 +59,17 @@ public class InputS3File extends AbstractInputFile {
return;
}
- if (tail) {
- processFile(logFiles[0]);
- } else {
- for (File s3FilePath : logFiles) {
+ for (int i = logFiles.length - 1; i >= 0; i--) {
+ File file = logFiles[i];
+ if (i == 0 || !tail) {
try {
- processFile(s3FilePath);
+ processFile(file, i == 0);
if (isClosed() || isDrain()) {
LOG.info("isClosed or isDrain. Now breaking loop.");
break;
}
} catch (Throwable t) {
- LOG.error("Error processing file=" + s3FilePath, t);
+ LOG.error("Error processing file=" + file.getAbsolutePath(), t);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d9fd451/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
index e9bbe7e..625e362 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
@@ -73,7 +73,6 @@ public class InputManagerTest {
expect(input1.monitor()).andReturn(false);
expect(input2.monitor()).andReturn(false);
- expect(input3.isTail()).andReturn(false);
expect(input3.getShortDescription()).andReturn("").once();
replay(input1, input2, input3);