You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2017/11/14 17:29:20 UTC
ambari git commit: AMBARI-22395. Log Feeder: cleanup checkpoint files
periodically (oleewere)
Repository: ambari
Updated Branches:
refs/heads/branch-2.6 93820ddfd -> a22f66f71
AMBARI-22395. Log Feeder: cleanup checkpoint files periodically (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a22f66f7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a22f66f7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a22f66f7
Branch: refs/heads/branch-2.6
Commit: a22f66f719fd11efd5ecfec09dc5a8112f9ada1b
Parents: 93820dd
Author: Oliver Szabo <ol...@gmail.com>
Authored: Tue Nov 14 13:21:42 2017 +0100
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Tue Nov 14 18:27:47 2017 +0100
----------------------------------------------------------------------
.../logfeeder/input/AbstractInputFile.java | 9 +++
.../apache/ambari/logfeeder/input/Input.java | 29 +++++---
.../ambari/logfeeder/input/InputManager.java | 12 ++++
.../input/monitor/AbstractLogFileMonitor.java | 5 +-
.../input/monitor/CheckpointCleanupMonitor.java | 48 +++++++++++++
.../input/monitor/LogFileDetachMonitor.java | 2 +-
.../input/monitor/LogFilePathUpdateMonitor.java | 2 +-
.../logfeeder/metrics/LogFeederAMSClient.java | 3 +
.../logfeeder/metrics/MetricsManager.java | 8 ++-
.../apache/ambari/logfeeder/util/FileUtil.java | 38 ++++++----
.../logfeeder/metrics/MetrcisManagerTest.java | 9 ++-
.../test-config/logfeeder/logfeeder.properties | 3 +-
.../shipper-conf/input.config-storm.json | 75 ++++++++++++++++++++
.../streamline-1-TestAgg-2-3/6701/worker.log | 4 ++
14 files changed, 210 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 8548a20..64f2cbc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -54,6 +54,7 @@ public abstract class AbstractInputFile extends Input {
private int checkPointIntervalMS;
private Map<String, Object> jsonCheckPoint;
private InputMarker lastCheckPointInputMarker;
+ private Integer maxAgeMin;
@Override
protected String getStatMetricName() {
@@ -75,6 +76,7 @@ public abstract class AbstractInputFile extends Input {
setClosed(true);
logPath = getStringValue("path");
tail = getBooleanValue("tail", tail);
+ maxAgeMin = getIntValue("max_age_min", 0);
checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
if (StringUtils.isEmpty(logPath)) {
@@ -286,6 +288,9 @@ public abstract class AbstractInputFile extends Input {
jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
jsonCheckPoint.put("last_write_time_date", new Date());
+ if (maxAgeMin != 0) {
+ jsonCheckPoint.put("max_age_min", maxAgeMin.toString());
+ }
String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
@@ -326,4 +331,8 @@ public abstract class AbstractInputFile extends Input {
return "input:source=" + getStringValue("source") + ", path=" +
(!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
}
+
+ public Integer getMaxAgeMin() {
+ return maxAgeMin;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 96320e9..7df0b6e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -34,6 +34,7 @@ import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
@@ -190,16 +191,20 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable {
String folderPath = folderFileEntry.getKey();
String filePath = new File(getFilePath()).getName();
String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath);
- clonedObject.setMultiFolder(false);
- clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail
- clonedObject.logPath = fullPathWithWildCard;
- clonedObject.setLogFileDetacherThread(null);
- clonedObject.setLogFilePathUpdaterThread(null);
- clonedObject.setInputChildMap(new HashMap<String, InputFile>());
- Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
- clonedObject.setThread(thread);
- inputChildMap.put(fullPathWithWildCard, clonedObject);
- thread.start();
+ if (clonedObject.getMaxAgeMin() != 0 && FileUtil.isFileTooOld(new File(fullPathWithWildCard), clonedObject.getMaxAgeMin().longValue())) {
+ LOG.info(String.format("File ('%s') is too old (max age min: %d), monitor thread not starting...", getFilePath(), clonedObject.getMaxAgeMin()));
+ } else {
+ clonedObject.setMultiFolder(false);
+ clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail
+ clonedObject.logPath = fullPathWithWildCard;
+ clonedObject.setLogFileDetacherThread(null);
+ clonedObject.setLogFilePathUpdaterThread(null);
+ clonedObject.setInputChildMap(new HashMap<String, InputFile>());
+ Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
+ clonedObject.setThread(thread);
+ inputChildMap.put(fullPathWithWildCard, clonedObject);
+ thread.start();
+ }
}
public void stopChildInputFileThread(String folderPathKey) {
@@ -208,7 +213,9 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable {
String fullPathWithWildCard = String.format("%s/%s", folderPathKey, filePath);
if (inputChildMap.containsKey(fullPathWithWildCard)) {
InputFile inputFile = inputChildMap.get(fullPathWithWildCard);
- inputFile.getThread().interrupt();
+ if (inputFile.getThread() != null && inputFile.getThread().isAlive()) {
+ inputFile.getThread().interrupt();
+ }
inputChildMap.remove(fullPathWithWildCard);
} else {
LOG.warn(fullPathWithWildCard + " not found as an input child.");
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 8e70850..9b472ad 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -139,6 +140,10 @@ public class InputManager {
if (isCheckPointFolderValid) {
LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
}
+ // check checkpoint cleanup every 2000 min
+ Thread checkpointCleanupThread = new Thread(new CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+ checkpointCleanupThread.setDaemon(true);
+ checkpointCleanupThread.start();
}
}
@@ -269,6 +274,10 @@ public class InputManager {
String logFilePath = (String) jsonCheckPoint.get("file_path");
String logFileKey = (String) jsonCheckPoint.get("file_key");
+ Integer maxAgeMin = null;
+ if (jsonCheckPoint.containsKey("max_age_min")) {
+ maxAgeMin = Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+ }
if (logFilePath != null && logFileKey != null) {
boolean deleteCheckPointFile = false;
File logFile = new File(logFilePath);
@@ -279,6 +288,9 @@ public class InputManager {
deleteCheckPointFile = true;
LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+ } else if (maxAgeMin != null && maxAgeMin != 0 && FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+ deleteCheckPointFile = true;
+ LOG.info("Checkpoint clean: File reached max age minutes (" + maxAgeMin + "):" + logFilePath);
}
} else {
LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
index 3910b9b..f01b39b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.input.monitor;
import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,10 +61,6 @@ public abstract class AbstractLogFileMonitor implements Runnable {
}
}
- protected boolean isFileTooOld(File monitoredFile, long detachTimeMin) {
- return (System.currentTimeMillis() - monitoredFile.lastModified()) > detachTimeMin * 60 * 1000;
- }
-
protected abstract String getStartLog();
protected abstract void monitorAndUpdate() throws Exception;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
new file mode 100644
index 0000000..b8377c1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CheckpointCleanupMonitor implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
+
+ private long waitIntervalMin;
+ private InputManager inputManager;
+
+ public CheckpointCleanupMonitor(InputManager inputManager, long waitIntervalMin) {
+ this.waitIntervalMin = waitIntervalMin;
+ this.inputManager = inputManager;
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(1000 * 60 * waitIntervalMin);
+ inputManager.cleanCheckPointFiles();
+ } catch (Exception e) {
+ LOG.error("Cleanup checkpoint files thread interrupted.", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
index 322a56d..0eb8ce8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
@@ -58,7 +58,7 @@ public class LogFileDetachMonitor extends AbstractLogFileMonitor {
for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
if (inputFileEntry.getKey().startsWith(entry.getKey())) {
File monitoredFile = entry.getValue().get(0);
- boolean isFileTooOld = isFileTooOld(monitoredFile, getDetachTime());
+ boolean isFileTooOld = FileUtil.isFileTooOld(monitoredFile, getDetachTime());
if (isFileTooOld) {
LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.", entry.getKey(), getDetachTime());
getInputFile().stopChildInputFileThread(entry.getKey());
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
index cc5d664..fcc9618 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
@@ -63,7 +63,7 @@ public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor {
} else {
LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey());
File monitoredFile = entry.getValue().get(0);
- if (isFileTooOld(monitoredFile, getDetachTime())) {
+ if (FileUtil.isFileTooOld(monitoredFile, getDetachTime())) {
LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath());
} else {
getInputFile().startNewChildInputFileThread(entry);
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..c0092c7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -64,6 +64,9 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
@Override
public String getCollectorUri(String host) {
+ if (collectorProtocol == null || host == null || collectorPort == null || collectorPath == null) {
+ return null;
+ }
return String.format("%s://%s:%s%s", collectorProtocol, host, collectorPort, collectorPath);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
index 1432c87..ebf7c6c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -47,7 +47,9 @@ public class MetricsManager {
public void init() {
LOG.info("Initializing MetricsManager()");
- amsClient = new LogFeederAMSClient();
+ if (amsClient == null) {
+ amsClient = new LogFeederAMSClient();
+ }
if (amsClient.getCollectorUri(null) != null) {
findNodeHostName();
@@ -165,4 +167,8 @@ public class MetricsManager {
(currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000);
}
}
+
+ public void setAmsClient(LogFeederAMSClient amsClient) {
+ this.amsClient = amsClient;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index 843ae6b..86f0903 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -101,22 +101,26 @@ public class FileUtil {
return new File[]{searchFile};
} else {
if (searchPath.contains("*")) {
- String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
- String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length());
-
- DirectoryScanner scanner = new DirectoryScanner();
- scanner.setIncludes(new String[]{fileNameAfterLastFolder});
- scanner.setBasedir(folderBeforeRegex);
- scanner.setCaseSensitive(true);
- scanner.scan();
- String[] fileNames = scanner.getIncludedFiles();
-
- if (fileNames != null && fileNames.length > 0) {
- File[] files = new File[fileNames.length];
- for (int i = 0; i < fileNames.length; i++) {
- files[i] = new File(folderBeforeRegex + fileNames[i]);
+ try {
+ String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
+ String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length());
+
+ DirectoryScanner scanner = new DirectoryScanner();
+ scanner.setIncludes(new String[]{fileNameAfterLastFolder});
+ scanner.setBasedir(folderBeforeRegex);
+ scanner.setCaseSensitive(true);
+ scanner.scan();
+ String[] fileNames = scanner.getIncludedFiles();
+
+ if (fileNames != null && fileNames.length > 0) {
+ File[] files = new File[fileNames.length];
+ for (int i = 0; i < fileNames.length; i++) {
+ files[i] = new File(folderBeforeRegex + fileNames[i]);
+ }
+ return files;
}
- return files;
+ } catch (Exception e) {
+ LOG.warn("Input file not found by pattern (exception thrown); {}, message: {}", searchPath, e.getMessage());
}
} else {
@@ -164,4 +168,8 @@ public class FileUtil {
return beforeRegex;
}
}
+
+ public static boolean isFileTooOld(File file, long diffMin) {
+ return (System.currentTimeMillis() - file.lastModified()) > diffMin * 1000 * 60;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
index 8ee6d00..f8cbb18 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
@@ -50,18 +50,21 @@ public class MetrcisManagerTest {
@Before
public void init() throws Exception {
manager = new MetricsManager();
- manager.init();
mockClient = strictMock(LogFeederAMSClient.class);
Field f = MetricsManager.class.getDeclaredField("amsClient");
f.setAccessible(true);
f.set(manager, mockClient);
-
+
+ EasyMock.expect(mockClient.getCollectorUri(null)).andReturn("null://null:null/null").anyTimes();
capture = EasyMock.newCapture(CaptureType.FIRST);
mockClient.emitMetrics(EasyMock.capture(capture));
EasyMock.expectLastCall().andReturn(true).once();
-
+
replay(mockClient);
+ manager.setAmsClient(mockClient);
+
+ manager.init();
}
@Test
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 068bc3a..0a6c7a8 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -23,7 +23,8 @@ logfeeder.config.files=shipper-conf/global.config.json,\
shipper-conf/input.config-system_message.json,\
shipper-conf/input.config-secure_log.json,\
shipper-conf/input.config-hdfs.json,\
- shipper-conf/input.config-ambari.json
+ shipper-conf/input.config-ambari.json,\
+ shipper-conf/input.config-storm.json
logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
logfeeder.solr.core.config.name=history
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
new file mode 100644
index 0000000..34bdcf0
--- /dev/null
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
@@ -0,0 +1,75 @@
+{
+ "input":[
+ {
+ "type":"storm_worker",
+ "rowtype":"service",
+ "path":"/root/test-logs/storm/worker-logs/*/*/worker.log",
+ "init_default_fields": "true"
+ }
+ ],
+ "filter":[
+ {
+ "filter":"grok",
+ "sort_order": 1,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "log4j_format":"",
+ "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
+ "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}\\s%{JAVACLASS:logger_name}\\s%{DATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
+ "post_map_values":{
+ "logtime":{
+ "map_date":{
+ "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
+ }
+ }
+ }
+ },
+ {
+ "filter":"grok",
+ "sort_order": 2,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "thread_name",
+ "remove_source_field": "false",
+ "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:storm_component_name}\\-executor%{DATA}|%{DATA})"
+ },
+ {
+ "filter":"grok",
+ "sort_order": 3,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "path",
+ "remove_source_field": "false",
+ "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:storm_topology_id}/%{DATA:storm_worker_port}/worker\\.log"
+ },
+ {
+ "filter":"grok",
+ "sort_order": 4,
+ "conditions":{
+ "fields":{
+ "type":[
+ "storm_worker"
+ ]
+ }
+ },
+ "source_field": "storm_topology_id",
+ "remove_source_field": "false",
+ "message_pattern":"(streamline\\-%{DATA:streamline_topology_id}\\-%{DATA:streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{GREEDYDATA})"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
new file mode 100644
index 0000000..6a10ad9
--- /dev/null
+++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
@@ -0,0 +1,4 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(5)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Prepared bolt __acker:(5)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 3]
\ No newline at end of file