You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2017/11/14 17:29:20 UTC

ambari git commit: AMBARI-22395. Log Feeder: cleanup checkpoint files periodically (oleewere)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.6 93820ddfd -> a22f66f71


AMBARI-22395. Log Feeder: cleanup checkpoint files periodically (oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a22f66f7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a22f66f7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a22f66f7

Branch: refs/heads/branch-2.6
Commit: a22f66f719fd11efd5ecfec09dc5a8112f9ada1b
Parents: 93820dd
Author: Oliver Szabo <ol...@gmail.com>
Authored: Tue Nov 14 13:21:42 2017 +0100
Committer: Oliver Szabo <ol...@gmail.com>
Committed: Tue Nov 14 18:27:47 2017 +0100

----------------------------------------------------------------------
 .../logfeeder/input/AbstractInputFile.java      |  9 +++
 .../apache/ambari/logfeeder/input/Input.java    | 29 +++++---
 .../ambari/logfeeder/input/InputManager.java    | 12 ++++
 .../input/monitor/AbstractLogFileMonitor.java   |  5 +-
 .../input/monitor/CheckpointCleanupMonitor.java | 48 +++++++++++++
 .../input/monitor/LogFileDetachMonitor.java     |  2 +-
 .../input/monitor/LogFilePathUpdateMonitor.java |  2 +-
 .../logfeeder/metrics/LogFeederAMSClient.java   |  3 +
 .../logfeeder/metrics/MetricsManager.java       |  8 ++-
 .../apache/ambari/logfeeder/util/FileUtil.java  | 38 ++++++----
 .../logfeeder/metrics/MetrcisManagerTest.java   |  9 ++-
 .../test-config/logfeeder/logfeeder.properties  |  3 +-
 .../shipper-conf/input.config-storm.json        | 75 ++++++++++++++++++++
 .../streamline-1-TestAgg-2-3/6701/worker.log    |  4 ++
 14 files changed, 210 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 8548a20..64f2cbc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -54,6 +54,7 @@ public abstract class AbstractInputFile extends Input {
   private int checkPointIntervalMS;
   private Map<String, Object> jsonCheckPoint;
   private InputMarker lastCheckPointInputMarker;
+  private Integer maxAgeMin;
 
   @Override
   protected String getStatMetricName() {
@@ -75,6 +76,7 @@ public abstract class AbstractInputFile extends Input {
     setClosed(true);
     logPath = getStringValue("path");
     tail = getBooleanValue("tail", tail);
+    maxAgeMin = getIntValue("max_age_min", 0);
     checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
 
     if (StringUtils.isEmpty(logPath)) {
@@ -286,6 +288,9 @@ public abstract class AbstractInputFile extends Input {
         jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
         jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
         jsonCheckPoint.put("last_write_time_date", new Date());
+        if (maxAgeMin != 0) {
+          jsonCheckPoint.put("max_age_min", maxAgeMin.toString());
+        }
 
         String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
 
@@ -326,4 +331,8 @@ public abstract class AbstractInputFile extends Input {
     return "input:source=" + getStringValue("source") + ", path=" +
         (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
   }
+
+  public Integer getMaxAgeMin() { // exposes the value loaded from the "max_age_min" setting (0 when the setting is absent)
+    return maxAgeMin;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 96320e9..7df0b6e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -34,6 +34,7 @@ import org.apache.ambari.logfeeder.input.monitor.LogFilePathUpdateMonitor;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
@@ -190,16 +191,20 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable {
     String folderPath = folderFileEntry.getKey();
     String filePath = new File(getFilePath()).getName();
     String fullPathWithWildCard = String.format("%s/%s", folderPath, filePath);
-    clonedObject.setMultiFolder(false);
-    clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail
-    clonedObject.logPath = fullPathWithWildCard;
-    clonedObject.setLogFileDetacherThread(null);
-    clonedObject.setLogFilePathUpdaterThread(null);
-    clonedObject.setInputChildMap(new HashMap<String, InputFile>());
-    Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
-    clonedObject.setThread(thread);
-    inputChildMap.put(fullPathWithWildCard, clonedObject);
-    thread.start();
+    if (clonedObject.getMaxAgeMin() != 0 && FileUtil.isFileTooOld(new File(fullPathWithWildCard), clonedObject.getMaxAgeMin().longValue())) {
+      LOG.info(String.format("File ('%s') is too old (max age min: %d), monitor thread not starting...", getFilePath(), clonedObject.getMaxAgeMin()));
+    } else {
+      clonedObject.setMultiFolder(false);
+      clonedObject.logFiles = folderFileEntry.getValue().toArray(new File[0]); // TODO: works only with tail
+      clonedObject.logPath = fullPathWithWildCard;
+      clonedObject.setLogFileDetacherThread(null);
+      clonedObject.setLogFilePathUpdaterThread(null);
+      clonedObject.setInputChildMap(new HashMap<String, InputFile>());
+      Thread thread = new Thread(threadGroup, clonedObject, "file=" + fullPathWithWildCard);
+      clonedObject.setThread(thread);
+      inputChildMap.put(fullPathWithWildCard, clonedObject);
+      thread.start();
+    }
   }
 
   public void stopChildInputFileThread(String folderPathKey) {
@@ -208,7 +213,9 @@ public abstract class Input extends ConfigBlock implements Runnable, Cloneable {
     String fullPathWithWildCard = String.format("%s/%s", folderPathKey, filePath);
     if (inputChildMap.containsKey(fullPathWithWildCard)) {
       InputFile inputFile = inputChildMap.get(fullPathWithWildCard);
-      inputFile.getThread().interrupt();
+      if (inputFile.getThread() != null && inputFile.getThread().isAlive()) {
+        inputFile.getThread().interrupt();
+      }
       inputChildMap.remove(fullPathWithWildCard);
     } else {
       LOG.warn(fullPathWithWildCard + " not found as an input child.");

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 8e70850..9b472ad 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.ambari.logfeeder.input.monitor.CheckpointCleanupMonitor;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -139,6 +140,10 @@ public class InputManager {
       if (isCheckPointFolderValid) {
         LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
       }
+      // run the checkpoint cleanup every 2000 minutes
+      Thread checkpointCleanupThread = new Thread(new CheckpointCleanupMonitor(this, 2000),"checkpoint_cleanup");
+      checkpointCleanupThread.setDaemon(true);
+      checkpointCleanupThread.start();
     }
 
   }
@@ -269,6 +274,10 @@ public class InputManager {
 
             String logFilePath = (String) jsonCheckPoint.get("file_path");
             String logFileKey = (String) jsonCheckPoint.get("file_key");
+            Integer maxAgeMin = null;
+            if (jsonCheckPoint.containsKey("max_age_min")) {
+              maxAgeMin = Integer.parseInt(jsonCheckPoint.get("max_age_min").toString());
+            }
             if (logFilePath != null && logFileKey != null) {
               boolean deleteCheckPointFile = false;
               File logFile = new File(logFilePath);
@@ -279,6 +288,9 @@ public class InputManager {
                   deleteCheckPointFile = true;
                   LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
                       logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+                } else if (maxAgeMin != null && maxAgeMin != 0 && FileUtil.isFileTooOld(logFile, maxAgeMin)) {
+                  deleteCheckPointFile = true;
+                  LOG.info("Checkpoint clean: File reached max age minutes (" + maxAgeMin + "):" + logFilePath);
                 }
               } else {
                 LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
index 3910b9b..f01b39b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/AbstractLogFileMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logfeeder.input.monitor;
 
 import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,10 +61,6 @@ public abstract class AbstractLogFileMonitor implements Runnable {
     }
   }
 
-  protected boolean isFileTooOld(File monitoredFile, long detachTimeMin) {
-    return (System.currentTimeMillis() - monitoredFile.lastModified()) > detachTimeMin * 60 * 1000;
-  }
-
   protected abstract String getStartLog();
 
   protected abstract void monitorAndUpdate() throws Exception;

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
new file mode 100644
index 0000000..b8377c1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/CheckpointCleanupMonitor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.monitor;
+
+import org.apache.ambari.logfeeder.input.InputManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background task that periodically asks the {@link InputManager} to clean up its checkpoint files.
+ * The loop ends once the executing thread has been interrupted.
+ */
+public class CheckpointCleanupMonitor implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CheckpointCleanupMonitor.class);
+
+  private final long waitIntervalMin;
+  private final InputManager inputManager;
+
+  /**
+   * @param inputManager manager whose {@code cleanCheckPointFiles()} is invoked on every cycle
+   * @param waitIntervalMin time to wait before each cleanup, in minutes
+   */
+  public CheckpointCleanupMonitor(InputManager inputManager, long waitIntervalMin) {
+    this.waitIntervalMin = waitIntervalMin;
+    this.inputManager = inputManager;
+  }
+
+  /**
+   * Sleeps for the configured interval, then runs one cleanup; repeats until the thread is interrupted.
+   * A failing cleanup is logged and the loop continues with the next cycle.
+   */
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        Thread.sleep(1000 * 60 * waitIntervalMin);
+        inputManager.cleanCheckPointFiles();
+      } catch (InterruptedException e) {
+        // Thread.sleep() clears the interrupt status when it throws; restore it so the loop condition ends the thread.
+        LOG.info("Cleanup checkpoint files thread interrupted.");
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        LOG.error("Cleanup of checkpoint files failed.", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
index 322a56d..0eb8ce8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFileDetachMonitor.java
@@ -58,7 +58,7 @@ public class LogFileDetachMonitor extends AbstractLogFileMonitor {
         for (Map.Entry<String, InputFile> inputFileEntry : copiedInputFileMap.entrySet()) {
           if (inputFileEntry.getKey().startsWith(entry.getKey())) {
             File monitoredFile = entry.getValue().get(0);
-            boolean isFileTooOld = isFileTooOld(monitoredFile, getDetachTime());
+            boolean isFileTooOld = FileUtil.isFileTooOld(monitoredFile, getDetachTime());
             if (isFileTooOld) {
               LOG.info("File ('{}') in folder ('{}') is too old (reached {} minutes), detach input thread.", entry.getKey(), getDetachTime());
               getInputFile().stopChildInputFileThread(entry.getKey());

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
index cc5d664..fcc9618 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/monitor/LogFilePathUpdateMonitor.java
@@ -63,7 +63,7 @@ public class LogFilePathUpdateMonitor extends AbstractLogFileMonitor {
       } else {
         LOG.info("New log file folder found: {}, start a new thread if tail file is not too old.", entry.getKey());
         File monitoredFile = entry.getValue().get(0);
-        if (isFileTooOld(monitoredFile, getDetachTime())) {
+        if (FileUtil.isFileTooOld(monitoredFile, getDetachTime())) {
           LOG.info("'{}' file is too old. No new thread start needed.", monitoredFile.getAbsolutePath());
         } else {
           getInputFile().startNewChildInputFileThread(entry);

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 2d1bf40..c0092c7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -64,6 +64,9 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
 
   @Override
   public String getCollectorUri(String host) {
+    if (collectorProtocol == null || host == null || collectorPort == null || collectorPath == null) { // any missing part means no usable collector URI can be built
+      return null; // NOTE(review): MetricsManager.init() calls getCollectorUri(null), which therefore always receives null from here - confirm this is intended
+    }
     return String.format("%s://%s:%s%s", collectorProtocol, host, collectorPort, collectorPath);
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
index 1432c87..ebf7c6c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -47,7 +47,9 @@ public class MetricsManager {
 
   public void init() {
     LOG.info("Initializing MetricsManager()");
-    amsClient = new LogFeederAMSClient();
+    if (amsClient == null) {
+      amsClient = new LogFeederAMSClient();
+    }
 
     if (amsClient.getCollectorUri(null) != null) {
       findNodeHostName();
@@ -165,4 +167,8 @@ public class MetricsManager {
           (currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000);
     }
   }
+
+  public void setAmsClient(LogFeederAMSClient amsClient) {
+    this.amsClient = amsClient; // init() creates its own LogFeederAMSClient only when none has been set
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index 843ae6b..86f0903 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -101,22 +101,26 @@ public class FileUtil {
       return new File[]{searchFile};
     } else {
       if (searchPath.contains("*")) {
-        String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
-        String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length());
-
-        DirectoryScanner scanner = new DirectoryScanner();
-        scanner.setIncludes(new String[]{fileNameAfterLastFolder});
-        scanner.setBasedir(folderBeforeRegex);
-        scanner.setCaseSensitive(true);
-        scanner.scan();
-        String[] fileNames = scanner.getIncludedFiles();
-
-        if (fileNames != null && fileNames.length > 0) {
-          File[] files = new File[fileNames.length];
-          for (int i = 0; i < fileNames.length; i++) {
-            files[i] = new File(folderBeforeRegex + fileNames[i]);
+        try {
+          String folderBeforeRegex = getLogDirNameBeforeWildCard(searchPath);
+          String fileNameAfterLastFolder = searchPath.substring(folderBeforeRegex.length());
+
+          DirectoryScanner scanner = new DirectoryScanner();
+          scanner.setIncludes(new String[]{fileNameAfterLastFolder});
+          scanner.setBasedir(folderBeforeRegex);
+          scanner.setCaseSensitive(true);
+          scanner.scan();
+          String[] fileNames = scanner.getIncludedFiles();
+
+          if (fileNames != null && fileNames.length > 0) {
+            File[] files = new File[fileNames.length];
+            for (int i = 0; i < fileNames.length; i++) {
+              files[i] = new File(folderBeforeRegex + fileNames[i]);
+            }
+            return files;
           }
-          return files;
+        } catch (Exception e) {
+          LOG.warn("Input file not found by pattern (exception thrown); {}, message: {}", searchPath, e.getMessage());
         }
 
       } else {
@@ -164,4 +168,8 @@ public class FileUtil {
       return beforeRegex;
     }
   }
+
+  public static boolean isFileTooOld(File file, long diffMin) { // diffMin: age limit in minutes
+    return (System.currentTimeMillis() - file.lastModified()) > diffMin * 1000 * 60; // file age vs. limit, both in milliseconds
+  } // File.lastModified() is 0L for a missing file, so a missing file normally counts as too old
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
index 8ee6d00..f8cbb18 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetrcisManagerTest.java
@@ -50,18 +50,21 @@ public class MetrcisManagerTest {
   @Before
   public void init() throws Exception {
     manager = new MetricsManager();
-    manager.init();
     
     mockClient = strictMock(LogFeederAMSClient.class);
     Field f = MetricsManager.class.getDeclaredField("amsClient");
     f.setAccessible(true);
     f.set(manager, mockClient);
-    
+
+    EasyMock.expect(mockClient.getCollectorUri(null)).andReturn("null://null:null/null").anyTimes();
     capture = EasyMock.newCapture(CaptureType.FIRST);
     mockClient.emitMetrics(EasyMock.capture(capture));
     EasyMock.expectLastCall().andReturn(true).once();
-    
+
     replay(mockClient);
+    manager.setAmsClient(mockClient);
+
+    manager.init();
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 068bc3a..0a6c7a8 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -23,7 +23,8 @@ logfeeder.config.files=shipper-conf/global.config.json,\
   shipper-conf/input.config-system_message.json,\
   shipper-conf/input.config-secure_log.json,\
   shipper-conf/input.config-hdfs.json,\
-  shipper-conf/input.config-ambari.json
+  shipper-conf/input.config-ambari.json,\
+  shipper-conf/input.config-storm.json
 logfeeder.log.filter.enable=true
 logfeeder.solr.config.interval=5
 logfeeder.solr.core.config.name=history

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
new file mode 100644
index 0000000..34bdcf0
--- /dev/null
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-storm.json
@@ -0,0 +1,75 @@
+{
+  "input":[
+    {
+      "type":"storm_worker",
+      "rowtype":"service",
+      "path":"/root/test-logs/storm/worker-logs/*/*/worker.log",
+      "init_default_fields": "true"
+    }
+  ],
+  "filter":[
+    {
+      "filter":"grok",
+      "sort_order": 1,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "log4j_format":"",
+      "multiline_pattern":"^(%{TIMESTAMP_ISO8601:logtime})",
+      "message_pattern":"(?m)^%{TIMESTAMP_ISO8601:logtime}\\s%{JAVACLASS:logger_name}\\s%{DATA:thread_name}\\s\\[%{LOGLEVEL:level}\\]\\s%{GREEDYDATA:log_message}",
+      "post_map_values":{
+        "logtime":{
+          "map_date":{
+            "target_date_pattern":"yyyy-MM-dd HH:mm:ss.SSS"
+          }
+        }
+      }
+    },
+    {
+      "filter":"grok",
+      "sort_order": 2,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "thread_name",
+      "remove_source_field": "false",
+      "message_pattern":"(Thread\\-[\\-0-9]+\\-*[\\-0-9]*\\-%{DATA:storm_component_name}\\-executor%{DATA}|%{DATA})"
+    },
+    {
+      "filter":"grok",
+      "sort_order": 3,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "path",
+      "remove_source_field": "false",
+      "message_pattern":"/root/test-logs/storm/worker-logs/%{DATA:storm_topology_id}/%{DATA:storm_worker_port}/worker\\.log"
+    },
+    {
+      "filter":"grok",
+      "sort_order": 4,
+      "conditions":{
+        "fields":{
+          "type":[
+            "storm_worker"
+          ]
+        }
+      },
+      "source_field": "storm_topology_id",
+      "remove_source_field": "false",
+      "message_pattern":"(streamline\\-%{DATA:streamline_topology_id}\\-%{DATA:streamline_topology_name}\\-[0-9]+\\-[0-9]+)|(%{GREEDYDATA})"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a22f66f7/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
new file mode 100644
index 0000000..6a10ad9
--- /dev/null
+++ b/ambari-logsearch/docker/test-logs/storm/worker-logs/streamline-1-TestAgg-2-3/6701/worker.log
@@ -0,0 +1,4 @@
+2017-10-23 13:41:43.481 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Preparing bolt __acker:(5)
+2017-10-23 13:41:43.483 o.a.s.d.executor Thread-11-__acker-executor[5 5] [INFO] Prepared bolt __acker:(5)
+2017-10-23 13:41:48.834 c.h.s.s.n.EmailNotifier Thread-5-3-NOTIFICATION-executor[3 3] [ERROR] Got exception while initializing transport
+2017-10-23 13:41:58.242 o.a.s.d.executor main [INFO] Loading executor 3-NOTIFICATION:[3 3]
\ No newline at end of file