You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:59 UTC
[32/52] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder
(Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 5feb9c4..e13d9bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input;
import java.io.File;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,88 +28,138 @@ import java.util.Map;
import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.log4j.Logger;
public abstract class Input extends ConfigBlock implements Runnable {
- static private Logger logger = Logger.getLogger(Input.class);
-
- protected OutputMgr outputMgr;
- protected InputMgr inputMgr;
+ private static final Logger LOG = Logger.getLogger(Input.class);
+ private static final boolean DEFAULT_TAIL = true;
+ private static final boolean DEFAULT_USE_EVENT_MD5 = false;
+ private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+
+ protected InputManager inputManager;
+ protected OutputManager outputManager;
private List<Output> outputList = new ArrayList<Output>();
- private Filter firstFilter = null;
private Thread thread;
- private boolean isClosed = false;
- protected String filePath = null;
- private String type = null;
+ private String type;
+ protected String filePath;
+ private Filter firstFilter;
+ private boolean isClosed;
- protected boolean tail = true;
- private boolean useEventMD5 = false;
- private boolean genEventMD5 = true;
+ protected boolean tail;
+ private boolean useEventMD5;
+ private boolean genEventMD5;
- protected MetricCount readBytesMetric = new MetricCount();
+ protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
+ protected String getReadBytesMetricName() {
+ return null;
+ }
+
+ @Override
+ public void loadConfig(Map<String, Object> map) {
+ super.loadConfig(map);
+ String typeValue = getStringValue("type");
+ if (typeValue != null) {
+ // Explicitly add type and value to field list
+ contextFields.put("type", typeValue);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> addFields = (Map<String, Object>) map.get("add_fields");
+ if (addFields == null) {
+ addFields = new HashMap<String, Object>();
+ map.put("add_fields", addFields);
+ }
+ addFields.put("type", typeValue);
+ }
+ }
- /**
- * This method will be called from the thread spawned for the output. This
- * method should only exit after all data are read from the source or the
- * process is exiting
- */
- abstract void start() throws Exception;
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public void setInputManager(InputManager inputManager) {
+ this.inputManager = inputManager;
+ }
+
+ public void setOutputManager(OutputManager outputManager) {
+ this.outputManager = outputManager;
+ }
+
+ public void addFilter(Filter filter) {
+ if (firstFilter == null) {
+ firstFilter = filter;
+ } else {
+ Filter f = firstFilter;
+ while (f.getNextFilter() != null) {
+ f = f.getNextFilter();
+ }
+ f.setNextFilter(filter);
+ }
+ }
+
+ public void addOutput(Output output) {
+ outputList.add(output);
+ }
@Override
public void init() throws Exception {
super.init();
- tail = getBooleanValue("tail", tail);
- useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5);
- genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5);
+ tail = getBooleanValue("tail", DEFAULT_TAIL);
+ useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
+ genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
if (firstFilter != null) {
firstFilter.init();
}
}
- @Override
- public String getNameForThread() {
- if (filePath != null) {
- try {
- return (type + "=" + (new File(filePath)).getName());
- } catch (Throwable ex) {
- logger.warn("Couldn't get basename for filePath=" + filePath,
- ex);
- }
+ boolean monitor() {
+ if (isReady()) {
+ LOG.info("Starting thread. " + getShortDescription());
+ thread = new Thread(this, getNameForThread());
+ thread.start();
+ return true;
+ } else {
+ return false;
}
- return super.getNameForThread() + ":" + type;
}
+ public abstract boolean isReady();
+
@Override
public void run() {
try {
- logger.info("Started to monitor. " + getShortDescription());
+ LOG.info("Started to monitor. " + getShortDescription());
start();
} catch (Exception e) {
- logger.error("Error writing to output.", e);
+ LOG.error("Error writing to output.", e);
}
- logger.info("Exiting thread. " + getShortDescription());
+ LOG.info("Exiting thread. " + getShortDescription());
}
+ /**
+ * This method will be called from the thread spawned for the output. This
+ * method should only exit after all data are read from the source or the
+ * process is exiting
+ */
+ abstract void start() throws Exception;
+
protected void outputLine(String line, InputMarker marker) {
- statMetric.count++;
- readBytesMetric.count += (line.length());
+ statMetric.value++;
+ readBytesMetric.value += (line.length());
if (firstFilter != null) {
try {
firstFilter.apply(line, marker);
} catch (LogfeederException e) {
- logger.error(e.getLocalizedMessage(),e);
+ LOG.error(e.getLocalizedMessage(), e);
}
} else {
- // TODO: For now, let's make filter mandatory, so that no one
- // accidently forgets to write filter
- // outputMgr.write(line, this);
+ // TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter
+ // outputManager.write(line, this);
}
}
@@ -120,60 +169,10 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
}
- public boolean monitor() {
- if (isReady()) {
- logger.info("Starting thread. " + getShortDescription());
- thread = new Thread(this, getNameForThread());
- thread.start();
- return true;
- } else {
- return false;
- }
- }
-
- public void checkIn(InputMarker inputMarker) {
- // Default implementation is to ignore.
- }
-
- /**
- * This is generally used by final checkin
- */
- public void checkIn() {
- }
-
- public boolean isReady() {
- return true;
- }
-
- public boolean isTail() {
- return tail;
- }
-
- public void setTail(boolean tail) {
- this.tail = tail;
- }
-
- public boolean isUseEventMD5() {
- return useEventMD5;
- }
-
- public void setUseEventMD5(boolean useEventMD5) {
- this.useEventMD5 = useEventMD5;
- }
-
- public boolean isGenEventMD5() {
- return genEventMD5;
- }
-
- public void setGenEventMD5(boolean genEventMD5) {
- this.genEventMD5 = genEventMD5;
- }
-
@Override
public void setDrain(boolean drain) {
- logger.info("Request to drain. " + getShortDescription());
+ LOG.info("Request to drain. " + getShortDescription());
super.setDrain(drain);
- ;
try {
thread.interrupt();
} catch (Throwable t) {
@@ -181,38 +180,36 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
}
- public Filter getFirstFilter() {
- return firstFilter;
- }
-
- public void setFirstFilter(Filter filter) {
- firstFilter = filter;
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ super.addMetricsContainers(metricsList);
+ if (firstFilter != null) {
+ firstFilter.addMetricsContainers(metricsList);
+ }
+ metricsList.add(readBytesMetric);
}
- public void setInputMgr(InputMgr inputMgr) {
- this.inputMgr = inputMgr;
- }
+ @Override
+ public void logStat() {
+ super.logStat();
+ logStatForMetric(readBytesMetric, "Stat: Bytes Read");
- public void setOutputMgr(OutputMgr outputMgr) {
- this.outputMgr = outputMgr;
+ if (firstFilter != null) {
+ firstFilter.logStat();
+ }
}
- public String getFilePath() {
- return filePath;
- }
+ public abstract void checkIn(InputMarker inputMarker);
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
+ public abstract void lastCheckIn();
public void close() {
- logger.info("Close called. " + getShortDescription());
+ LOG.info("Close called. " + getShortDescription());
try {
if (firstFilter != null) {
firstFilter.close();
} else {
- outputMgr.close();
+ outputManager.close();
}
} catch (Throwable t) {
// Ignore
@@ -220,86 +217,60 @@ public abstract class Input extends ConfigBlock implements Runnable {
isClosed = true;
}
- public void setClosed(boolean isClosed) {
- this.isClosed = isClosed;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
- @Override
- public void loadConfig(Map<String, Object> map) {
- super.loadConfig(map);
- String typeValue = getStringValue("type");
- if (typeValue != null) {
- // Explicitly add type and value to field list
- contextFields.put("type", typeValue);
- @SuppressWarnings("unchecked")
- Map<String, Object> addFields = (Map<String, Object>) map
- .get("add_fields");
- if (addFields == null) {
- addFields = new HashMap<String, Object>();
- map.put("add_fields", addFields);
- }
- addFields.put("type", typeValue);
- }
+ public boolean isTail() {
+ return tail;
}
- @Override
- public String getShortDescription() {
- return null;
+ public boolean isUseEventMD5() {
+ return useEventMD5;
}
- @Override
- public void logStat() {
- super.logStat();
- logStatForMetric(readBytesMetric, "Stat: Bytes Read");
-
- if (firstFilter != null) {
- firstFilter.logStat();
- }
+ public boolean isGenEventMD5() {
+ return genEventMD5;
}
- @Override
- public String toString() {
- return getShortDescription();
+ public Filter getFirstFilter() {
+ return firstFilter;
}
- public void rollOver() {
- // Only some inputs support it. E.g. InputFile
+ public String getFilePath() {
+ return filePath;
}
- public String getType() {
- return type;
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
}
- public void setType(String type) {
- this.type = type;
+ public void setClosed(boolean isClosed) {
+ this.isClosed = isClosed;
}
- public Date getEventTime() {
- return null;
+ public boolean isClosed() {
+ return isClosed;
}
public List<Output> getOutputList() {
return outputList;
}
-
- public void addOutput(Output output) {
- outputList.add(output);
- }
-
- public void addMetricsContainers(List<MetricCount> metricsList) {
- super.addMetricsContainers(metricsList);
- if (firstFilter != null) {
- firstFilter.addMetricsContainers(metricsList);
- }
- metricsList.add(readBytesMetric);
- }
public Thread getThread(){
return thread;
}
+ @Override
+ public String getNameForThread() {
+ if (filePath != null) {
+ try {
+ return (type + "=" + (new File(filePath)).getName());
+ } catch (Throwable ex) {
+ LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
+ }
+ }
+ return super.getNameForThread() + ":" + type;
+ }
+
+ @Override
+ public String toString() {
+ return getShortDescription();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index c9f5ded..3737839 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -19,528 +19,99 @@
package org.apache.ambari.logfeeder.input;
import java.io.BufferedReader;
-import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.solr.common.util.Base64;
-public class InputFile extends Input {
- private static final Logger logger = Logger.getLogger(InputFile.class);
-
- private String logPath = null;
- private boolean isStartFromBegining = true;
-
- private boolean isReady = false;
- private File[] logPathFiles = null;
- private Object fileKey = null;
- private String base64FileKey = null;
-
- private boolean isRolledOver = false;
- private boolean addWildCard = false;
-
- private long lastCheckPointTimeMS = 0;
- private int checkPointIntervalMS = 5 * 1000; // 5 seconds
- private RandomAccessFile checkPointWriter = null;
- private Map<String, Object> jsonCheckPoint = null;
-
- private File checkPointFile = null;
-
- private InputMarker lastCheckPointInputMarker = null;
-
- private String checkPointExtension = ".cp";
-
- @Override
- public void init() throws Exception {
- logger.info("init() called");
- statMetric.metricsName = "input.files.read_lines";
- readBytesMetric.metricsName = "input.files.read_bytes";
- checkPointExtension = LogFeederUtil.getStringProperty(
- "logfeeder.checkpoint.extension", checkPointExtension);
-
- // Let's close the file and set it to true after we start monitoring it
- setClosed(true);
- logPath = getStringValue("path");
- tail = getBooleanValue("tail", tail);
- addWildCard = getBooleanValue("add_wild_card", addWildCard);
- checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
- checkPointIntervalMS);
-
- if (logPath == null || logPath.isEmpty()) {
- logger.error("path is empty for file input. "
- + getShortDescription());
- return;
- }
-
- String startPosition = getStringValue("start_position");
- if (StringUtils.isEmpty(startPosition)
- || startPosition.equalsIgnoreCase("beginning")
- || startPosition.equalsIgnoreCase("begining")) {
- isStartFromBegining = true;
- }
-
- if (!tail) {
- // start position end doesn't apply if we are not tailing
- isStartFromBegining = true;
- }
-
- setFilePath(logPath);
- boolean isFileReady = isReady();
-
- logger.info("File to monitor " + logPath + ", tail=" + tail
- + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
-
- super.init();
- }
+public class InputFile extends AbstractInputFile {
@Override
public boolean isReady() {
if (!isReady) {
// Let's try to check whether the file is available
- logPathFiles = getActualFiles(logPath);
- if (logPathFiles != null && logPathFiles.length > 0
- && logPathFiles[0].isFile()) {
-
- if (isTail() && logPathFiles.length > 1) {
- logger.warn("Found multiple files (" + logPathFiles.length
- + ") for the file filter " + filePath
- + ". Will use only the first one. Using "
- + logPathFiles[0].getAbsolutePath());
+ logFiles = getActualFiles(logPath);
+ if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
+ if (tail && logFiles.length > 1) {
+ LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+ ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
}
- logger.info("File filter " + filePath + " expanded to "
- + logPathFiles[0].getAbsolutePath());
+ LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
isReady = true;
} else {
- logger.debug(logPath + " file doesn't exist. Ignoring for now");
+ LOG.debug(logPath + " file doesn't exist. Ignoring for now");
}
}
return isReady;
}
private File[] getActualFiles(String searchPath) {
- if (addWildCard) {
- if (!searchPath.endsWith("*")) {
- searchPath = searchPath + "*";
- }
- }
- File checkFile = new File(searchPath);
- if (checkFile.isFile()) {
- return new File[]{checkFile};
+ File searchFile = new File(searchPath);
+ if (searchFile.isFile()) {
+ return new File[]{searchFile};
+ } else {
+ FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
+ return searchFile.getParentFile().listFiles(fileFilter);
}
- // Let's do wild card search
- // First check current folder
- File checkFiles[] = findFileForWildCard(searchPath, new File("."));
- if (checkFiles == null || checkFiles.length == 0) {
- // Let's check from the parent folder
- File parentDir = (new File(searchPath)).getParentFile();
- if (parentDir != null) {
- String wildCard = (new File(searchPath)).getName();
- checkFiles = findFileForWildCard(wildCard, parentDir);
- }
- }
- return checkFiles;
- }
-
- private File[] findFileForWildCard(String searchPath, File dir) {
- logger.debug("findFileForWildCard(). filePath=" + searchPath + ", dir="
- + dir + ", dir.fullpath=" + dir.getAbsolutePath());
- FileFilter fileFilter = new WildcardFileFilter(searchPath);
- return dir.listFiles(fileFilter);
- }
-
- @Override
- synchronized public void checkIn(InputMarker inputMarker) {
- super.checkIn(inputMarker);
- if (checkPointWriter != null) {
- try {
- int lineNumber = LogFeederUtil.objectToInt(
- jsonCheckPoint.get("line_number"), 0, "line_number");
- if (lineNumber > inputMarker.lineNumber) {
- // Already wrote higher line number for this input
- return;
- }
- // If interval is greater than last checkPoint time, then write
- long currMS = System.currentTimeMillis();
- if (!isClosed()
- && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
- // Let's save this one so we can update the check point file
- // on flush
- lastCheckPointInputMarker = inputMarker;
- return;
- }
- lastCheckPointTimeMS = currMS;
-
- jsonCheckPoint.put("line_number", ""
- + new Integer(inputMarker.lineNumber));
- jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
- jsonCheckPoint.put("last_write_time_date", new Date());
-
- String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
- // Let's rewind
- checkPointWriter.seek(0);
- checkPointWriter.writeInt(jsonStr.length());
- checkPointWriter.write(jsonStr.getBytes());
-
- if (isClosed()) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_FINAL_CHECKIN";
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Wrote final checkPoint, input="
- + getShortDescription()
- + ", checkPointFile="
- + checkPointFile.getAbsolutePath()
- + ", checkPoint=" + jsonStr, null, logger,
- Level.INFO);
- }
- } catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_CHECKIN_EXCEPTION";
- LogFeederUtil
- .logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Caught exception checkIn. , input="
- + getShortDescription(), t, logger,
- Level.ERROR);
- }
- }
-
- }
-
- @Override
- public void checkIn() {
- super.checkIn();
- if (lastCheckPointInputMarker != null) {
- checkIn(lastCheckPointInputMarker);
- }
- }
-
- @Override
- public void rollOver() {
- logger.info("Marking this input file for rollover. "
- + getShortDescription());
- isRolledOver = true;
}
@Override
void start() throws Exception {
-
- if (logPathFiles == null || logPathFiles.length == 0) {
- return;
- }
boolean isProcessFile = getBooleanValue("process_file", true);
if (isProcessFile) {
- if (isTail()) {
- processFile(logPathFiles[0]);
+ if (tail) {
+ processFile(logFiles[0]);
} else {
- for (File file : logPathFiles) {
+ for (File file : logFiles) {
try {
processFile(file);
if (isClosed() || isDrain()) {
- logger.info("isClosed or isDrain. Now breaking loop.");
+ LOG.info("isClosed or isDrain. Now breaking loop.");
break;
}
} catch (Throwable t) {
- logger.error("Error processing file=" + file.getAbsolutePath(), t);
+ LOG.error("Error processing file=" + file.getAbsolutePath(), t);
}
}
}
close();
- }else{
- copyFiles(logPathFiles);
+ } else {
+ copyFiles(logFiles);
}
-
}
@Override
- public void close() {
- super.close();
- logger.info("close() calling checkPoint checkIn(). "
- + getShortDescription());
- checkIn();
- }
-
- private void processFile(File logPathFile) throws FileNotFoundException,
- IOException {
- logger.info("Monitoring logPath=" + logPath + ", logPathFile="
- + logPathFile);
- BufferedReader br = null;
- checkPointFile = null;
- checkPointWriter = null;
- jsonCheckPoint = null;
- int resumeFromLineNumber = 0;
-
- int lineCount = 0;
- try {
- setFilePath(logPathFile.getAbsolutePath());
- br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile));
-
- // Whether to send to output from the beginning.
- boolean resume = isStartFromBegining;
-
- // Seems FileWatch is not reliable, so let's only use file key comparison
- fileKey = getFileKey(logPathFile);
- base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
- .getBytes());
- logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey
- + ". " + getShortDescription());
-
- if (isTail()) {
- try {
- logger.info("Checking existing checkpoint file. "
- + getShortDescription());
-
- String fileBase64 = Base64.byteArrayToBase64(fileKey
- .toString().getBytes());
- String checkPointFileName = fileBase64
- + checkPointExtension;
- File checkPointFolder = inputMgr.getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder,
- checkPointFileName);
- checkPointWriter = new RandomAccessFile(checkPointFile,
- "rw");
-
- try {
- int contentSize = checkPointWriter.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointWriter.read(b, 0, contentSize);
- if (readSize != contentSize) {
- logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
- + contentSize
- + ", read="
- + readSize
- + ", checkPointFile="
- + checkPointFile
- + ", input=" + getShortDescription());
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- jsonCheckPoint = LogFeederUtil
- .toJSONObject(jsonCheckPointStr);
-
- resumeFromLineNumber = LogFeederUtil.objectToInt(
- jsonCheckPoint.get("line_number"), 0,
- "line_number");
-
- if (resumeFromLineNumber > 0) {
- // Let's read from last line read
- resume = false;
- }
- logger.info("CheckPoint. checkPointFile="
- + checkPointFile + ", json="
- + jsonCheckPointStr
- + ", resumeFromLineNumber="
- + resumeFromLineNumber + ", resume="
- + resume);
- }
- } catch (EOFException eofEx) {
- logger.info("EOFException. Will reset checkpoint file "
- + checkPointFile.getAbsolutePath() + " for "
- + getShortDescription());
- }
- if (jsonCheckPoint == null) {
- // This seems to be first time, so creating the initial
- // checkPoint object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", filePath);
- jsonCheckPoint.put("file_key", fileBase64);
- }
-
- } catch (Throwable t) {
- logger.error(
- "Error while configuring checkpoint file. Will reset file. checkPointFile="
- + checkPointFile, t);
- }
- }
-
- setClosed(false);
- int sleepStep = 2;
- int sleepIteration = 0;
- while (true) {
- try {
- if (isDrain()) {
- break;
- }
-
- String line = br.readLine();
- if (line == null) {
- if (!resume) {
- resume = true;
- }
- sleepIteration++;
- try {
- // Since FileWatch service is not reliable, we will check
- // file inode every n seconds after no write
- if (sleepIteration > 4) {
- Object newFileKey = getFileKey(logPathFile);
- if (newFileKey != null) {
- if (fileKey == null
- || !newFileKey.equals(fileKey)) {
- logger.info("File key is different. Calling rollover. oldKey="
- + fileKey
- + ", newKey="
- + newFileKey
- + ". "
- + getShortDescription());
- // File has rotated.
- rollOver();
- }
- }
- }
- // Flush on the second iteration
- if (!tail && sleepIteration >= 2) {
- logger.info("End of file. Done with filePath="
- + logPathFile.getAbsolutePath()
- + ", lineCount=" + lineCount);
- flush();
- break;
- } else if (sleepIteration == 2) {
- flush();
- } else if (sleepIteration >= 2) {
- if (isRolledOver) {
- isRolledOver = false;
- // Close existing file
- try {
- logger.info("File is rolled over. Closing current open file."
- + getShortDescription()
- + ", lineCount=" + lineCount);
- br.close();
- } catch (Exception ex) {
- logger.error("Error closing file"
- + getShortDescription());
- break;
- }
- try {
- logger.info("Opening new rolled over file."
- + getShortDescription());
- br = new BufferedReader(LogsearchReaderFactory.
- INSTANCE.getReader(logPathFile));
- lineCount = 0;
- fileKey = getFileKey(logPathFile);
- base64FileKey = Base64
- .byteArrayToBase64(fileKey
- .toString().getBytes());
- logger.info("fileKey=" + fileKey
- + ", base64=" + base64FileKey
- + ", " + getShortDescription());
- } catch (Exception ex) {
- logger.error("Error opening rolled over file. "
- + getShortDescription());
- // Let's add this to monitoring and exit this thread
- logger.info("Added input to not ready list."
- + getShortDescription());
- isReady = false;
- inputMgr.addToNotReady(this);
- break;
- }
- logger.info("File is successfully rolled over. "
- + getShortDescription());
- continue;
- }
- }
- Thread.sleep(sleepStep * 1000);
- sleepStep = (sleepStep * 2);
- sleepStep = sleepStep > 10 ? 10 : sleepStep;
- } catch (InterruptedException e) {
- logger.info("Thread interrupted."
- + getShortDescription());
- }
- } else {
- lineCount++;
- sleepStep = 1;
- sleepIteration = 0;
-
- if (!resume && lineCount > resumeFromLineNumber) {
- logger.info("Resuming to read from last line. lineCount="
- + lineCount
- + ", input="
- + getShortDescription());
- resume = true;
- }
- if (resume) {
- InputMarker marker = new InputMarker();
- marker.base64FileKey = base64FileKey;
- marker.input = this;
- marker.lineNumber = lineCount;
- outputLine(line, marker);
- }
- }
- } catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_READ_LOOP_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Caught exception in read loop. lineNumber="
- + lineCount + ", input="
- + getShortDescription(), t, logger,
- Level.ERROR);
-
- }
- }
- } finally {
- if (br != null) {
- logger.info("Closing reader." + getShortDescription()
- + ", lineCount=" + lineCount);
- try {
- br.close();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- static public Object getFileKey(File file) {
- try {
- Path fileFullPath = Paths.get(file.getAbsolutePath());
- if (fileFullPath != null) {
- BasicFileAttributes basicAttr = Files.readAttributes(
- fileFullPath, BasicFileAttributes.class);
- return basicAttr.fileKey();
- }
- } catch (Throwable ex) {
- logger.error("Error getting file attributes for file=" + file, ex);
- }
- return file.toString();
+ protected BufferedReader openLogFile(File logFile) throws FileNotFoundException {
+ BufferedReader br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logFile));
+ fileKey = getFileKey(logFile);
+ base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+ LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
+ return br;
}
@Override
- public String getShortDescription() {
- return "input:source="
- + getStringValue("source")
- + ", path="
- + (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0]
- .getAbsolutePath() : getStringValue("path"));
+ protected Object getFileKey(File logFile) {
+ return FileUtil.getFileKey(logFile);
}
-
- public void copyFiles(File[] files) {
+
+ private void copyFiles(File[] files) {
boolean isCopyFile = getBooleanValue("copy_file", false);
if (isCopyFile && files != null) {
for (File file : files) {
try {
- InputMarker marker = new InputMarker();
- marker.input = this;
- outputMgr.copyFile(file, marker);
+ InputMarker marker = new InputMarker(this, null, 0);
+ outputManager.copyFile(file, marker);
if (isClosed() || isDrain()) {
- logger.info("isClosed or isDrain. Now breaking loop.");
+ LOG.info("isClosed or isDrain. Now breaking loop.");
break;
}
} catch (Throwable t) {
- logger.error("Error processing file=" + file.getAbsolutePath(), t);
+ LOG.error("Error processing file=" + file.getAbsolutePath(), t);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
new file mode 100644
index 0000000..8e70850
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputManager {
+ private static final Logger LOG = Logger.getLogger(InputManager.class);
+
+ private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
+ public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
+
+ private List<Input> inputList = new ArrayList<Input>();
+ private Set<Input> notReadyList = new HashSet<Input>();
+
+ private boolean isDrain = false;
+ private boolean isAnyInputTail = false;
+
+ private File checkPointFolderFile = null;
+
+ private MetricData filesCountMetric = new MetricData("input.files.count", true);
+
+ private String checkPointExtension;
+
+ private Thread inputIsReadyMonitor = null;
+
+ public List<Input> getInputList() {
+ return inputList;
+ }
+
+ public void add(Input input) {
+ inputList.add(input);
+ }
+
+ public void removeInput(Input input) {
+ LOG.info("Trying to remove from inputList. " + input.getShortDescription());
+ Iterator<Input> iter = inputList.iterator();
+ while (iter.hasNext()) {
+ Input iterInput = iter.next();
+ if (iterInput.equals(input)) {
+ LOG.info("Removing Input from inputList. " + input.getShortDescription());
+ iter.remove();
+ }
+ }
+ }
+
+ private int getActiveFilesCount() {
+ int count = 0;
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public void init() {
+ checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", DEFAULT_CHECKPOINT_EXTENSION);
+ for (Input input : inputList) {
+ try {
+ input.init();
+ if (input.isTail()) {
+ isAnyInputTail = true;
+ }
+ } catch (Exception e) {
+ LOG.error("Error initializing input. " + input.getShortDescription(), e);
+ }
+ }
+
+ if (isAnyInputTail) {
+ LOG.info("Determining valid checkpoint folder");
+ boolean isCheckPointFolderValid = false;
+ // We need to keep track of the files we are reading.
+ String checkPointFolder = LogFeederUtil.getStringProperty("logfeeder.checkpoint.folder");
+ if (!StringUtils.isEmpty(checkPointFolder)) {
+ checkPointFolderFile = new File(checkPointFolder);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's try home folder
+ String userHome = LogFeederUtil.getStringProperty("user.home");
+ if (userHome != null) {
+ checkPointFolderFile = new File(userHome, CHECKPOINT_SUBFOLDER_NAME);
+ LOG.info("Checking if home folder can be used for checkpoints. Folder=" + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's use tmp folder
+ String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir");
+ if (tmpFolder == null) {
+ tmpFolder = "/tmp";
+ }
+ checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME);
+ LOG.info("Checking if tmps folder can be used for checkpoints. Folder=" + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ if (isCheckPointFolderValid) {
+ LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
+ "Please set logfeeder.checkpoint.folder property");
+ }
+ }
+
+ if (isCheckPointFolderValid) {
+ LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
+ }
+ }
+
+ }
+
+ private boolean verifyCheckPointFolder(File folderPathFile) {
+ if (!folderPathFile.exists()) {
+ try {
+ if (!folderPathFile.mkdir()) {
+ LOG.warn("Error creating folder for check point. folder=" + folderPathFile);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error creating folder for check point. folder=" + folderPathFile, t);
+ }
+ }
+
+ if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+ // Let's check whether we can create a file
+ File testFile = new File(folderPathFile, UUID.randomUUID().toString());
+ try {
+ testFile.createNewFile();
+ return testFile.delete();
+ } catch (IOException e) {
+ LOG.warn("Couldn't create test file in " + folderPathFile.getAbsolutePath() + " for checkPoint", e);
+ }
+ }
+ return false;
+ }
+
+ public File getCheckPointFolderFile() {
+ return checkPointFolderFile;
+ }
+
+ public void monitor() {
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ input.monitor();
+ } else {
+ if (input.isTail()) {
+ LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
+ "So it might not be an issue. " + input.getShortDescription());
+ notReadyList.add(input);
+ } else {
+ LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription());
+ }
+ }
+ }
+ // Start the monitoring thread if any file is in tail mode
+ if (isAnyInputTail) {
+ inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+ @Override
+ public void run() {
+ LOG.info("Going to monitor for these missing files: " + notReadyList.toString());
+ while (true) {
+ if (isDrain) {
+ LOG.info("Exiting missing file monitor.");
+ break;
+ }
+ try {
+ Iterator<Input> iter = notReadyList.iterator();
+ while (iter.hasNext()) {
+ Input input = iter.next();
+ try {
+ if (input.isReady()) {
+ input.monitor();
+ iter.remove();
+ }
+ } catch (Throwable t) {
+ LOG.error("Error while enabling monitoring for input. " + input.getShortDescription());
+ }
+ }
+ Thread.sleep(30 * 1000);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ };
+ inputIsReadyMonitor.start();
+ }
+ }
+
+ void addToNotReady(Input notReadyInput) {
+ notReadyList.add(notReadyInput);
+ }
+
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ for (Input input : inputList) {
+ input.addMetricsContainers(metricsList);
+ }
+ filesCountMetric.value = getActiveFilesCount();
+ metricsList.add(filesCountMetric);
+ }
+
+ public void logStats() {
+ for (Input input : inputList) {
+ input.logStat();
+ }
+
+ filesCountMetric.value = getActiveFilesCount();
+ LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "");
+ }
+
+
+ public void cleanCheckPointFiles() {
+
+ if (checkPointFolderFile == null) {
+ LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + checkPointFolderFile);
+ return;
+ }
+ LOG.info("Cleaning checkPoint files. checkPointFolderFile=" + checkPointFolderFile.getAbsolutePath());
+ try {
+ // Loop over the check point files and if filePath is not present, then move to closed
+ String searchPath = "*" + checkPointExtension;
+ FileFilter fileFilter = new WildcardFileFilter(searchPath);
+ File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+ int totalCheckFilesDeleted = 0;
+ for (File checkPointFile : checkPointFiles) {
+ try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+ int contentSize = checkPointReader.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointReader.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read="
+ + readSize + ", checkPointFile=" + checkPointFile);
+ } else {
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ Map<String, Object> jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+ String logFilePath = (String) jsonCheckPoint.get("file_path");
+ String logFileKey = (String) jsonCheckPoint.get("file_key");
+ if (logFilePath != null && logFileKey != null) {
+ boolean deleteCheckPointFile = false;
+ File logFile = new File(logFilePath);
+ if (logFile.exists()) {
+ Object fileKeyObj = FileUtil.getFileKey(logFile);
+ String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
+ if (!logFileKey.equals(fileBase64)) {
+ deleteCheckPointFile = true;
+ LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
+ logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+ }
+ } else {
+ LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
+ checkPointFile.getAbsolutePath());
+ deleteCheckPointFile = true;
+ }
+ if (deleteCheckPointFile) {
+ LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath);
+ checkPointFile.delete();
+ totalCheckFilesDeleted++;
+ }
+ }
+ }
+ } catch (EOFException eof) {
+ LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile);
+ } catch (Throwable t) {
+ LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
+ }
+ }
+ LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
+ checkPointFolderFile.getAbsolutePath());
+
+ } catch (Throwable t) {
+ LOG.error("Error while cleaning checkPointFiles", t);
+ }
+ }
+
+ public void waitOnAllInputs() {
+ //wait on inputs
+ for (Input input : inputList) {
+ if (input != null) {
+ Thread inputThread = input.getThread();
+ if (inputThread != null) {
+ try {
+ inputThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ // wait on monitor
+ if (inputIsReadyMonitor != null) {
+ try {
+ this.close();
+ inputIsReadyMonitor.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public void checkInAll() {
+ for (Input input : inputList) {
+ input.lastCheckIn();
+ }
+ }
+
+ public void close() {
+ for (Input input : inputList) {
+ try {
+ input.setDrain(true);
+ } catch (Throwable t) {
+ LOG.error("Error while draining. input=" + input.getShortDescription(), t);
+ }
+ }
+ isDrain = true;
+
+ // Need to get this value from property
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ for (int i = 0; i < iterations; i++) {
+ boolean allClosed = true;
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ try {
+ allClosed = false;
+ LOG.warn("Waiting for input to close. " + input.getShortDescription() + ", " + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ LOG.info("All inputs are closed. Iterations=" + i);
+ return;
+ }
+ }
+
+ LOG.warn("Some inputs were not closed after " + iterations + " iterations");
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ LOG.warn("Input not closed. Will ignore it." + input.getShortDescription());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
index 48a7f1d..6767687 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -23,13 +23,18 @@ package org.apache.ambari.logfeeder.input;
* This file contains the file inode, line number of the log currently been read
*/
public class InputMarker {
- public int lineNumber = 0;
- public Input input;
- public String base64FileKey = null;
-
+ public final Input input;
+ public final String base64FileKey;
+ public final int lineNumber;
+
+ public InputMarker(Input input, String base64FileKey, int lineNumber) {
+ this.input = input;
+ this.base64FileKey = base64FileKey;
+ this.lineNumber = lineNumber;
+ }
+
@Override
public String toString() {
- return "InputMarker [lineNumber=" + lineNumber + ", input="
- + input.getShortDescription() + "]";
+ return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
deleted file mode 100644
index b18c9b0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.input;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-public class InputMgr {
- private static final Logger logger = Logger.getLogger(InputMgr.class);
-
- private List<Input> inputList = new ArrayList<Input>();
- private Set<Input> notReadyList = new HashSet<Input>();
-
- private boolean isDrain = false;
- private boolean isAnyInputTail = false;
-
- private String checkPointSubFolderName = "logfeeder_checkpoints";
- private File checkPointFolderFile = null;
-
- private MetricCount filesCountMetric = new MetricCount();
-
- private String checkPointExtension = ".cp";
-
- private Thread inputIsReadyMonitor = null;
-
- public List<Input> getInputList() {
- return inputList;
- }
-
- public void add(Input input) {
- inputList.add(input);
- }
-
- public void removeInput(Input input) {
- logger.info("Trying to remove from inputList. "
- + input.getShortDescription());
- Iterator<Input> iter = inputList.iterator();
- while (iter.hasNext()) {
- Input iterInput = iter.next();
- if (iterInput.equals(input)) {
- logger.info("Removing Input from inputList. "
- + input.getShortDescription());
- iter.remove();
- }
- }
- }
-
- public int getActiveFilesCount() {
- int count = 0;
- for (Input input : inputList) {
- if (input.isReady()) {
- count++;
- }
- }
- return count;
- }
-
- public void init() {
- filesCountMetric.metricsName = "input.files.count";
- filesCountMetric.isPointInTime = true;
-
- checkPointExtension = LogFeederUtil.getStringProperty(
- "logfeeder.checkpoint.extension", checkPointExtension);
- for (Input input : inputList) {
- try {
- input.init();
- if (input.isTail()) {
- isAnyInputTail = true;
- }
- } catch (Exception e) {
- logger.error(
- "Error initializing input. "
- + input.getShortDescription(), e);
- }
- }
-
- if (isAnyInputTail) {
- logger.info("Determining valid checkpoint folder");
- boolean isCheckPointFolderValid = false;
- // We need to keep track of the files we are reading.
- String checkPointFolder = LogFeederUtil
- .getStringProperty("logfeeder.checkpoint.folder");
- if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
- checkPointFolderFile = new File(checkPointFolder);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- }
- if (!isCheckPointFolderValid) {
- // Let's try home folder
- String userHome = LogFeederUtil.getStringProperty("user.home");
- if (userHome != null) {
- checkPointFolderFile = new File(userHome,
- checkPointSubFolderName);
- logger.info("Checking if home folder can be used for checkpoints. Folder="
- + checkPointFolderFile);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- }
- }
- if (!isCheckPointFolderValid) {
- // Let's use tmp folder
- String tmpFolder = LogFeederUtil
- .getStringProperty("java.io.tmpdir");
- if (tmpFolder == null) {
- tmpFolder = "/tmp";
- }
- checkPointFolderFile = new File(tmpFolder,
- checkPointSubFolderName);
- logger.info("Checking if tmps folder can be used for checkpoints. Folder="
- + checkPointFolderFile);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- if (isCheckPointFolderValid) {
- logger.warn("Using tmp folder "
- + checkPointFolderFile
- + " to store check points. This is not recommended."
- + "Please set logfeeder.checkpoint.folder property");
- }
- }
-
- if (isCheckPointFolderValid) {
- logger.info("Using folder " + checkPointFolderFile
- + " for storing checkpoints");
- }
- }
-
- }
-
- public File getCheckPointFolderFile() {
- return checkPointFolderFile;
- }
-
- private boolean verifyCheckPointFolder(File folderPathFile) {
- if (!folderPathFile.exists()) {
- // Create the folder
- try {
- if (!folderPathFile.mkdir()) {
- logger.warn("Error creating folder for check point. folder="
- + folderPathFile);
- }
- } catch (Throwable t) {
- logger.warn("Error creating folder for check point. folder="
- + folderPathFile, t);
- }
- }
-
- if (folderPathFile.exists() && folderPathFile.isDirectory()) {
- // Let's check whether we can create a file
- File testFile = new File(folderPathFile, UUID.randomUUID()
- .toString());
- try {
- testFile.createNewFile();
- return testFile.delete();
- } catch (IOException e) {
- logger.warn(
- "Couldn't create test file in "
- + folderPathFile.getAbsolutePath()
- + " for checkPoint", e);
- }
- }
- return false;
- }
-
- public void monitor() {
- for (Input input : inputList) {
- if (input.isReady()) {
- input.monitor();
- } else {
- if (input.isTail()) {
- logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
- + input.getShortDescription());
- notReadyList.add(input);
- } else {
- logger.info("Input is not ready, so going to ignore it "
- + input.getShortDescription());
- }
- }
- }
- // Start the monitoring thread if any file is in tail mode
- if (isAnyInputTail) {
- inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
- @Override
- public void run() {
- logger.info("Going to monitor for these missing files: "
- + notReadyList.toString());
- while (true) {
- if (isDrain) {
- logger.info("Exiting missing file monitor.");
- break;
- }
- try {
- Iterator<Input> iter = notReadyList.iterator();
- while (iter.hasNext()) {
- Input input = iter.next();
- try {
- if (input.isReady()) {
- input.monitor();
- iter.remove();
- }
- } catch (Throwable t) {
- logger.error("Error while enabling monitoring for input. "
- + input.getShortDescription());
- }
- }
- Thread.sleep(30 * 1000);
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
- };
- inputIsReadyMonitor.start();
- }
- }
-
- public void addToNotReady(Input notReadyInput) {
- notReadyList.add(notReadyInput);
- }
-
- public void addMetricsContainers(List<MetricCount> metricsList) {
- for (Input input : inputList) {
- input.addMetricsContainers(metricsList);
- }
- filesCountMetric.count = getActiveFilesCount();
- metricsList.add(filesCountMetric);
- }
-
- public void logStats() {
- for (Input input : inputList) {
- input.logStat();
- }
-
- filesCountMetric.count = getActiveFilesCount();
- LogFeederUtil.logStatForMetric(filesCountMetric,
- "Stat: Files Monitored Count", null);
- }
-
- public void close() {
- for (Input input : inputList) {
- try {
- input.setDrain(true);
- } catch (Throwable t) {
- logger.error(
- "Error while draining. input="
- + input.getShortDescription(), t);
- }
- }
- isDrain = true;
-
- // Need to get this value from property
- int iterations = 30;
- int waitTimeMS = 1000;
- int i = 0;
- boolean allClosed = true;
- for (i = 0; i < iterations; i++) {
- allClosed = true;
- for (Input input : inputList) {
- if (!input.isClosed()) {
- try {
- allClosed = false;
- logger.warn("Waiting for input to close. "
- + input.getShortDescription() + ", "
- + (iterations - i) + " more seconds");
- Thread.sleep(waitTimeMS);
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
- if (allClosed) {
- break;
- }
- }
- if (!allClosed) {
- logger.warn("Some inputs were not closed. Iterations=" + i);
- for (Input input : inputList) {
- if (!input.isClosed()) {
- logger.warn("Input not closed. Will ignore it."
- + input.getShortDescription());
- }
- }
- } else {
- logger.info("All inputs are closed. Iterations=" + i);
- }
-
- }
-
- public void checkInAll() {
- for (Input input : inputList) {
- input.checkIn();
- }
- }
-
- public void cleanCheckPointFiles() {
-
- if (checkPointFolderFile == null) {
- logger.info("Will not clean checkPoint files. checkPointFolderFile="
- + checkPointFolderFile);
- return;
- }
- logger.info("Cleaning checkPoint files. checkPointFolderFile="
- + checkPointFolderFile.getAbsolutePath());
- try {
- // Loop over the check point files and if filePath is not present, then move to closed
- String searchPath = "*" + checkPointExtension;
- FileFilter fileFilter = new WildcardFileFilter(searchPath);
- File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
- int totalCheckFilesDeleted = 0;
- for (File checkPointFile : checkPointFiles) {
- RandomAccessFile checkPointReader = null;
- try {
- checkPointReader = new RandomAccessFile(checkPointFile, "r");
-
- int contentSize = checkPointReader.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointReader.read(b, 0, contentSize);
- if (readSize != contentSize) {
- logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
- + contentSize
- + ", read="
- + readSize
- + ", checkPointFile=" + checkPointFile);
- } else {
- // Create JSON string
- String jsonCheckPointStr = new String(b, 0, readSize);
- Map<String, Object> jsonCheckPoint = LogFeederUtil
- .toJSONObject(jsonCheckPointStr);
-
- String logFilePath = (String) jsonCheckPoint
- .get("file_path");
- String logFileKey = (String) jsonCheckPoint
- .get("file_key");
- if (logFilePath != null && logFileKey != null) {
- boolean deleteCheckPointFile = false;
- File logFile = new File(logFilePath);
- if (logFile.exists()) {
- Object fileKeyObj = InputFile
- .getFileKey(logFile);
- String fileBase64 = Base64
- .byteArrayToBase64(fileKeyObj
- .toString().getBytes());
- if (!logFileKey.equals(fileBase64)) {
- deleteCheckPointFile = true;
- logger.info("CheckPoint clean: File key has changed. old="
- + logFileKey
- + ", new="
- + fileBase64
- + ", filePath="
- + logFilePath
- + ", checkPointFile="
- + checkPointFile.getAbsolutePath());
- }
- } else {
- logger.info("CheckPoint clean: Log file doesn't exist. filePath="
- + logFilePath
- + ", checkPointFile="
- + checkPointFile.getAbsolutePath());
- deleteCheckPointFile = true;
- }
- if (deleteCheckPointFile) {
- logger.info("Deleting CheckPoint file="
- + checkPointFile.getAbsolutePath()
- + ", logFile=" + logFilePath);
- checkPointFile.delete();
- totalCheckFilesDeleted++;
- }
- }
- }
- } catch (EOFException eof) {
- logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
- + checkPointFile);
- } catch (Throwable t) {
- logger.error("Error while checking checkPoint file. "
- + checkPointFile, t);
- } finally {
- if (checkPointReader != null) {
- try {
- checkPointReader.close();
- } catch (Throwable t) {
- logger.error("Error closing checkPoint file. "
- + checkPointFile, t);
- }
- }
- }
- }
- logger.info("Deleted " + totalCheckFilesDeleted
- + " checkPoint file(s). checkPointFolderFile="
- + checkPointFolderFile.getAbsolutePath());
-
- } catch (Throwable t) {
- logger.error("Error while cleaning checkPointFiles", t);
- }
- }
-
- public void waitOnAllInputs() {
- //wait on inputs
- if (inputList != null) {
- for (Input input : inputList) {
- if (input != null) {
- Thread inputThread = input.getThread();
- if (inputThread != null) {
- try {
- inputThread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- }
- }
- // wait on monitor
- if (inputIsReadyMonitor != null) {
- try {
- this.close();
- inputIsReadyMonitor.join();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index c9d28bd..f560379 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -19,201 +19,57 @@
package org.apache.ambari.logfeeder.input;
import java.io.BufferedReader;
-import java.io.EOFException;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.solr.common.util.Base64;
-public class InputS3File extends Input {
- private static final Logger logger = Logger.getLogger(InputS3File.class);
-
- private String logPath = null;
- private boolean isStartFromBegining = true;
-
- private boolean isReady = false;
- private String[] s3LogPathFiles = null;
- private Object fileKey = null;
- private String base64FileKey = null;
-
- private boolean isRolledOver = false;
- private boolean addWildCard = false;
-
- private long lastCheckPointTimeMS = 0;
- private int checkPointIntervalMS = 5 * 1000; // 5 seconds
- private RandomAccessFile checkPointWriter = null;
- private Map<String, Object> jsonCheckPoint = null;
-
- private File checkPointFile = null;
-
- private InputMarker lastCheckPointInputMarker = null;
-
- private String checkPointExtension = ".cp";
-
-
- @Override
- public void init() throws Exception {
- logger.info("init() called");
- statMetric.metricsName = "input.files.read_lines";
- readBytesMetric.metricsName = "input.files.read_bytes";
- checkPointExtension = LogFeederUtil.getStringProperty(
- "logfeeder.checkpoint.extension", checkPointExtension);
-
- // Let's close the file and set it to true after we start monitoring it
- setClosed(true);
- logPath = getStringValue("path");
- tail = getBooleanValue("tail", tail);
- addWildCard = getBooleanValue("add_wild_card", addWildCard);
- checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
- checkPointIntervalMS);
- if (logPath == null || logPath.isEmpty()) {
- logger.error("path is empty for file input. " + getShortDescription());
- return;
- }
-
- String startPosition = getStringValue("start_position");
- if (StringUtils.isEmpty(startPosition)
- || startPosition.equalsIgnoreCase("beginning")
- || startPosition.equalsIgnoreCase("begining")) {
- isStartFromBegining = true;
- }
-
- if (!tail) {
- // start position end doesn't apply if we are not tailing
- isStartFromBegining = true;
- }
-
- setFilePath(logPath);
- boolean isFileReady = isReady();
-
- logger.info("File to monitor " + logPath + ", tail=" + tail
- + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
-
- super.init();
- }
+public class InputS3File extends AbstractInputFile {
@Override
public boolean isReady() {
if (!isReady) {
// Let's try to check whether the file is available
- s3LogPathFiles = getActualFiles(logPath);
- if (s3LogPathFiles != null && s3LogPathFiles.length > 0) {
- if (isTail() && s3LogPathFiles.length > 1) {
- logger.warn("Found multiple files (" + s3LogPathFiles.length
- + ") for the file filter " + filePath
- + ". Will use only the first one. Using " + s3LogPathFiles[0]);
+ logFiles = getActualFiles(logPath);
+ if (!ArrayUtils.isEmpty(logFiles)) {
+ if (tail && logFiles.length > 1) {
+ LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
+ ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
}
- logger.info("File filter " + filePath + " expanded to "
- + s3LogPathFiles[0]);
+ LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
isReady = true;
} else {
- logger.debug(logPath + " file doesn't exist. Ignoring for now");
+ LOG.debug(logPath + " file doesn't exist. Ignoring for now");
}
}
return isReady;
}
- private String[] getActualFiles(String searchPath) {
+ private File[] getActualFiles(String searchPath) {
// TODO search file on s3
- return new String[] { searchPath };
- }
-
- @Override
- synchronized public void checkIn(InputMarker inputMarker) {
- super.checkIn(inputMarker);
- if (checkPointWriter != null) {
- try {
- int lineNumber = LogFeederUtil.objectToInt(
- jsonCheckPoint.get("line_number"), 0, "line_number");
- if (lineNumber > inputMarker.lineNumber) {
- // Already wrote higher line number for this input
- return;
- }
- // If interval is greater than last checkPoint time, then write
- long currMS = System.currentTimeMillis();
- if (!isClosed()
- && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
- // Let's save this one so we can update the check point file
- // on flush
- lastCheckPointInputMarker = inputMarker;
- return;
- }
- lastCheckPointTimeMS = currMS;
-
- jsonCheckPoint.put("line_number", ""
- + new Integer(inputMarker.lineNumber));
- jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
- jsonCheckPoint.put("last_write_time_date", new Date());
-
- String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
- // Let's rewind
- checkPointWriter.seek(0);
- checkPointWriter.writeInt(jsonStr.length());
- checkPointWriter.write(jsonStr.getBytes());
-
- if (isClosed()) {
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_FINAL_CHECKIN";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Wrote final checkPoint, input=" + getShortDescription()
- + ", checkPointFile=" + checkPointFile.getAbsolutePath()
- + ", checkPoint=" + jsonStr, null, logger, Level.INFO);
- }
- } catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_CHECKIN_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Caught exception checkIn. , input=" + getShortDescription(), t,
- logger, Level.ERROR);
- }
- }
-
- }
-
- @Override
- public void checkIn() {
- super.checkIn();
- if (lastCheckPointInputMarker != null) {
- checkIn(lastCheckPointInputMarker);
- }
- }
-
- @Override
- public void rollOver() {
- logger.info("Marking this input file for rollover. "
- + getShortDescription());
- isRolledOver = true;
+ return new File[] { new File(searchPath) };
}
@Override
void start() throws Exception {
- if (s3LogPathFiles == null || s3LogPathFiles.length == 0) {
+ if (ArrayUtils.isEmpty(logFiles)) {
return;
}
- if (isTail()) {
- processFile(s3LogPathFiles[0]);
+ if (tail) {
+ processFile(logFiles[0]);
} else {
- for (String s3FilePath : s3LogPathFiles) {
+ for (File s3FilePath : logFiles) {
try {
processFile(s3FilePath);
if (isClosed() || isDrain()) {
- logger.info("isClosed or isDrain. Now breaking loop.");
+ LOG.info("isClosed or isDrain. Now breaking loop.");
break;
}
} catch (Throwable t) {
- logger.error("Error processing file=" + s3FilePath, t);
+ LOG.error("Error processing file=" + s3FilePath, t);
}
}
}
@@ -221,244 +77,18 @@ public class InputS3File extends Input {
}
@Override
- public void close() {
- super.close();
- logger.info("close() calling checkPoint checkIn(). "
- + getShortDescription());
- checkIn();
- }
-
- private void processFile(String logPathFile) throws FileNotFoundException,
- IOException {
- logger.info("Monitoring logPath=" + logPath + ", logPathFile="
- + logPathFile);
- BufferedReader br = null;
- checkPointFile = null;
- checkPointWriter = null;
- jsonCheckPoint = null;
- int resumeFromLineNumber = 0;
-
- int lineCount = 0;
- try {
- setFilePath(logPathFile);
- String s3AccessKey = getStringValue("s3_access_key");
- String s3SecretKey = getStringValue("s3_secret_key");
- br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
- if(br==null){
- //log err
- return;
- }
-
- // Whether to send to output from the beginning.
- boolean resume = isStartFromBegining;
-
- // Seems FileWatch is not reliable, so let's only use file key comparison
- fileKey = getFileKey(logPathFile);
- base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
- logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". "
- + getShortDescription());
-
- if (isTail()) {
- try {
- // Let's see if there is a checkpoint for this file
- logger.info("Checking existing checkpoint file. "
- + getShortDescription());
-
- String fileBase64 = Base64.byteArrayToBase64(fileKey.toString()
- .getBytes());
- String checkPointFileName = fileBase64 + checkPointExtension;
- File checkPointFolder = inputMgr.getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder, checkPointFileName);
- checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
-
- try {
- int contentSize = checkPointWriter.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointWriter.read(b, 0, contentSize);
- if (readSize != contentSize) {
- logger
- .error("Couldn't read expected number of bytes from checkpoint file. expected="
- + contentSize
- + ", read="
- + readSize
- + ", checkPointFile="
- + checkPointFile
- + ", input="
- + getShortDescription());
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
- resumeFromLineNumber = LogFeederUtil.objectToInt(
- jsonCheckPoint.get("line_number"), 0, "line_number");
-
- if (resumeFromLineNumber > 0) {
- // Let's read from last line read
- resume = false;
- }
- logger.info("CheckPoint. checkPointFile=" + checkPointFile
- + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber="
- + resumeFromLineNumber + ", resume=" + resume);
- }
- } catch (EOFException eofEx) {
- logger.info("EOFException. Will reset checkpoint file "
- + checkPointFile.getAbsolutePath() + " for "
- + getShortDescription());
- }
- if (jsonCheckPoint == null) {
- // This seems to be first time, so creating the initial
- // checkPoint object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", filePath);
- jsonCheckPoint.put("file_key", fileBase64);
- }
-
- } catch (Throwable t) {
- logger.error(
- "Error while configuring checkpoint file. Will reset file. checkPointFile="
- + checkPointFile, t);
- }
- }
-
- setClosed(false);
- int sleepStep = 2;
- int sleepIteration = 0;
- while (true) {
- try {
- if (isDrain()) {
- break;
- }
-
- String line = br.readLine();
- if (line == null) {
- if (!resume) {
- resume = true;
- }
- sleepIteration++;
- try {
- // Since FileWatch service is not reliable, we will check
- // file inode every n seconds after no write
- if (sleepIteration > 4) {
- Object newFileKey = getFileKey(logPathFile);
- if (newFileKey != null) {
- if (fileKey == null || !newFileKey.equals(fileKey)) {
- logger
- .info("File key is different. Calling rollover. oldKey="
- + fileKey
- + ", newKey="
- + newFileKey
- + ". "
- + getShortDescription());
- // File has rotated.
- rollOver();
- }
- }
- }
- // Flush on the second iteration
- if (!tail && sleepIteration >= 2) {
- logger.info("End of file. Done with filePath=" + logPathFile
- + ", lineCount=" + lineCount);
- flush();
- break;
- } else if (sleepIteration == 2) {
- flush();
- } else if (sleepIteration >= 2) {
- if (isRolledOver) {
- isRolledOver = false;
- // Close existing file
- try {
- logger
- .info("File is rolled over. Closing current open file."
- + getShortDescription() + ", lineCount="
- + lineCount);
- br.close();
- } catch (Exception ex) {
- logger.error("Error closing file" + getShortDescription());
- break;
- }
- try {
- // Open new file
- logger.info("Opening new rolled over file."
- + getShortDescription());
- br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey);
- lineCount = 0;
- fileKey = getFileKey(logPathFile);
- base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
- .getBytes());
- logger.info("fileKey=" + fileKey + ", base64="
- + base64FileKey + ", " + getShortDescription());
- } catch (Exception ex) {
- logger.error("Error opening rolled over file. "
- + getShortDescription());
- // Let's add this to monitoring and exit this thread
- logger.info("Added input to not ready list."
- + getShortDescription());
- isReady = false;
- inputMgr.addToNotReady(this);
- break;
- }
- logger.info("File is successfully rolled over. "
- + getShortDescription());
- continue;
- }
- }
- Thread.sleep(sleepStep * 1000);
- sleepStep = (sleepStep * 2);
- sleepStep = sleepStep > 10 ? 10 : sleepStep;
- } catch (InterruptedException e) {
- logger.info("Thread interrupted." + getShortDescription());
- }
- } else {
- lineCount++;
- sleepStep = 1;
- sleepIteration = 0;
-
- if (!resume && lineCount > resumeFromLineNumber) {
- logger.info("Resuming to read from last line. lineCount="
- + lineCount + ", input=" + getShortDescription());
- resume = true;
- }
- if (resume) {
- InputMarker marker = new InputMarker();
- marker.base64FileKey = base64FileKey;
- marker.input = this;
- marker.lineNumber = lineCount;
- outputLine(line, marker);
- }
- }
- } catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_READ_LOOP_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Caught exception in read loop. lineNumber=" + lineCount
- + ", input=" + getShortDescription(), t, logger, Level.ERROR);
-
- }
- }
- } finally {
- if (br != null) {
- logger.info("Closing reader." + getShortDescription() + ", lineCount="
- + lineCount);
- try {
- br.close();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- static public Object getFileKey(String s3FilePath) {
- return s3FilePath.toString();
+ protected BufferedReader openLogFile(File logPathFile) throws IOException {
+ String s3AccessKey = getStringValue("s3_access_key");
+ String s3SecretKey = getStringValue("s3_secret_key");
+ BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
+ fileKey = getFileKey(logPathFile);
+ base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+ LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
+ return br;
}
@Override
- public String getShortDescription() {
- return "input:source="
- + getStringValue("source")
- + ", path="
- + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0]
- : getStringValue("path"));
+ protected Object getFileKey(File logFile) {
+ return logFile.getPath();
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 5ba56a5..743be69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -18,7 +18,7 @@
*/
package org.apache.ambari.logfeeder.input;
-import java.net.Inet4Address;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -66,7 +66,7 @@ public class InputSimulate extends Input {
Filter filter = new FilterJSON();
filter.setInput(this);
- setFirstFilter(filter);
+ addFilter(filter);
}
private List<String> getSimulatedLogTypes() {
@@ -88,23 +88,18 @@ public class InputSimulate extends Input {
return LOG_TEXT_PATTERN.replaceAll("<LOG_MESSAGE_PATTERN>", logMessagePattern);
}
-
- @Override
- public String getNameForThread() {
- return "Simulated input";
- }
@Override
- public String getShortDescription() {
- return "Simulated input";
+ public boolean isReady() {
+ return true;
}
-
+
@Override
void start() throws Exception {
if (types.isEmpty())
return;
- getFirstFilter().setOutputMgr(outputMgr);
+ getFirstFilter().setOutputManager(outputManager);
while (true) {
String type = imitateRandomLogFile();
@@ -129,10 +124,7 @@ public class InputSimulate extends Input {
}
private InputMarker getInputMarker(String type) throws Exception {
- InputMarker marker = new InputMarker();
- marker.input = this;
- marker.lineNumber = getLineNumber(type);
- marker.base64FileKey = getBase64FileKey();
+ InputMarker marker = new InputMarker(this, getBase64FileKey(), getLineNumber(type));
return marker;
}
@@ -147,7 +139,7 @@ public class InputSimulate extends Input {
}
private String getBase64FileKey() throws Exception {
- String fileKey = Inet4Address.getLocalHost().getHostAddress() + "|" + filePath;
+ String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + filePath;
return Base64.byteArrayToBase64(fileKey.getBytes());
}
@@ -155,4 +147,20 @@ public class InputSimulate extends Input {
Date d = new Date();
return String.format(logText, d.getTime(), level, marker.lineNumber);
}
+
+ @Override
+ public void checkIn(InputMarker inputMarker) {}
+
+ @Override
+ public void lastCheckIn() {}
+
+ @Override
+ public String getNameForThread() {
+ return "Simulated input";
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "Simulated input";
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
index a2a9db2..9ccc4f2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.ambari.logfeeder.input.reader;
-import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -30,15 +29,11 @@ import org.apache.log4j.Logger;
class GZIPReader extends InputStreamReader {
- private static Logger logger = Logger.getLogger(GZIPReader.class);
+ private static final Logger LOG = Logger.getLogger(GZIPReader.class);
GZIPReader(String fileName) throws FileNotFoundException {
super(getStream(fileName));
- logger.info("Created GZIPReader for file : " + fileName);
- }
-
- GZIPReader(File file) throws FileNotFoundException {
- super(getStream(file.getName()));
+ LOG.info("Created GZIPReader for file : " + fileName);
}
private static InputStream getStream(String fileName) {
@@ -48,7 +43,7 @@ class GZIPReader extends InputStreamReader {
fileStream = new FileInputStream(fileName);
gzipStream = new GZIPInputStream(fileStream);
} catch (Exception e) {
- logger.error(e, e.getCause());
+ LOG.error(e, e.getCause());
}
return gzipStream;
}
@@ -58,21 +53,13 @@ class GZIPReader extends InputStreamReader {
*/
static boolean isValidFile(String fileName) {
// TODO make it generic and put in factory itself
- InputStream is = null;
- try {
- is = new FileInputStream(fileName);
+
+ try (InputStream is = new FileInputStream(fileName)) {
byte[] signature = new byte[2];
int nread = is.read(signature); // read the gzip signature
return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b;
} catch (IOException e) {
return false;
- } finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- }
- }
}
}
}