You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2017/10/16 13:06:34 UTC
ambari git commit: AMBARI-21145. Allow wildcard for log directory
folder in the path component of Logfeeder input (oleewere)
Repository: ambari
Updated Branches:
refs/heads/AMBARI-21145 [created] 1a3fcd234
AMBARI-21145. Allow wildcard for log directory folder in the path component of Logfeeder input (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1a3fcd23
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1a3fcd23
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1a3fcd23
Branch: refs/heads/AMBARI-21145
Commit: 1a3fcd234df72ef9414fc4867634ff5f2cab1201
Parents: 268c964
Author: Oliver Szabo <ol...@gmail.com>
Authored: Mon Oct 9 21:17:38 2017 +0200
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Mon Oct 16 15:05:27 2017 +0200
----------------------------------------------------------------------
.../ambari-logsearch-logfeeder/pom.xml | 5 +
.../ambari/logfeeder/filter/FilterGrok.java | 16 +-
.../apache/ambari/logfeeder/input/Input.java | 147 ++++++++++++++++++-
.../ambari/logfeeder/input/InputFile.java | 32 ++--
.../input/monitor/AbstractLogFileMonitor.java | 74 ++++++++++
.../input/monitor/LogFileDetachMonitor.java | 79 ++++++++++
.../input/monitor/LogFilePathUpdateMonitor.java | 74 ++++++++++
.../logfeeder/logconfig/LogConfigHandler.java | 2 +-
.../apache/ambari/logfeeder/util/FileUtil.java | 105 ++++++++++++-
ambari-logsearch/docker/Dockerfile | 6 +-
ambari-logsearch/docker/logsearch-docker.sh | 4 +-
11 files changed, 515 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 49122e8..3a0524d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -154,6 +154,11 @@
<artifactId>commons-io</artifactId>
<version>${common.io.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ <version>1.7.1</version>
+ </dependency>
</dependencies>
<build>
<finalName>LogFeeder</finalName>
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7e2da70..deff1b2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -87,11 +87,15 @@ public class FilterGrok extends Filter {
LOG.error("message_pattern is not set for filter.");
return;
}
- extractNamedParams(messagePattern, namedParamList);
grokMessage = new Grok();
loadPatterns(grokMessage);
grokMessage.compile(messagePattern);
+ if (getBooleanValue("deep_extract", false)) {
+ extractNamedParams(grokMessage.getNamedRegexCollection());
+ } else {
+ extractNamedParams(messagePattern, namedParamList);
+ }
if (!StringUtils.isEmpty(multilinePattern)) {
extractNamedParams(multilinePattern, multiLineamedParamList);
@@ -108,6 +112,16 @@ public class FilterGrok extends Filter {
}
+  /**
+   * Collects named parameters from the named-regex collection of an already compiled grok pattern.
+   * Only values that are unchanged by {@code toLowerCase()} are added to {@code namedParamList};
+   * a {@code null} collection is ignored.
+   *
+   * @param namedRegexCollection map whose values are the candidate parameter names, may be null
+   */
+  private void extractNamedParams(Map<String, String> namedRegexCollection) {
+    if (namedRegexCollection == null) {
+      return;
+    }
+    for (String candidate : namedRegexCollection.values()) {
+      boolean alreadyLowerCase = candidate.equals(candidate.toLowerCase());
+      if (alreadyLowerCase) {
+        namedParamList.add(candidate);
+      }
+    }
+  }
+
private String escapePattern(String inPattern) {
String inStr = inPattern;
if (inStr != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 9f54d8a..4bd00ea 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -29,13 +29,15 @@ import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.monitor.LogFileDetachMonitor;
+import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
-public abstract class Input extends ConfigBlock implements Runnable {
+public abstract class Input extends ConfigBlock implements Runnable, Cloneable {
private static final Logger LOG = Logger.getLogger(Input.class);
private static final boolean DEFAULT_TAIL = true;
@@ -46,6 +48,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
private static final int DEFAULT_CACHE_SIZE = 100;
private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+ private static final int DEFAULT_DETACH_INTERVAL_MIN = 300;
+ private static final int DEFAULT_DETACH_TIME_MIN = 2000;
+ private static final int DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN = 5;
private static final String CACHE_ENABLED = "cache_enabled";
private static final String CACHE_KEY_FIELD = "cache_key_field";
@@ -58,6 +63,12 @@ public abstract class Input extends ConfigBlock implements Runnable {
private List<Output> outputList = new ArrayList<Output>();
private Thread thread;
+ private Thread logFileDetacherThread;
+ private Thread logFilePathUpdaterThread;
+ private ThreadGroup threadGroup;
+ private int detachIntervalMin;
+ private int pathUpdateIntervalMin;
+ private int detachTimeMin;
private String type;
protected String filePath;
private Filter firstFilter;
@@ -69,6 +80,10 @@ public abstract class Input extends ConfigBlock implements Runnable {
private LRUCache cache;
private String cacheKeyField;
+ private boolean multiFolder = false;
+ private Map<String, List<File>> folderMap;
+ private Map<String, List<File>> fileMap;
+ private Map<String, InputFile> inputChildMap = new HashMap<>(); // TODO: weird it has this relationship
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
protected String getReadBytesMetricName() {
@@ -79,6 +94,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
public void loadConfig(Map<String, Object> map) {
super.loadConfig(map);
String typeValue = getStringValue("type");
+ detachIntervalMin = getIntValue("detach_interval_min", DEFAULT_DETACH_INTERVAL_MIN * 60 * 1000);
+ detachTimeMin = getIntValue("detach_time_min", DEFAULT_DETACH_TIME_MIN * 60 * 1000);
+ pathUpdateIntervalMin = getIntValue("path_update_interval_min", DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60 * 1000);
if (typeValue != null) {
// Explicitly add type and value to field list
contextFields.put("type", typeValue);
@@ -131,20 +149,70 @@ public abstract class Input extends ConfigBlock implements Runnable {
if (firstFilter != null) {
firstFilter.init();
}
-
}
boolean monitor() {
if (isReady()) {
- LOG.info("Starting thread. " + getShortDescription());
- thread = new Thread(this, getNameForThread());
- thread.start();
+ if (isMultiFolder()) {
+ try {
+ threadGroup = new ThreadGroup(getNameForThread());
+ if (getFolderMap() != null) {
+ for (Map.Entry<String, List<File>> folderFileEntry : getFolderMap().entrySet()) {
+ startNewChildInputFileThread(folderFileEntry);
+ }
+ logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "logfile_path_updater=" + filePath);
+ logFilePathUpdaterThread.setDaemon(true);
+ logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), "logfile_detacher=" + filePath);
+ logFileDetacherThread.setDaemon(true);
+
+ logFilePathUpdaterThread.start();
+ logFileDetacherThread.start();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.info("Starting thread. " + getShortDescription());
+ thread = new Thread(this, getNameForThread());
+ thread.start();
+ }
return true;
} else {
return false;
}
}
+ /**
+  * Starts a dedicated child input thread for one concrete folder matched by the wildcard path.
+  * The child is a clone of this input, registered in {@code inputChildMap} under
+  * "&lt;folder path&gt;/&lt;file name part of getFilePath()&gt;" and run inside {@code threadGroup}.
+  *
+  * @param folderFileEntry key: folder path, value: the log files found in that folder
+  * @throws CloneNotSupportedException if this input cannot be cloned
+  */
+ public void startNewChildInputFileThread(Map.Entry<String, List<File>> folderFileEntry) throws CloneNotSupportedException {
+ LOG.info("Start child input thread - " + folderFileEntry.getKey());
+ // the cast assumes this input is an InputFile -- NOTE(review): confirm no other subclass enables multiFolder
+ InputFile clonedObject = (InputFile) this.clone();
+ String folderPath = folderFileEntry.getKey();
+ // local filePath (shadows the field) is only the last path component, i.e. the file name / pattern
+ String filePath = new File(getFilePath()).getName();
+ String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath);
+ // the child handles exactly one folder, so it is flagged as a non multi-folder input
+ clonedObject.setMultiFolder(false);
+ clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail
+ clonedObject.logPath = fullPathWithWildCard;
+ // reset the monitor threads and child map on the clone -- presumably clone() copied the parent's references; verify
+ clonedObject.setLogFileDetacherThread(null);
+ clonedObject.setLogFilePathUpdaterThread(null);
+ clonedObject.setInputChildMap(new HashMap<String, InputFile>());
+ // local thread shadows the field; the parent's own thread field is left untouched
+ Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
+ clonedObject.setThread(thread);
+ // register before starting so stopChildInputFileThread() can find the child by the same key
+ inputChildMap.put(fullPathWithWildCard, clonedObject);
+ thread.start();
+ }
+
+  /**
+   * Interrupts and unregisters the child input that was started for the given folder.
+   * The child is looked up by "&lt;folderPathKey&gt;/&lt;file name part of getFilePath()&gt;", the same key
+   * used when the child was registered; an unknown key only produces a warning.
+   *
+   * @param folderPathKey folder path the child input was started for
+   */
+  public void stopChildInputFileThread(String folderPathKey) {
+    LOG.info("Stop child input thread - " + folderPathKey);
+    String fileNamePart = new File(getFilePath()).getName();
+    String childKey = String.format("%s/%s", folderPathKey, fileNamePart);
+    if (!inputChildMap.containsKey(childKey)) {
+      LOG.warn(childKey + " not found as an input child.");
+      return;
+    }
+    InputFile child = inputChildMap.get(childKey);
+    child.getThread().interrupt();
+    inputChildMap.remove(childKey);
+  }
+
public abstract boolean isReady();
@Override
@@ -192,7 +260,14 @@ public abstract class Input extends ConfigBlock implements Runnable {
LOG.info("Request to drain. " + getShortDescription());
super.setDrain(drain);
try {
- thread.interrupt();
+ if (isMultiFolder()) {
+ logFileDetacherThread.interrupt();
+ logFilePathUpdaterThread.interrupt();
+ threadGroup.interrupt();
+ }
+ if (thread != null) {
+ thread.interrupt();
+ }
} catch (Throwable t) {
// ignore
}
@@ -318,6 +393,30 @@ public abstract class Input extends ConfigBlock implements Runnable {
this.cacheKeyField = cacheKeyField;
}
+ /** Returns the thread group that child input threads are created in (null until monitor() sets it up for a multi-folder input). */
+ public ThreadGroup getThreadGroup() {
+ return threadGroup;
+ }
+
+ /** Replaces the thread group used for child input threads. */
+ public void setThreadGroup(ThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ }
+
+ /** Returns the folder map: folder absolute path to the log files found in that folder (as built by FileUtil.getFoldersForFiles). */
+ public Map<String, List<File>> getFolderMap() {
+ return folderMap;
+ }
+
+ /** Replaces the folder map (folder absolute path to log files in that folder). */
+ public void setFolderMap(Map<String, List<File>> folderMap) {
+ this.folderMap = folderMap;
+ }
+
+ /** Returns the file map: file name to all matched files with that name (as built by FileUtil.getFilesMapForFolders). */
+ public Map<String, List<File>> getFileMap() {
+ return fileMap;
+ }
+
+ /** Replaces the file map (file name to matched files with that name). */
+ public void setFileMap(Map<String, List<File>> fileMap) {
+ this.fileMap = fileMap;
+ }
+
@Override
public String getNameForThread() {
if (filePath != null) {
@@ -334,4 +433,40 @@ public abstract class Input extends ConfigBlock implements Runnable {
public String toString() {
return getShortDescription();
}
+
+ /** Returns true when this input fans out into one child input thread per matched folder instead of running in a single thread. */
+ public boolean isMultiFolder() {
+ return multiFolder;
+ }
+
+ /** Enables or disables multi-folder handling; child inputs are created with this set to false. */
+ public void setMultiFolder(boolean multiFolder) {
+ this.multiFolder = multiFolder;
+ }
+
+ /** Returns the thread running the LogFileDetachMonitor, or null if it was never started. */
+ public Thread getLogFileDetacherThread() {
+ return logFileDetacherThread;
+ }
+
+ /** Sets the thread that runs the log file detach monitor. */
+ public void setLogFileDetacherThread(Thread logFileDetacherThread) {
+ this.logFileDetacherThread = logFileDetacherThread;
+ }
+
+ /** Returns the thread running the LogFilePathUpdateMonitor, or null if it was never started. */
+ public Thread getLogFilePathUpdaterThread() {
+ return logFilePathUpdaterThread;
+ }
+
+ /** Sets the thread that runs the log file path update monitor. */
+ public void setLogFilePathUpdaterThread(Thread logFilePathUpdaterThread) {
+ this.logFilePathUpdaterThread = logFilePathUpdaterThread;
+ }
+
+ /** Returns the running child inputs, keyed by "&lt;folder path&gt;/&lt;file name part of the configured path&gt;". */
+ public Map<String, InputFile> getInputChildMap() {
+ return inputChildMap;
+ }
+
+ /** Replaces the child input map; a freshly cloned child receives an empty map. */
+ public void setInputChildMap(Map<String, InputFile> inputChildMap) {
+ this.inputChildMap = inputChildMap;
+ }
+
+ /** Sets the thread this input runs in (used when a child input is started in its own thread). */
+ public void setThread(Thread thread) {
+ this.thread = thread;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 3737839..0a28320 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -20,12 +20,12 @@ package org.apache.ambari.logfeeder.input;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileNotFoundException;
+import java.util.List;
+import java.util.Map;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.solr.common.util.Base64;
@@ -34,8 +34,20 @@ public class InputFile extends AbstractInputFile {
@Override
public boolean isReady() {
if (!isReady) {
+ // Check there can have pattern in folder
+ if (getFilePath() != null && getFilePath().contains("/")) {
+ int lastIndexOfSlash = getFilePath().lastIndexOf("/");
+ String folderBeforeLogName = getFilePath().substring(0, lastIndexOfSlash);
+ if (folderBeforeLogName.contains("*")) {
+ setMultiFolder(true);
+ }
+ }
// Let's try to check whether the file is available
- logFiles = getActualFiles(logPath);
+ logFiles = getActualInputLogFiles();
+ Map<String, List<File>> fileMap = FileUtil.getFilesMapForFolders(logFiles);
+ Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+ setFolderMap(foldersMap);
+ setFileMap(fileMap);
if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
if (tail && logFiles.length > 1) {
LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
@@ -50,16 +62,6 @@ public class InputFile extends AbstractInputFile {
return isReady;
}
- private File[] getActualFiles(String searchPath) {
- File searchFile = new File(searchPath);
- if (searchFile.isFile()) {
- return new File[]{searchFile};
- } else {
- FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
- return searchFile.getParentFile().listFiles(fileFilter);
- }
- }
-
@Override
void start() throws Exception {
boolean isProcessFile = getBooleanValue("process_file", true);
@@ -99,6 +101,10 @@ public class InputFile extends AbstractInputFile {
return FileUtil.getFileKey(logFile);
}
+ /**
+  * Resolves the configured {@code logPath} to the files that match it right now.
+  * Delegates to FileUtil.getInputFiles, which returns the file itself when the path is an existing file,
+  * the wildcard matches when the path contains "*", and an empty array otherwise.
+  */
+ public File[] getActualInputLogFiles() {
+ return FileUtil.getInputFiles(logPath);
+ }
+
private void copyFiles(File[] files) {
boolean isCopyFile = getBooleanValue("copy_file", false);
if (isCopyFile && files != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
new file mode 100644
index 0000000..35bf702
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Base class of the periodic monitors attached to a multi-folder {@link InputFile}.
+ * The monitor sleeps for the wait interval, calls {@link #monitorAndUpdate()} and repeats until
+ * its thread is interrupted.
+ *
+ * NOTE(review): the callers in Input.loadConfig appear to pass values that were already multiplied
+ * by 60 * 1000, while this class multiplies the interval by 1000 and the detach time by 60 * 1000
+ * again -- the intended units should be confirmed and aligned.
+ */
+public abstract class AbstractLogFileMonitor implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractLogFileMonitor.class);
+
+  private final InputFile inputFile;
+  private final int waitInterval;
+  private final int detachTime;
+
+  /**
+   * @param inputFile    the multi-folder input this monitor works on
+   * @param waitInterval pause between two checks; multiplied by 1000 before being passed to sleep
+   * @param detachTime   age limit handed to {@link #isFileTooOld(File, long)} by subclasses
+   */
+  AbstractLogFileMonitor(InputFile inputFile, int waitInterval, int detachTime) {
+    this.inputFile = inputFile;
+    this.waitInterval = waitInterval;
+    this.detachTime = detachTime;
+  }
+
+  public InputFile getInputFile() {
+    return inputFile;
+  }
+
+  public int getWaitInterval() {
+    return waitInterval;
+  }
+
+  public int getDetachTime() {
+    return detachTime;
+  }
+
+  /**
+   * Runs the monitor loop until the current thread is interrupted. Failures of a single
+   * check are logged and do not stop the loop.
+   */
+  @Override
+  public void run() {
+    LOG.info(getStartLog());
+
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // long arithmetic: the int product overflows for large intervals
+        Thread.sleep(1000L * waitInterval);
+        monitorAndUpdate();
+      } catch (InterruptedException e) {
+        // the exception cleared the interrupt flag; restore it so the loop condition stops the thread
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error("Error during log file monitoring", e);
+      }
+    }
+  }
+
+  /**
+   * @param monitoredFile file whose last modification time is checked
+   * @param detachTimeMin allowed age; multiplied by 60 * 1000 before comparing with milliseconds
+   * @return true if the file was last modified longer ago than the allowed age
+   */
+  protected boolean isFileTooOld(File monitoredFile, long detachTimeMin) {
+    return (System.currentTimeMillis() - monitoredFile.lastModified()) > detachTimeMin * 60 * 1000;
+  }
+
+  /** @return the message logged once when the monitor thread starts */
+  protected abstract String getStartLog();
+
+  /** Performs one monitoring pass; called after every sleep. */
+  protected abstract void monitorAndUpdate() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
new file mode 100644
index 0000000..322a56d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Detaches (stops) child input threads whose folder no longer exists or whose monitored file
+ * has not been modified for longer than the detach time.
+ */
+public class LogFileDetachMonitor extends AbstractLogFileMonitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogFileDetachMonitor.class);
+
+  public LogFileDetachMonitor(InputFile inputFile, int interval, int detachTime) {
+    super(inputFile, interval, detachTime);
+  }
+
+  @Override
+  public String getStartLog() {
+    return "Start file detach monitor thread for " + getInputFile().getFilePath();
+  }
+
+  /**
+   * Walks the known folders: if a folder still exists, its child input is stopped when the first
+   * file of the folder is too old; if the folder is gone, the child input is stopped and the
+   * folder map of the input is replaced with the freshly scanned one.
+   */
+  @Override
+  protected void monitorAndUpdate() throws Exception {
+    File[] logFiles = getInputFile().getActualInputLogFiles();
+    Map<String, List<File>> actualFolderMap = FileUtil.getFoldersForFiles(logFiles);
+
+    // iterate over snapshots: stopChildInputFileThread() removes entries from the live child map
+    Map<String, InputFile> copiedInputFileMap = new HashMap<>(getInputFile().getInputChildMap());
+    Map<String, List<File>> copiedFolderMap = new HashMap<>(getInputFile().getFolderMap());
+    // detach old entries
+    for (Map.Entry<String, List<File>> entry : copiedFolderMap.entrySet()) {
+      if (new File(entry.getKey()).exists()) {
+        for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
+          if (inputFileEntry.getKey().startsWith(entry.getKey())) {
+            File monitoredFile = entry.getValue().get(0);
+            if (isFileTooOld(monitoredFile, getDetachTime())) {
+              // three placeholders need three arguments: file, folder and the detach time
+              LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.",
+                monitoredFile.getAbsolutePath(), entry.getKey(), getDetachTime());
+              getInputFile().stopChildInputFileThread(entry.getKey());
+            }
+          }
+        }
+      } else {
+        LOG.info("Folder not exists. ({}) Stop thread.", entry.getKey());
+        for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
+          if (inputFileEntry.getKey().startsWith(entry.getKey())) {
+            getInputFile().stopChildInputFileThread(entry.getKey());
+            getInputFile().setFolderMap(actualFolderMap);
+          }
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
new file mode 100644
index 0000000..cc5d664
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Updates log file paths periodically, useful if the log file name pattern is like
+ * mylog-2017-10-09.log (so the file to tail can change), and starts child inputs for folders
+ * that appear after startup.
+ */
+public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogFilePathUpdateMonitor.class);
+
+  public LogFilePathUpdateMonitor(InputFile inputFile, int interval, int detachTime) {
+    super(inputFile, interval, detachTime);
+  }
+
+  @Override
+  public String getStartLog() {
+    return "Start file path update monitor thread for " + getInputFile().getFilePath();
+  }
+
+  /**
+   * Rescans the input path. For a known folder the child input is restarted when the first
+   * (tail) file changed; for a new folder a child input is started unless its first file is too
+   * old. Every (re)started folder is recorded in the folder map of the input, otherwise the same
+   * change would be detected again on every pass and the child would be restarted or duplicated.
+   */
+  @Override
+  protected void monitorAndUpdate() throws Exception {
+    File[] logFiles = getInputFile().getActualInputLogFiles();
+    Map<String, List<File>> foldersMap = FileUtil.getFoldersForFiles(logFiles);
+    Map<String, List<File>> originalFoldersMap = getInputFile().getFolderMap();
+    for (Map.Entry<String, List<File>> entry : foldersMap.entrySet()) {
+      if (originalFoldersMap.containsKey(entry.getKey())) {
+        List<File> originalLogFiles = originalFoldersMap.get(entry.getKey());
+        if (!entry.getValue().isEmpty()) { // check tail only for now
+          File lastFile = entry.getValue().get(0);
+          File previousFile = originalLogFiles.get(0);
+          if (!previousFile.getAbsolutePath().equals(lastFile.getAbsolutePath())) {
+            LOG.info("New file found (old: '{}', new: {}), reload thread for {}",
+              previousFile.getAbsolutePath(), lastFile.getAbsolutePath(), entry.getKey());
+            getInputFile().stopChildInputFileThread(entry.getKey());
+            getInputFile().startNewChildInputFileThread(entry);
+            rememberFolder(entry);
+          }
+        }
+      } else {
+        LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey());
+        File monitoredFile = entry.getValue().get(0);
+        if (isFileTooOld(monitoredFile, getDetachTime())) {
+          // not recorded on purpose: the folder is evaluated again on the next pass
+          LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath());
+        } else {
+          getInputFile().startNewChildInputFileThread(entry);
+          rememberFolder(entry);
+        }
+      }
+    }
+  }
+
+  /** Publishes a copy of the input's folder map that contains the given, just (re)started folder. */
+  private void rememberFolder(Map.Entry<String, List<File>> entry) {
+    // replace instead of mutate: the detach monitor may be reading the current map
+    Map<String, List<File>> updatedFolderMap = new java.util.HashMap<>(getInputFile().getFolderMap());
+    updatedFolderMap.put(entry.getKey(), entry.getValue());
+    getInputFile().setFolderMap(updatedFolderMap);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
index 0ece637..6a052d0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
@@ -200,7 +200,7 @@ public class LogConfigHandler extends Thread {
try {
Thread.sleep(RETRY_INTERVAL * 1000);
} catch (InterruptedException e) {
- LOG.error(e);
+ LOG.warn(e);
}
LOG.info("Checking if config is available");
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index ffd6cec..71cdcd1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -27,17 +27,22 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Logger;
+import org.apache.tools.ant.DirectoryScanner;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FileUtil {
- private static final Logger LOG = Logger.getLogger(FileUtil.class);
-
+ private static final Logger LOG = LoggerFactory.getLogger(FileUtil.class);
+ private static final String FOLDER_SEPARATOR = "/";
+
private FileUtil() {
throw new UnsupportedOperationException();
}
@@ -85,8 +90,100 @@ public class FileUtil {
HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {});
return jsonmap;
} catch (IOException e) {
- LOG.error(e, e.getCause());
+ LOG.error("{}", e);
}
return new HashMap<String, Object>();
}
+
+  /**
+   * Resolves an input path to the matching files.
+   *
+   * @param searchPath path of a single file, or a pattern containing "*" in its folder and/or
+   *                   file name part
+   * @return the file itself if {@code searchPath} is an existing file; the files matched by the
+   *         pattern if it contains "*"; otherwise an empty array (never null)
+   */
+  public static File[] getInputFiles(String searchPath) {
+    File searchFile = new File(searchPath);
+    if (searchFile.isFile()) {
+      return new File[]{searchFile};
+    }
+    if (!searchPath.contains("*")) {
+      LOG.warn("Input file config not found by pattern; {}", searchPath);
+      return new File[]{};
+    }
+
+    String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
+    // The base dir is a literal prefix of the pattern. replaceAll() would interpret it as a
+    // regex and would also remove later occurrences (e.g. "/a/*/a/x.log" became "*x.log").
+    String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length());
+
+    DirectoryScanner scanner = new DirectoryScanner();
+    scanner.setIncludes(new String[]{fileNameAfterLastFolder});
+    scanner.setBasedir(folderBeforeRegex);
+    scanner.setCaseSensitive(true);
+    scanner.scan();
+    String[] fileNames = scanner.getIncludedFiles();
+    if (fileNames == null) {
+      return new File[]{};
+    }
+
+    // included names are relative to the base dir, so prepend it again
+    File[] files = new File[fileNames.length];
+    for (int i = 0; i < fileNames.length; i++) {
+      files[i] = new File(folderBeforeRegex + fileNames[i]);
+    }
+    return files;
+  }
+
+  /**
+   * Groups the given files by their file name (last path component).
+   *
+   * @param inputFiles files to group, may be null or empty
+   * @return map of file name to the files with that name, each list sorted in reverse natural
+   *         order; empty if no files were given
+   */
+  public static Map<String, List<File>> getFilesMapForFolders(File[] inputFiles) {
+    Map<String, List<File>> filesByName = new HashMap<>();
+
+    if (inputFiles != null) {
+      for (File candidate : inputFiles) {
+        String fileName = candidate.getName();
+        if (!filesByName.containsKey(fileName)) {
+          filesByName.put(fileName, new ArrayList<File>());
+        }
+        filesByName.get(fileName).add(candidate);
+      }
+    }
+    for (List<File> sameNameFiles : filesByName.values()) {
+      Collections.sort(sameNameFiles, Collections.reverseOrder());
+    }
+    return filesByName;
+  }
+
+  /**
+   * Groups the given files by the absolute path of their parent folder. Files whose parent
+   * folder cannot be determined or does not exist are skipped.
+   *
+   * @param inputFiles files to group, may be null or empty
+   * @return map of folder absolute path to the files in that folder, each list sorted in
+   *         reverse natural order; empty if no files were given
+   */
+  public static Map<String, List<File>> getFoldersForFiles(File[] inputFiles) {
+    Map<String, List<File>> foldersMap = new HashMap<>();
+    if (inputFiles != null) {
+      for (File inputFile : inputFiles) {
+        // resolve to an absolute file first: getParentFile() is null for a bare relative name
+        File folder = inputFile.getAbsoluteFile().getParentFile();
+        if (folder == null || !folder.exists()) {
+          continue;
+        }
+        String folderPath = folder.getAbsolutePath();
+        List<File> filesInFolder = foldersMap.get(folderPath);
+        if (filesInFolder == null) {
+          filesInFolder = new ArrayList<>();
+          foldersMap.put(folderPath, filesInFolder);
+        }
+        filesInFolder.add(inputFile);
+      }
+    }
+    for (List<File> filesInFolder : foldersMap.values()) {
+      Collections.sort(filesInFolder, Collections.reverseOrder());
+    }
+    return foldersMap;
+  }
+
+  /**
+   * Returns the part of the pattern that can be used as the base directory of a wildcard scan:
+   * everything up to and including the last folder separator before the first "*". If there is
+   * no separator before the first "*", the text before the "*" is returned unchanged.
+   *
+   * @param pattern path pattern, with or without "*"
+   * @return literal prefix of {@code pattern} (possibly empty)
+   */
+  private static String getLogDirNameBeforeWildCard(String pattern) {
+    // indexOf instead of split("\\*")[0]: split returns an empty array for patterns like "*"
+    int wildCardIndex = pattern.indexOf('*');
+    String beforeWildCard = wildCardIndex >= 0 ? pattern.substring(0, wildCardIndex) : pattern;
+    int lastSeparatorIndex = beforeWildCard.lastIndexOf(FOLDER_SEPARATOR);
+    if (lastSeparatorIndex == -1) {
+      return beforeWildCard;
+    }
+    return beforeWildCard.substring(0, lastSeparatorIndex) + FOLDER_SEPARATOR;
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/docker/Dockerfile
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/Dockerfile b/ambari-logsearch/docker/Dockerfile
index dfa1462..e67836b 100644
--- a/ambari-logsearch/docker/Dockerfile
+++ b/ambari-logsearch/docker/Dockerfile
@@ -22,8 +22,10 @@ RUN yum -y install glibc-common
ENV HOME /root
#Install JAVA
-RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/7u55-b13/jdk-7u55-linux-x64.rpm -O jdk-7u55-linux-x64.rpm
-RUN rpm -ivh jdk-7u55-linux-x64.rpm
+ENV JAVA_VERSION 8u131
+ENV BUILD_VERSION b11
+RUN wget --no-check-certificate --no-cookies --header "Cookie:oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/$JAVA_VERSION-$BUILD_VERSION/d54c1d3a095b4ff2b6607d096fa80163/jdk-$JAVA_VERSION-linux-x64.rpm -O jdk-8-linux-x64.rpm
+RUN rpm -ivh jdk-8-linux-x64.rpm
ENV JAVA_HOME /usr/java/default/
#Install Maven
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a3fcd23/ambari-logsearch/docker/logsearch-docker.sh
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/logsearch-docker.sh b/ambari-logsearch/docker/logsearch-docker.sh
index a2df90f..fa54bc1 100755
--- a/ambari-logsearch/docker/logsearch-docker.sh
+++ b/ambari-logsearch/docker/logsearch-docker.sh
@@ -26,7 +26,7 @@ function build_logsearch_project() {
function build_logsearch_container() {
pushd $sdir
- docker build -t ambari-logsearch:v1.0 .
+ docker build -t ambari-logsearch:v0.5 .
popd
}
@@ -41,7 +41,7 @@ function start_logsearch_container() {
-v $AMBARI_LOCATION:/root/ambari -v $MAVEN_REPOSITORY_LOCATION:/root/.m2 $LOGSEARCH_EXPOSED_PORTS $LOGSEARCH_ENV_OPTS $LOGSEARCH_EXTRA_OPTS $LOGSEARCH_VOLUME_OPTS \
-v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-logfeeder/target/classes:/root/ambari/ambari-logsearch/ambari-logsearch-logfeeder/target/package/classes \
-v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/target/classes:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes \
- -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/src/main/webapp:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes/webapps/app ambari-logsearch:v1.0
+ -v $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-portal/src/main/webapp:/root/ambari/ambari-logsearch/ambari-logsearch-portal/target/package/classes/webapps/app ambari-logsearch:v0.5
ip_address=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' logsearch)
echo "Log Search container started on $ip_address (for Mac OSX route to boot2docker/docker-machine VM address, e.g.: 'sudo route add -net 172.17.0.0/16 192.168.59.103')"
echo "You can follow Log Search logs with 'docker logs -f logsearch' command"