You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 20:42:08 UTC
[07/10] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos
Gergely via oleewere)
AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8399960d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8399960d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8399960d
Branch: refs/heads/branch-dev-logsearch
Commit: 8399960da7f013c7df7ca45622ceb0c0d1e36e71
Parents: 0a30feb
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Sep 7 22:24:51 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Wed Sep 7 22:24:51 2016 +0200
----------------------------------------------------------------------
.../org/apache/ambari/logfeeder/LogFeeder.java | 594 +++++++------------
.../ambari/logfeeder/common/ConfigBlock.java | 71 +--
.../logfeeder/common/LogFeederConstants.java | 39 ++
.../apache/ambari/logfeeder/filter/Filter.java | 55 +-
.../ambari/logfeeder/filter/FilterGrok.java | 94 +--
.../ambari/logfeeder/filter/FilterJSON.java | 8 +-
.../ambari/logfeeder/filter/FilterKeyValue.java | 44 +-
.../logfeeder/input/AbstractInputFile.java | 319 ++++++++++
.../apache/ambari/logfeeder/input/Input.java | 313 +++++-----
.../ambari/logfeeder/input/InputFile.java | 503 ++--------------
.../ambari/logfeeder/input/InputManager.java | 379 ++++++++++++
.../ambari/logfeeder/input/InputMarker.java | 17 +-
.../apache/ambari/logfeeder/input/InputMgr.java | 451 --------------
.../ambari/logfeeder/input/InputS3File.java | 424 +------------
.../ambari/logfeeder/input/InputSimulate.java | 40 +-
.../logfeeder/input/reader/GZIPReader.java | 23 +-
.../input/reader/LogsearchReaderFactory.java | 8 +-
.../logconfig/FetchConfigFromSolr.java | 194 ------
.../logfeeder/logconfig/FilterLogData.java | 83 +++
.../logfeeder/logconfig/LogConfigFetcher.java | 168 ++++++
.../logfeeder/logconfig/LogConfigHandler.java | 189 ++++++
.../logfeeder/logconfig/LogFeederConstants.java | 34 --
.../logfeeder/logconfig/LogFeederFilter.java | 90 +++
.../logconfig/LogFeederFilterWrapper.java | 55 ++
.../logfeeder/logconfig/LogfeederScheduler.java | 59 --
.../logconfig/filter/ApplyLogFilter.java | 62 --
.../logconfig/filter/DefaultDataFilter.java | 49 --
.../logconfig/filter/FilterLogData.java | 53 --
.../apache/ambari/logfeeder/mapper/Mapper.java | 14 +-
.../ambari/logfeeder/mapper/MapperDate.java | 32 +-
.../logfeeder/mapper/MapperFieldName.java | 20 +-
.../logfeeder/mapper/MapperFieldValue.java | 31 +-
.../logfeeder/metrics/LogFeederAMSClient.java | 10 +-
.../ambari/logfeeder/metrics/MetricCount.java | 31 -
.../ambari/logfeeder/metrics/MetricData.java | 46 ++
.../logfeeder/metrics/MetricsManager.java | 168 ++++++
.../ambari/logfeeder/metrics/MetricsMgr.java | 178 ------
.../apache/ambari/logfeeder/output/Output.java | 13 +-
.../ambari/logfeeder/output/OutputData.java | 8 +-
.../ambari/logfeeder/output/OutputDevNull.java | 7 +-
.../ambari/logfeeder/output/OutputFile.java | 42 +-
.../ambari/logfeeder/output/OutputHDFSFile.java | 70 +--
.../ambari/logfeeder/output/OutputKafka.java | 58 +-
.../ambari/logfeeder/output/OutputManager.java | 250 ++++++++
.../ambari/logfeeder/output/OutputMgr.java | 263 --------
.../ambari/logfeeder/output/OutputS3File.java | 41 +-
.../ambari/logfeeder/output/OutputSolr.java | 62 +-
.../logfeeder/output/S3LogPathResolver.java | 6 +-
.../logfeeder/output/S3OutputConfiguration.java | 5 +-
.../ambari/logfeeder/output/S3Uploader.java | 64 +-
.../logfeeder/output/spool/LogSpooler.java | 23 +-
.../output/spool/LogSpoolerContext.java | 2 +-
.../output/spool/LogSpoolerException.java | 2 +-
.../output/spool/RolloverCondition.java | 2 +-
.../logfeeder/output/spool/RolloverHandler.java | 2 +-
.../apache/ambari/logfeeder/util/AWSUtil.java | 52 +-
.../apache/ambari/logfeeder/util/AliasUtil.java | 103 ++--
.../ambari/logfeeder/util/CompressionUtil.java | 15 +-
.../apache/ambari/logfeeder/util/DateUtil.java | 39 +-
.../apache/ambari/logfeeder/util/FileUtil.java | 66 ++-
.../ambari/logfeeder/util/LogFeederUtil.java | 511 +++++-----------
.../logfeeder/util/LogfeederHDFSUtil.java | 58 +-
.../ambari/logfeeder/util/PlaceholderUtil.java | 32 +-
.../apache/ambari/logfeeder/util/S3Util.java | 81 +--
.../apache/ambari/logfeeder/util/SolrUtil.java | 186 ------
.../ambari/logfeeder/view/VLogfeederFilter.java | 90 ---
.../logfeeder/view/VLogfeederFilterWrapper.java | 55 --
.../org/apache/ambari/logfeeder/AppTest.java | 116 ----
.../ambari/logfeeder/filter/FilterGrokTest.java | 55 +-
.../ambari/logfeeder/filter/FilterJSONTest.java | 41 +-
.../logfeeder/filter/FilterKeyValueTest.java | 34 +-
.../ambari/logfeeder/input/InputFileTest.java | 24 +-
.../logfeeder/input/InputManagerTest.java | 241 ++++++++
.../logconfig/LogConfigHandlerTest.java | 117 ++++
.../ambari/logfeeder/mapper/MapperDateTest.java | 17 +-
.../logfeeder/mapper/MapperFieldNameTest.java | 2 +-
.../logfeeder/mapper/MapperFieldValueTest.java | 2 +-
.../logfeeder/metrics/MetrcisManagerTest.java | 128 ++++
.../logfeeder/output/OutputKafkaTest.java | 5 +-
.../logfeeder/output/OutputManagerTest.java | 256 ++++++++
.../logfeeder/output/OutputS3FileTest.java | 17 +-
.../ambari/logfeeder/output/OutputSolrTest.java | 5 +-
.../logfeeder/output/S3LogPathResolverTest.java | 2 +-
.../ambari/logfeeder/output/S3UploaderTest.java | 42 +-
.../logfeeder/output/spool/LogSpoolerTest.java | 2 +-
.../ambari/logfeeder/util/AWSUtilTest.java | 29 -
.../logfeeder/util/PlaceholderUtilTest.java | 20 +-
.../ambari/logfeeder/util/S3UtilTest.java | 4 +-
.../src/test/resources/logfeeder.properties | 20 +
89 files changed, 3854 insertions(+), 4481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 373d743..6d0f22c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -20,14 +20,10 @@
package org.apache.ambari.logfeeder;
import java.io.BufferedInputStream;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@@ -39,18 +35,21 @@ import java.util.Set;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMgr;
+import org.apache.ambari.logfeeder.input.InputManager;
import org.apache.ambari.logfeeder.input.InputSimulate;
-import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.metrics.MetricsMgr;
+import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.metrics.MetricsManager;
import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.AliasUtil;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -58,171 +57,142 @@ import org.apache.log4j.Logger;
import com.google.gson.reflect.TypeToken;
public class LogFeeder {
- private static final Logger logger = Logger.getLogger(LogFeeder.class);
+ private static final Logger LOG = Logger.getLogger(LogFeeder.class);
private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
+ private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
- private Collection<Output> outputList = new ArrayList<Output>();
+ private OutputManager outputManager = new OutputManager();
+ private InputManager inputManager = new InputManager();
+ private MetricsManager metricsManager = new MetricsManager();
- private OutputMgr outMgr = new OutputMgr();
- private InputMgr inputMgr = new InputMgr();
- private MetricsMgr metricsMgr = new MetricsMgr();
+ public static Map<String, Object> globalConfigs = new HashMap<>();
- public static Map<String, Object> globalMap = null;
- private String[] inputParams;
-
- private List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
- private List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
- private List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
- private List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
+ private List<Map<String, Object>> inputConfigList = new ArrayList<>();
+ private List<Map<String, Object>> filterConfigList = new ArrayList<>();
+ private List<Map<String, Object>> outputConfigList = new ArrayList<>();
- private int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours
private long lastCheckPointCleanedMS = 0;
-
- private static boolean isLogfeederCompleted = false;
-
+ private boolean isLogfeederCompleted = false;
private Thread statLoggerThread = null;
- private LogFeeder(String[] args) {
- inputParams = args;
+ private LogFeeder() {}
+
+ public void run() {
+ try {
+ init();
+ monitor();
+ waitOnAllDaemonThreads();
+ } catch (Throwable t) {
+ LOG.fatal("Caught exception in main.", t);
+ System.exit(1);
+ }
}
private void init() throws Throwable {
+ Date startTime = new Date();
- LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
-
- String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
- logger.info("logfeeder.config.files=" + configFiles);
+ loadConfigFiles();
+ addSimulatedInputs();
+ mergeAllConfigs();
- String[] configFileList = null;
- if (configFiles != null) {
- configFileList = configFiles.split(",");
- }
- //list of config those are there in cmd line config dir , end with .json
- String[] cmdLineConfigs = getConfigFromCmdLine();
- //merge both config
- String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList,
- cmdLineConfigs);
- //mergedConfigList is null then set default conifg
- if (mergedConfigList == null || mergedConfigList.length == 0) {
- mergedConfigList = LogFeederUtil.getStringProperty("config.file",
- "config.json").split(",");
- }
- for (String configFileName : mergedConfigList) {
- logger.info("Going to load config file:" + configFileName);
- //escape space from config file path
- configFileName= configFileName.replace("\\ ", "%20");
+ LogConfigHandler.handleConfig();
+
+ outputManager.init();
+ inputManager.init();
+ metricsManager.init();
+
+ LOG.debug("==============");
+
+ Date endTime = new Date();
+ LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
+ }
+
+ private void loadConfigFiles() throws Exception {
+ List<String> configFiles = getConfigFiles();
+ for (String configFileName : configFiles) {
+ LOG.info("Going to load config file:" + configFileName);
+ configFileName = configFileName.replace("\\ ", "%20");
File configFile = new File(configFileName);
if (configFile.exists() && configFile.isFile()) {
- logger.info("Config file exists in path."
- + configFile.getAbsolutePath());
+ LOG.info("Config file exists in path." + configFile.getAbsolutePath());
loadConfigsUsingFile(configFile);
} else {
- // Let's try to load it from class loader
- logger.info("Trying to load config file from classloader: "
- + configFileName);
+ LOG.info("Trying to load config file from classloader: " + configFileName);
loadConfigsUsingClassLoader(configFileName);
- logger.info("Loaded config file from classloader: "
- + configFileName);
+ LOG.info("Loaded config file from classloader: " + configFileName);
}
}
-
- addSimulatedInputs();
-
- mergeAllConfigs();
-
- LogfeederScheduler.INSTANCE.start();
-
- outMgr.setOutputList(outputList);
- for (Output output : outputList) {
- output.init();
- }
- inputMgr.init();
- metricsMgr.init();
- logger.debug("==============");
}
- private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
- BufferedInputStream fileInputStream = null;
- BufferedReader br = null;
- try {
- fileInputStream = (BufferedInputStream) this
- .getClass().getClassLoader()
- .getResourceAsStream(configFileName);
- if (fileInputStream != null) {
- br = new BufferedReader(new InputStreamReader(
- fileInputStream));
- String configData = readFile(br);
- loadConfigs(configData);
- } else {
- throw new Exception("Can't find configFile=" + configFileName);
- }
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (IOException e) {
- }
- }
+ private List<String> getConfigFiles() {
+ List<String> configFiles = new ArrayList<>();
+
+ String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
+ LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
+ if (logfeederConfigFilesProperty != null) {
+ configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
+ }
- if (fileInputStream != null) {
- try {
- fileInputStream.close();
- } catch (IOException e) {
- }
+ String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+ if (StringUtils.isNotEmpty(inputConfigDir)) {
+ File configDirFile = new File(inputConfigDir);
+ List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
+ for (File inputConfigFile : inputConfigFiles) {
+ configFiles.add(inputConfigFile.getAbsolutePath());
}
}
+
+ if (CollectionUtils.isEmpty(configFiles)) {
+ String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
+ configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
+ }
+
+ return configFiles;
}
- /**
- * This method loads the configurations from the given file.
- */
private void loadConfigsUsingFile(File configFile) throws Exception {
- FileInputStream fileInputStream = null;
try {
- fileInputStream = new FileInputStream(configFile);
- BufferedReader br = new BufferedReader(new InputStreamReader(
- fileInputStream));
- String configData = readFile(br);
+ String configData = FileUtils.readFileToString(configFile);
loadConfigs(configData);
} catch (Exception t) {
- logger.error("Error opening config file. configFilePath="
- + configFile.getAbsolutePath());
+ LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
throw t;
- } finally {
- if (fileInputStream != null) {
- try {
- fileInputStream.close();
- } catch (Throwable t) {
- // ignore
- }
- }
+ }
+ }
+
+ private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
+ try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
+ String configData = IOUtils.toString(fis);
+ loadConfigs(configData);
}
}
@SuppressWarnings("unchecked")
private void loadConfigs(String configData) throws Exception {
- Type type = new TypeToken<Map<String, Object>>() {
- }.getType();
- Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(
- configData, type);
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
// Get the globals
for (String key : configMap.keySet()) {
- if (key.equalsIgnoreCase("global")) {
- globalConfigList.add((Map<String, Object>) configMap.get(key));
- } else if (key.equalsIgnoreCase("input")) {
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
- .get(key);
- inputConfigList.addAll(mapList);
- } else if (key.equalsIgnoreCase("filter")) {
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
- .get(key);
- filterConfigList.addAll(mapList);
- } else if (key.equalsIgnoreCase("output")) {
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
- .get(key);
- outputConfigList.addAll(mapList);
+ switch (key) {
+ case "global" :
+ globalConfigs.putAll((Map<String, Object>) configMap.get(key));
+ break;
+ case "input" :
+ List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
+ inputConfigList.addAll(inputConfig);
+ break;
+ case "filter" :
+ List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
+ filterConfigList.addAll(filterConfig);
+ break;
+ case "output" :
+ List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
+ outputConfigList.addAll(outputConfig);
+ break;
+ default :
+ LOG.warn("Unknown config key: " + key);
}
}
}
@@ -244,231 +214,175 @@ public class LogFeeder {
}
private void mergeAllConfigs() {
- globalMap = mergeConfigs(globalConfigList);
+ loadOutputs();
+ loadInputs();
+ loadFilters();
+
+ assignOutputsToInputs();
+ }
- sortBlocks(filterConfigList);
- // First loop for output
+ private void loadOutputs() {
for (Map<String, Object> map : outputConfigList) {
if (map == null) {
continue;
}
- mergeBlocks(globalMap, map);
+ mergeBlocks(globalConfigs, map);
String value = (String) map.get("destination");
- Output output;
- if (value == null || value.isEmpty()) {
- logger.error("Output block doesn't have destination element");
- continue;
- }
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
- if (classFullName == null || classFullName.isEmpty()) {
- logger.error("Destination block doesn't have output element");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Output block doesn't have destination element");
continue;
}
- output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT);
-
+ Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
if (output == null) {
- logger.error("Destination Object is null");
+ LOG.error("Output object could not be found");
continue;
}
-
output.setDestination(value);
output.loadConfig(map);
- // We will only check for is_enabled out here. Down below we will
- // check whether this output is enabled for the input
- boolean isEnabled = output.getBooleanValue("is_enabled", true);
- if (isEnabled) {
- outputList.add(output);
+ // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
+ if (output.getBooleanValue("is_enabled", true)) {
output.logConfgs(Level.INFO);
+ outputManager.add(output);
} else {
- logger.info("Output is disabled. So ignoring it. "
- + output.getShortDescription());
+ LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
}
}
+ }
- // Second loop for input
+ private void loadInputs() {
for (Map<String, Object> map : inputConfigList) {
if (map == null) {
continue;
}
- mergeBlocks(globalMap, map);
+ mergeBlocks(globalConfigs, map);
String value = (String) map.get("source");
- Input input;
- if (value == null || value.isEmpty()) {
- logger.error("Input block doesn't have source element");
- continue;
- }
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
- if (classFullName == null || classFullName.isEmpty()) {
- logger.error("Source block doesn't have source element");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Input block doesn't have source element");
continue;
}
- input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT);
-
+ Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
if (input == null) {
- logger.error("Source Object is null");
+ LOG.error("Input object could not be found");
continue;
}
-
input.setType(value);
input.loadConfig(map);
if (input.isEnabled()) {
- input.setOutputMgr(outMgr);
- input.setInputMgr(inputMgr);
- inputMgr.add(input);
+ input.setOutputManager(outputManager);
+ input.setInputManager(inputManager);
+ inputManager.add(input);
input.logConfgs(Level.INFO);
} else {
- logger.info("Input is disabled. So ignoring it. "
- + input.getShortDescription());
+ LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
}
}
+ }
+
+ private void loadFilters() {
+ sortFilters();
- // Third loop is for filter, but we will have to create a filter
- // instance for each input, so it can maintain the state per input
List<Input> toRemoveInputList = new ArrayList<Input>();
- for (Input input : inputMgr.getInputList()) {
- Filter prevFilter = null;
+ for (Input input : inputManager.getInputList()) {
for (Map<String, Object> map : filterConfigList) {
if (map == null) {
continue;
}
- mergeBlocks(globalMap, map);
+ mergeBlocks(globalConfigs, map);
String value = (String) map.get("filter");
- Filter filter;
- if (value == null || value.isEmpty()) {
- logger.error("Filter block doesn't have filter element");
- continue;
- }
-
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
- if (classFullName == null || classFullName.isEmpty()) {
- logger.error("Filter block doesn't have filter element");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Filter block doesn't have filter element");
continue;
}
- filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER);
-
+ Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
if (filter == null) {
- logger.error("Filter Object is null");
+ LOG.error("Filter object could not be found");
continue;
}
filter.loadConfig(map);
filter.setInput(input);
if (filter.isEnabled()) {
- filter.setOutputMgr(outMgr);
- if (prevFilter == null) {
- input.setFirstFilter(filter);
- } else {
- prevFilter.setNextFilter(filter);
- }
- prevFilter = filter;
+ filter.setOutputManager(outputManager);
+ input.addFilter(filter);
filter.logConfgs(Level.INFO);
} else {
- logger.debug("Ignoring filter "
- + filter.getShortDescription() + " for input "
- + input.getShortDescription());
+ LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
}
}
+
if (input.getFirstFilter() == null) {
toRemoveInputList.add(input);
}
}
- // Fourth loop is for associating valid outputs to input
- Set<Output> usedOutputSet = new HashSet<Output>();
- for (Input input : inputMgr.getInputList()) {
- for (Output output : outputList) {
- boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
- input.getConfigs());
- if (ret) {
- usedOutputSet.add(output);
- input.addOutput(output);
- }
- }
- }
- outputList = usedOutputSet;
-
for (Input toRemoveInput : toRemoveInputList) {
- logger.warn("There are no filters, we will ignore this input. "
- + toRemoveInput.getShortDescription());
- inputMgr.removeInput(toRemoveInput);
+ LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
+ inputManager.removeInput(toRemoveInput);
}
}
- private void sortBlocks(List<Map<String, Object>> blockList) {
-
- Collections.sort(blockList, new Comparator<Map<String, Object>>() {
+ private void sortFilters() {
+ Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
@Override
public int compare(Map<String, Object> o1, Map<String, Object> o2) {
Object o1Sort = o1.get("sort_order");
Object o2Sort = o2.get("sort_order");
- if (o1Sort == null) {
- return 0;
- }
- if (o2Sort == null) {
+ if (o1Sort == null || o2Sort == null) {
return 0;
}
- int o1Value = 0;
- if (!(o1Sort instanceof Number)) {
- try {
- o1Value = (new Double(Double.parseDouble(o1Sort
- .toString()))).intValue();
- } catch (Throwable t) {
- logger.error("Value is not of type Number. class="
- + o1Sort.getClass().getName() + ", value="
- + o1Sort.toString() + ", map=" + o1.toString());
- }
- } else {
- o1Value = ((Number) o1Sort).intValue();
- }
- int o2Value = 0;
- if (!(o2Sort instanceof Integer)) {
+
+ int o1Value = parseSort(o1, o1Sort);
+ int o2Value = parseSort(o2, o2Sort);
+
+ return o1Value - o2Value;
+ }
+
+ private int parseSort(Map<String, Object> map, Object o) {
+ if (!(o instanceof Number)) {
try {
- o2Value = (new Double(Double.parseDouble(o2Sort
- .toString()))).intValue();
+ return (new Double(Double.parseDouble(o.toString()))).intValue();
} catch (Throwable t) {
- logger.error("Value is not of type Number. class="
- + o2Sort.getClass().getName() + ", value="
- + o2Sort.toString() + ", map=" + o2.toString());
+ LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
+ + ", map=" + map.toString());
+ return 0;
}
} else {
-
+ return ((Number) o).intValue();
}
- return o1Value - o2Value;
}
});
}
- private Map<String, Object> mergeConfigs(
- List<Map<String, Object>> configList) {
- Map<String, Object> mergedConfig = new HashMap<String, Object>();
- for (Map<String, Object> config : configList) {
- mergeBlocks(config, mergedConfig);
+ private void assignOutputsToInputs() {
+ Set<Output> usedOutputSet = new HashSet<Output>();
+ for (Input input : inputManager.getInputList()) {
+ for (Output output : outputManager.getOutputs()) {
+ if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+ usedOutputSet.add(output);
+ input.addOutput(output);
+ }
+ }
}
- return mergedConfig;
+ outputManager.retainUsedOutputs(usedOutputSet);
}
- private void mergeBlocks(Map<String, Object> fromMap,
- Map<String, Object> toMap) {
- // Merge the non-string
+ @SuppressWarnings("unchecked")
+ private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
for (String key : fromMap.keySet()) {
Object objValue = fromMap.get(key);
if (objValue == null) {
continue;
}
if (objValue instanceof Map) {
- @SuppressWarnings("unchecked")
- Map<String, Object> globalFields = LogFeederUtil
- .cloneObject((Map<String, Object>) fromMap.get(key));
+ Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
- @SuppressWarnings("unchecked")
- Map<String, Object> localFields = (Map<String, Object>) toMap
- .get(key);
+ Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
if (localFields == null) {
localFields = new HashMap<String, Object>();
toMap.put(key, localFields);
@@ -477,8 +391,7 @@ public class LogFeeder {
if (globalFields != null) {
for (String fieldKey : globalFields.keySet()) {
if (!localFields.containsKey(fieldKey)) {
- localFields.put(fieldKey,
- globalFields.get(fieldKey));
+ localFields.put(fieldKey, globalFields.get(fieldKey));
}
}
}
@@ -493,11 +406,29 @@ public class LogFeeder {
}
}
+ private class JVMShutdownHook extends Thread {
+
+ public void run() {
+ try {
+ LOG.info("Processing is shutting down.");
+
+ inputManager.close();
+ outputManager.close();
+ inputManager.checkInAll();
+
+ logStats();
+
+ LOG.info("LogSearch is exiting.");
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+
private void monitor() throws Exception {
- inputMgr.monitor();
+ inputManager.monitor();
JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
- ShutdownHookManager.get().addShutdownHook(logfeederJVMHook,
- LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
+ ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
statLoggerThread = new Thread("statLogger") {
@@ -512,17 +443,14 @@ public class LogFeeder {
try {
logStats();
} catch (Throwable t) {
- logger.error(
- "LogStats: Caught exception while logging stats.",
- t);
+ LOG.error("LogStats: Caught exception while logging stats.", t);
}
- if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
+ if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
lastCheckPointCleanedMS = System.currentTimeMillis();
- inputMgr.cleanCheckPointFiles();
+ inputManager.cleanCheckPointFiles();
}
- // logfeeder is stopped then break the loop
if (isLogfeederCompleted) {
break;
}
@@ -536,84 +464,20 @@ public class LogFeeder {
}
private void logStats() {
- inputMgr.logStats();
- outMgr.logStats();
-
- if (metricsMgr.isMetricsEnabled()) {
- List<MetricCount> metricsList = new ArrayList<MetricCount>();
- inputMgr.addMetricsContainers(metricsList);
- outMgr.addMetricsContainers(metricsList);
- metricsMgr.useMetrics(metricsList);
- }
- }
-
- private String readFile(BufferedReader br) throws Exception {
- try {
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- }
- return sb.toString();
- } catch (Exception t) {
- logger.error("Error loading properties file.", t);
- throw t;
- }
- }
-
- public Collection<Output> getOutputList() {
- return outputList;
- }
-
- public OutputMgr getOutMgr() {
- return outMgr;
- }
-
- public static void main(String[] args) {
- LogFeeder logFeeder = new LogFeeder(args);
- logFeeder.run();
- }
-
- public void run() {
- try {
- Date startTime = new Date();
- this.init();
- Date endTime = new Date();
- logger.info("Took " + (endTime.getTime() - startTime.getTime())
- + " ms to initialize");
- this.monitor();
- //wait for all background thread before stop main thread
- this.waitOnAllDaemonThreads();
- } catch (Throwable t) {
- logger.fatal("Caught exception in main.", t);
- System.exit(1);
+ inputManager.logStats();
+ outputManager.logStats();
+
+ if (metricsManager.isMetricsEnabled()) {
+ List<MetricData> metricsList = new ArrayList<MetricData>();
+ inputManager.addMetricsContainers(metricsList);
+ outputManager.addMetricsContainers(metricsList);
+ metricsManager.useMetrics(metricsList);
}
}
- private class JVMShutdownHook extends Thread {
-
- public void run() {
- try {
- logger.info("Processing is shutting down.");
-
- inputMgr.close();
- outMgr.close();
- inputMgr.checkInAll();
-
- logStats();
-
- logger.info("LogSearch is exiting.");
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
-
private void waitOnAllDaemonThreads() {
- String foreground = LogFeederUtil.getStringProperty("foreground");
- if (foreground != null && foreground.equalsIgnoreCase("true")) {
- inputMgr.waitOnAllInputs();
+ if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) {
+ inputManager.waitOnAllInputs();
isLogfeederCompleted = true;
if (statLoggerThread != null) {
try {
@@ -624,24 +488,16 @@ public class LogFeeder {
}
}
}
-
- private String[] getConfigFromCmdLine() {
- String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
- if (inputConfigDir != null && !inputConfigDir.isEmpty()) {
- String[] searchFileWithExtensions = new String[] { "json" };
- File configDirFile = new File(inputConfigDir);
- List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile,
- searchFileWithExtensions, false);
- if (configFiles != null && configFiles.size() > 0) {
- String configPaths[] = new String[configFiles.size()];
- for (int index = 0; index < configFiles.size(); index++) {
- File configFile = configFiles.get(index);
- String configFilePath = configFile.getAbsolutePath();
- configPaths[index] = configFilePath;
- }
- return configPaths;
- }
+
+ public static void main(String[] args) {
+ try {
+ LogFeederUtil.loadProperties("logfeeder.properties", args);
+ } catch (Throwable t) {
+ LOG.warn("Could not load logfeeder properites");
+ System.exit(1);
}
- return new String[0];
+
+ LogFeeder logFeeder = new LogFeeder();
+ logFeeder.run();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
index 287982f..47ddc51 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
@@ -23,27 +23,27 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
public abstract class ConfigBlock {
- static private Logger logger = Logger.getLogger(ConfigBlock.class);
+ private static final Logger LOG = Logger.getLogger(ConfigBlock.class);
private boolean drain = false;
protected Map<String, Object> configs;
protected Map<String, String> contextFields = new HashMap<String, String>();
- public MetricCount statMetric = new MetricCount();
-
- /**
- *
- */
+ public MetricData statMetric = new MetricData(getStatMetricName(), false);
+ protected String getStatMetricName() {
+ return null;
+ }
+
public ConfigBlock() {
- super();
}
/**
@@ -58,10 +58,7 @@ public abstract class ConfigBlock {
return this.getClass().getSimpleName();
}
- /**
- * @param metricsList
- */
- public void addMetricsContainers(List<MetricCount> metricsList) {
+ public void addMetricsContainers(List<MetricData> metricsList) {
metricsList.add(statMetric);
}
@@ -89,25 +86,21 @@ public abstract class ConfigBlock {
boolean isEnabled = getBooleanValue("is_enabled", true);
if (isEnabled) {
// Let's check for static conditions
- Map<String, Object> conditions = (Map<String, Object>) configs
- .get("conditions");
+ Map<String, Object> conditions = (Map<String, Object>) configs.get("conditions");
boolean allow = true;
- if (conditions != null && conditions.size() > 0) {
+ if (MapUtils.isNotEmpty(conditions)) {
allow = false;
for (String conditionType : conditions.keySet()) {
if (conditionType.equalsIgnoreCase("fields")) {
- Map<String, Object> fields = (Map<String, Object>) conditions
- .get("fields");
+ Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
for (String fieldName : fields.keySet()) {
Object values = fields.get(fieldName);
if (values instanceof String) {
- allow = isFieldConditionMatch(fieldName,
- (String) values);
+ allow = isFieldConditionMatch(fieldName, (String) values);
} else {
List<String> listValues = (List<String>) values;
for (String stringValue : listValues) {
- allow = isFieldConditionMatch(fieldName,
- stringValue);
+ allow = isFieldConditionMatch(fieldName, stringValue);
if (allow) {
break;
}
@@ -135,8 +128,7 @@ public abstract class ConfigBlock {
allow = true;
} else {
@SuppressWarnings("unchecked")
- Map<String, Object> addFields = (Map<String, Object>) configs
- .get("add_fields");
+ Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
if (addFields != null && addFields.get(fieldName) != null) {
String addFieldValue = (String) addFields.get(fieldName);
if (stringValue.equalsIgnoreCase(addFieldValue)) {
@@ -184,12 +176,7 @@ public abstract class ConfigBlock {
String strValue = getStringValue(key);
boolean retValue = defaultValue;
if (!StringUtils.isEmpty(strValue)) {
- if (strValue.equalsIgnoreCase("true")
- || strValue.equalsIgnoreCase("yes")) {
- retValue = true;
- } else {
- retValue = false;
- }
+ retValue = (strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes"));
}
return retValue;
}
@@ -201,8 +188,7 @@ public abstract class ConfigBlock {
try {
retValue = Integer.parseInt(strValue);
} catch (Throwable t) {
- logger.error("Error parsing integer value. key=" + key
- + ", value=" + strValue);
+ LOG.error("Error parsing integer value. key=" + key + ", value=" + strValue);
}
}
return retValue;
@@ -215,8 +201,7 @@ public abstract class ConfigBlock {
try {
retValue = Long.parseLong(strValue);
} catch (Throwable t) {
- logger.error("Error parsing long value. key=" + key + ", value="
- + strValue);
+ LOG.error("Error parsing long value. key=" + key + ", value=" + strValue);
}
}
return retValue;
@@ -227,29 +212,27 @@ public abstract class ConfigBlock {
}
public void incrementStat(int count) {
- statMetric.count += count;
+ statMetric.value += count;
}
- public void logStatForMetric(MetricCount metric, String prefixStr) {
- LogFeederUtil.logStatForMetric(metric, prefixStr, ", key="
- + getShortDescription());
+ public void logStatForMetric(MetricData metric, String prefixStr) {
+ LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
}
- synchronized public void logStat() {
+ public synchronized void logStat() {
logStatForMetric(statMetric, "Stat");
}
public boolean logConfgs(Priority level) {
- if (level.toInt() == Priority.INFO_INT && !logger.isInfoEnabled()) {
+ if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
return false;
}
- if (level.toInt() == Priority.DEBUG_INT && !logger.isDebugEnabled()) {
+ if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
return false;
}
- logger.log(level, "Printing configuration Block="
- + getShortDescription());
- logger.log(level, "configs=" + configs);
- logger.log(level, "contextFields=" + contextFields);
+ LOG.log(level, "Printing configuration Block=" + getShortDescription());
+ LOG.log(level, "configs=" + configs);
+ LOG.log(level, "contextFields=" + contextFields);
return true;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
new file mode 100644
index 0000000..d1e7fba
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.common;
+
+public class LogFeederConstants {
+
+ public static final String ALL = "all";
+ public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
+ public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
+
+ // solr fields
+ public static final String SOLR_LEVEL = "level";
+ public static final String SOLR_COMPONENT = "type";
+ public static final String SOLR_HOST = "host";
+
+ // UserConfig Constants History
+ public static final String VALUES = "jsons";
+ public static final String ROW_TYPE = "rowtype";
+
+ // S3 Constants
+ public static final String S3_PATH_START_WITH = "s3://";
+ public static final String S3_PATH_SEPARATOR = "/";
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index ab371f1..684f3c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -29,21 +29,19 @@ import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
public abstract class Filter extends ConfigBlock {
- private static final Logger logger = Logger.getLogger(Filter.class);
+ private static final Logger LOG = Logger.getLogger(Filter.class);
protected Input input;
private Filter nextFilter = null;
- private OutputMgr outputMgr;
+ private OutputManager outputManager;
private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
@@ -74,15 +72,12 @@ public abstract class Filter extends ConfigBlock {
}
for (Map<String, Object> mapObject : mapList) {
for (String mapClassCode : mapObject.keySet()) {
- Mapper mapper = getMapper(mapClassCode);
+ Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
if (mapper == null) {
break;
}
- if (mapper.init(getInput().getShortDescription(),
- fieldName, mapClassCode,
- mapObject.get(mapClassCode))) {
- List<Mapper> fieldMapList = postFieldValueMappers
- .get(fieldName);
+ if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapObject.get(mapClassCode))) {
+ List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
if (fieldMapList == null) {
fieldMapList = new ArrayList<Mapper>();
postFieldValueMappers.put(fieldName, fieldMapList);
@@ -94,17 +89,8 @@ public abstract class Filter extends ConfigBlock {
}
}
- private Mapper getMapper(String mapClassCode) {
- String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS);
- if (classFullName != null && !classFullName.isEmpty()) {
- Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER);
- return mapper;
- }
- return null;
- }
-
- public void setOutputMgr(OutputMgr outputMgr) {
- this.outputMgr = outputMgr;
+ public void setOutputManager(OutputManager outputManager) {
+ this.outputManager = outputManager;
}
public Filter getNextFilter() {
@@ -131,25 +117,23 @@ public abstract class Filter extends ConfigBlock {
if (nextFilter != null) {
nextFilter.apply(inputStr, inputMarker);
} else {
- outputMgr.write(inputStr, inputMarker);
+ outputManager.write(inputStr, inputMarker);
}
}
public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException {
- if (postFieldValueMappers.size() > 0) {
- for (String fieldName : postFieldValueMappers.keySet()) {
- Object value = jsonObj.get(fieldName);
- if (value != null) {
- for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
- value = mapper.apply(jsonObj, value);
- }
+ for (String fieldName : postFieldValueMappers.keySet()) {
+ Object value = jsonObj.get(fieldName);
+ if (value != null) {
+ for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
+ value = mapper.apply(jsonObj, value);
}
}
}
if (nextFilter != null) {
nextFilter.apply(jsonObj, inputMarker);
} else {
- outputMgr.write(jsonObj, inputMarker);
+ outputManager.write(jsonObj, inputMarker);
}
}
@@ -193,16 +177,15 @@ public abstract class Filter extends ConfigBlock {
if (!super.logConfgs(level)) {
return false;
}
- logger.log(level, "input=" + input.getShortDescription());
+ LOG.log(level, "input=" + input.getShortDescription());
return true;
}
@Override
- public void addMetricsContainers(List<MetricCount> metricsList) {
+ public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
if (nextFilter != null) {
nextFilter.addMetricsContainers(metricsList);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 372c208..7e2da70 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -36,7 +36,7 @@ import oi.thekraken.grok.api.exception.GrokException;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
@@ -45,7 +45,7 @@ import org.apache.log4j.Logger;
import com.google.gson.reflect.TypeToken;
public class FilterGrok extends Filter {
- static private Logger logger = Logger.getLogger(FilterGrok.class);
+ private static final Logger LOG = Logger.getLogger(FilterGrok.class);
private static final String GROK_PATTERN_FILE = "grok-patterns";
@@ -68,25 +68,23 @@ public class FilterGrok extends Filter {
private Type jsonType = new TypeToken<Map<String, String>>() {}.getType();
- private MetricCount grokErrorMetric = new MetricCount();
+ private MetricData grokErrorMetric = new MetricData("filter.error.grok", false);
@Override
public void init() throws Exception {
super.init();
try {
- grokErrorMetric.metricsName = "filter.error.grok";
messagePattern = escapePattern(getStringValue("message_pattern"));
multilinePattern = escapePattern(getStringValue("multiline_pattern"));
sourceField = getStringValue("source_field");
removeSourceField = getBooleanValue("remove_source_field",
removeSourceField);
- logger.info("init() done. grokPattern=" + messagePattern
- + ", multilinePattern=" + multilinePattern + ", "
- + getShortDescription());
+ LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
+ getShortDescription());
if (StringUtils.isEmpty(messagePattern)) {
- logger.error("message_pattern is not set for filter.");
+ LOG.error("message_pattern is not set for filter.");
return;
}
extractNamedParams(messagePattern, namedParamList);
@@ -102,9 +100,7 @@ public class FilterGrok extends Filter {
grokMultiline.compile(multilinePattern);
}
} catch (Throwable t) {
- logger.fatal(
- "Caught exception while initializing Grok. multilinePattern="
- + multilinePattern + ", messagePattern="
+ LOG.fatal("Caught exception while initializing Grok. multilinePattern=" + multilinePattern + ", messagePattern="
+ messagePattern, t);
grokMessage = null;
grokMultiline = null;
@@ -123,9 +119,10 @@ public class FilterGrok extends Filter {
}
private void extractNamedParams(String patternStr, Set<String> paramList) {
- String grokRegEx = "%\\{" + "(?<name>" + "(?<pattern>[A-z0-9]+)"
- + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + "(?:=(?<definition>"
- + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + "\\}";
+ String grokRegEx = "%\\{" +
+ "(?<name>" + "(?<pattern>[A-z0-9]+)" + "(?::(?<subname>[A-z0-9_:]+))?" + ")" +
+ "(?:=(?<definition>" + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" +
+ "\\}";
Pattern pattern = Pattern.compile(grokRegEx);
java.util.regex.Matcher matcher = pattern.matcher(patternStr);
@@ -139,28 +136,23 @@ public class FilterGrok extends Filter {
private boolean loadPatterns(Grok grok) {
InputStreamReader grokPatternsReader = null;
- logger.info("Loading pattern file " + GROK_PATTERN_FILE);
+ LOG.info("Loading pattern file " + GROK_PATTERN_FILE);
try {
- BufferedInputStream fileInputStream = (BufferedInputStream) this
- .getClass().getClassLoader()
- .getResourceAsStream(GROK_PATTERN_FILE);
+ BufferedInputStream fileInputStream =
+ (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(GROK_PATTERN_FILE);
if (fileInputStream == null) {
- logger.fatal("Couldn't load grok-patterns file "
- + GROK_PATTERN_FILE + ". Things will not work");
+ LOG.fatal("Couldn't load grok-patterns file " + GROK_PATTERN_FILE + ". Things will not work");
return false;
}
grokPatternsReader = new InputStreamReader(fileInputStream);
} catch (Throwable t) {
- logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE
- + " from classpath. Grok filtering will not work.", t);
+ LOG.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE + " from classpath. Grok filtering will not work.", t);
return false;
}
try {
grok.addPatternFromReader(grokPatternsReader);
} catch (GrokException e) {
- logger.fatal(
- "Error loading patterns from grok-patterns reader for file "
- + GROK_PATTERN_FILE, e);
+ LOG.fatal("Error loading patterns from grok-patterns reader for file " + GROK_PATTERN_FILE, e);
return false;
}
@@ -177,8 +169,7 @@ public class FilterGrok extends Filter {
String jsonStr = grokMultiline.capture(inputStr);
if (!"{}".equals(jsonStr)) {
if (strBuff != null) {
- Map<String, Object> jsonObj = Collections
- .synchronizedMap(new HashMap<String, Object>());
+ Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
try {
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
} finally {
@@ -192,15 +183,13 @@ public class FilterGrok extends Filter {
if (strBuff == null) {
strBuff = new StringBuilder();
} else {
- strBuff.append('\r');
- strBuff.append('\n');
+ strBuff.append("\r\n");
}
strBuff.append(inputStr);
savedInputMarker = inputMarker;
} else {
savedInputMarker = inputMarker;
- Map<String, Object> jsonObj = Collections
- .synchronizedMap(new HashMap<String, Object>());
+ Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
applyMessage(inputStr, jsonObj, null);
}
}
@@ -216,14 +205,8 @@ public class FilterGrok extends Filter {
}
}
- /**
- * @param inputStr
- * @param jsonObj
- * @throws LogfeederException
- */
- private void applyMessage(String inputStr, Map<String, Object> jsonObj,
- String multilineJsonStr) throws LogfeederException {
- String jsonStr = grokParse(inputStr);
+ private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogfeederException {
+ String jsonStr = grokMessage.capture(inputStr);
boolean parseError = false;
if ("{}".equals(jsonStr)) {
@@ -239,8 +222,7 @@ public class FilterGrok extends Filter {
if (parseError) {
jsonStr = multilineJsonStr;
}
- Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr,
- jsonType);
+ Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr, jsonType);
for (String namedParam : namedParamList) {
if (jsonSrc.get(namedParam) != null) {
jsonObj.put(namedParam, jsonSrc.get(namedParam));
@@ -260,37 +242,26 @@ public class FilterGrok extends Filter {
}
}
super.apply(jsonObj, savedInputMarker);
- statMetric.count++;
- }
-
- public String grokParse(String inputStr) {
- String jsonStr = grokMessage.capture(inputStr);
- return jsonStr;
+ statMetric.value++;
}
private void logParseError(String inputStr) {
- grokErrorMetric.count++;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_PARSEERROR";
+ grokErrorMetric.value++;
+ String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
int inputStrLength = inputStr != null ? inputStr.length() : 0;
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error parsing string. length=" + inputStrLength
- + ", input=" + input.getShortDescription()
- + ". First upto 100 characters="
- + LogFeederUtil.subString(inputStr, 100), null, logger,
- Level.WARN);
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStrLength + ", input=" +
+ input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
+ Level.WARN);
}
@Override
public void flush() {
if (strBuff != null) {
- Map<String, Object> jsonObj = Collections
- .synchronizedMap(new HashMap<String, Object>());
+ Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
try {
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
} catch (LogfeederException e) {
- logger.error(e.getLocalizedMessage(), e.getCause());
+ LOG.error(e.getLocalizedMessage(), e.getCause());
}
strBuff = null;
savedInputMarker = null;
@@ -304,7 +275,7 @@ public class FilterGrok extends Filter {
}
@Override
- public void addMetricsContainers(List<MetricCount> metricsList) {
+ public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
metricsList.add(grokErrorMetric);
}
@@ -314,5 +285,4 @@ public class FilterGrok extends Filter {
super.logStat();
logStatForMetric(grokErrorMetric, "Stat: Grok Errors");
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 2954106..ba63c61 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -22,12 +22,13 @@ import java.util.Map;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public class FilterJSON extends Filter {
- private static final Logger logger = Logger.getLogger(FilterJSON.class);
+ private static final Logger LOG = Logger.getLogger(FilterJSON.class);
@Override
public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
@@ -35,7 +36,7 @@ public class FilterJSON extends Filter {
try {
jsonMap = LogFeederUtil.toJSONObject(inputStr);
} catch (Exception e) {
- logger.error(e.getLocalizedMessage());
+ LOG.error(e.getLocalizedMessage());
throw new LogfeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause());
}
Double lineNumberD = (Double) jsonMap.get("line_number");
@@ -45,10 +46,9 @@ public class FilterJSON extends Filter {
}
String timeStampStr = (String) jsonMap.get("logtime");
if (timeStampStr != null && !timeStampStr.isEmpty()) {
- String logtime = LogFeederUtil.getDate(timeStampStr);
+ String logtime = DateUtil.getDate(timeStampStr);
jsonMap.put("logtime", logtime);
}
super.apply(jsonMap, inputMarker);
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index 7adb468..c9c3f2c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -25,38 +25,35 @@ import java.util.StringTokenizer;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class FilterKeyValue extends Filter {
- private static final Logger logger = Logger.getLogger(FilterKeyValue.class);
+ private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
private String sourceField = null;
private String valueSplit = "=";
private String fieldSplit = "\t";
- private MetricCount errorMetric = new MetricCount();
+ private MetricData errorMetric = new MetricData("filter.error.keyvalue", false);
@Override
public void init() throws Exception {
super.init();
- errorMetric.metricsName = "filter.error.keyvalue";
sourceField = getStringValue("source_field");
valueSplit = getStringValue("value_split", valueSplit);
fieldSplit = getStringValue("field_split", fieldSplit);
- logger.info("init() done. source_field=" + sourceField
- + ", value_split=" + valueSplit + ", " + ", field_split="
- + fieldSplit + ", " + getShortDescription());
+ LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
+ fieldSplit + ", " + getShortDescription());
if (StringUtils.isEmpty(sourceField)) {
- logger.fatal("source_field is not set for filter. This filter will not be applied");
+ LOG.fatal("source_field is not set for filter. This filter will not be applied");
return;
}
-
}
@Override
@@ -71,40 +68,30 @@ public class FilterKeyValue extends Filter {
}
Object valueObj = jsonObj.get(sourceField);
if (valueObj != null) {
- StringTokenizer fieldTokenizer = new StringTokenizer(
- valueObj.toString(), fieldSplit);
+ StringTokenizer fieldTokenizer = new StringTokenizer(valueObj.toString(), fieldSplit);
while (fieldTokenizer.hasMoreTokens()) {
String nv = fieldTokenizer.nextToken();
- StringTokenizer nvTokenizer = new StringTokenizer(nv,
- valueSplit);
+ StringTokenizer nvTokenizer = new StringTokenizer(nv, valueSplit);
while (nvTokenizer.hasMoreTokens()) {
String name = nvTokenizer.nextToken();
if (nvTokenizer.hasMoreTokens()) {
String value = nvTokenizer.nextToken();
jsonObj.put(name, value);
} else {
- logParseError("name=" + name + ", pair=" + nv
- + ", field=" + sourceField + ", field_value="
- + valueObj);
+ logParseError("name=" + name + ", pair=" + nv + ", field=" + sourceField + ", field_value=" + valueObj);
}
}
}
}
super.apply(jsonObj, inputMarker);
- statMetric.count++;
+ statMetric.value++;
}
private void logParseError(String inputStr) {
- errorMetric.count++;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_PARSEERROR";
- LogFeederUtil
- .logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error parsing string. length=" + inputStr.length()
- + ", input=" + input.getShortDescription()
- + ". First upto 100 characters="
- + LogFeederUtil.subString(inputStr, 100), null, logger,
+ errorMetric.value++;
+ String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStr.length() + ", input=" +
+ input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
Level.ERROR);
}
@@ -114,9 +101,8 @@ public class FilterKeyValue extends Filter {
}
@Override
- public void addMetricsContainers(List<MetricCount> metricsList) {
+ public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
metricsList.add(errorMetric);
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8399960d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
new file mode 100644
index 0000000..41a1fa5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractInputFile extends Input {
+ protected static final Logger LOG = Logger.getLogger(AbstractInputFile.class);
+
+ private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
+
+ protected File[] logFiles;
+ protected String logPath;
+ protected Object fileKey;
+ protected String base64FileKey;
+
+ protected boolean isReady;
+ private boolean isStartFromBegining = true;
+
+ private String checkPointExtension;
+ private File checkPointFile;
+ private RandomAccessFile checkPointWriter;
+ private long lastCheckPointTimeMS;
+ private int checkPointIntervalMS;
+ private Map<String, Object> jsonCheckPoint;
+ private InputMarker lastCheckPointInputMarker;
+
+ @Override
+ protected String getStatMetricName() {
+ return "input.files.read_lines";
+ }
+
+ @Override
+ protected String getReadBytesMetricName() {
+ return "input.files.read_bytes";
+ }
+
+ @Override
+ public void init() throws Exception {
+ LOG.info("init() called");
+
+ checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", InputManager.DEFAULT_CHECKPOINT_EXTENSION);
+
+ // Let's close the file and set it to true after we start monitoring it
+ setClosed(true);
+ logPath = getStringValue("path");
+ tail = getBooleanValue("tail", tail);
+ checkPointIntervalMS = getIntValue("checkpoint.interval.ms", DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+ if (StringUtils.isEmpty(logPath)) {
+ LOG.error("path is empty for file input. " + getShortDescription());
+ return;
+ }
+
+ String startPosition = getStringValue("start_position");
+ if (StringUtils.isEmpty(startPosition) || startPosition.equalsIgnoreCase("beginning") ||
+ startPosition.equalsIgnoreCase("begining") || !tail) {
+ isStartFromBegining = true;
+ }
+
+ setFilePath(logPath);
+ boolean isFileReady = isReady();
+
+ LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
+
+ super.init();
+ }
+
+ protected void processFile(File logPathFile) throws FileNotFoundException, IOException {
+ LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
+ BufferedReader br = null;
+ checkPointFile = null;
+ checkPointWriter = null;
+ jsonCheckPoint = null;
+
+ int lineCount = 0;
+ try {
+ setFilePath(logPathFile.getAbsolutePath());
+
+ br = openLogFile(logPathFile);
+
+ boolean resume = isStartFromBegining;
+ int resumeFromLineNumber = getResumeFromLineNumber();
+ if (resumeFromLineNumber > 0) {
+ resume = false;
+ }
+
+ setClosed(false);
+ int sleepStep = 2;
+ int sleepIteration = 0;
+ while (true) {
+ try {
+ if (isDrain()) {
+ break;
+ }
+
+ String line = br.readLine();
+ if (line == null) {
+ if (!resume) {
+ resume = true;
+ }
+ sleepIteration++;
+ if (sleepIteration == 2) {
+ flush();
+ if (!tail) {
+ LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
+ break;
+ }
+ } else if (sleepIteration > 4) {
+ Object newFileKey = getFileKey(logPathFile);
+ if (newFileKey != null && (fileKey == null || !newFileKey.equals(fileKey))) {
+ LOG.info("File key is different. Marking this input file for rollover. oldKey=" + fileKey + ", newKey=" +
+ newFileKey + ". " + getShortDescription());
+
+ try {
+ LOG.info("File is rolled over. Closing current open file." + getShortDescription() + ", lineCount=" +
+ lineCount);
+ br.close();
+ } catch (Exception ex) {
+ LOG.error("Error closing file" + getShortDescription(), ex);
+ break;
+ }
+
+ try {
+ LOG.info("Opening new rolled over file." + getShortDescription());
+ br = openLogFile(logPathFile);
+ lineCount = 0;
+ } catch (Exception ex) {
+ LOG.error("Error opening rolled over file. " + getShortDescription(), ex);
+ LOG.info("Added input to not ready list." + getShortDescription());
+ isReady = false;
+ inputManager.addToNotReady(this);
+ break;
+ }
+ LOG.info("File is successfully rolled over. " + getShortDescription());
+ continue;
+ }
+ }
+ try {
+ Thread.sleep(sleepStep * 1000);
+ sleepStep = Math.min(sleepStep * 2, 10);
+ } catch (InterruptedException e) {
+ LOG.info("Thread interrupted." + getShortDescription());
+ }
+ } else {
+ lineCount++;
+ sleepStep = 1;
+ sleepIteration = 0;
+
+ if (!resume && lineCount > resumeFromLineNumber) {
+ LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + getShortDescription());
+ resume = true;
+ }
+ if (resume) {
+ InputMarker marker = new InputMarker(this, base64FileKey, lineCount);
+ outputLine(line, marker);
+ }
+ }
+ } catch (Throwable t) {
+ String logMessageKey = this.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
+ ", input=" + getShortDescription(), t, LOG, Level.ERROR);
+ }
+ }
+ } finally {
+ if (br != null) {
+ LOG.info("Closing reader." + getShortDescription() + ", lineCount=" + lineCount);
+ try {
+ br.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ protected abstract BufferedReader openLogFile(File logFile) throws IOException;
+
+ protected abstract Object getFileKey(File logFile);
+
+ private int getResumeFromLineNumber() {
+ int resumeFromLineNumber = 0;
+
+ if (tail) {
+ try {
+ LOG.info("Checking existing checkpoint file. " + getShortDescription());
+
+ String checkPointFileName = base64FileKey + checkPointExtension;
+ File checkPointFolder = inputManager.getCheckPointFolderFile();
+ checkPointFile = new File(checkPointFolder, checkPointFileName);
+ checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
+
+ try {
+ int contentSize = checkPointWriter.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointWriter.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
+ readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
+ } else {
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+ resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+
+ LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+ ", resumeFromLineNumber=" + resumeFromLineNumber);
+ }
+ } catch (EOFException eofEx) {
+ LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+ getShortDescription());
+ }
+ if (jsonCheckPoint == null) {
+ // This seems to be first time, so creating the initial checkPoint object
+ jsonCheckPoint = new HashMap<String, Object>();
+ jsonCheckPoint.put("file_path", filePath);
+ jsonCheckPoint.put("file_key", base64FileKey);
+ }
+
+ } catch (Throwable t) {
+ LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
+ }
+ }
+
+ return resumeFromLineNumber;
+ }
+
+ @Override
+ public synchronized void checkIn(InputMarker inputMarker) {
+ if (checkPointWriter != null) {
+ try {
+ int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+ if (lineNumber > inputMarker.lineNumber) {
+ // Already wrote higher line number for this input
+ return;
+ }
+ // If interval is greater than last checkPoint time, then write
+ long currMS = System.currentTimeMillis();
+ if (!isClosed() && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+ // Let's save this one so we can update the check point file on flush
+ lastCheckPointInputMarker = inputMarker;
+ return;
+ }
+ lastCheckPointTimeMS = currMS;
+
+ jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
+ jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+ jsonCheckPoint.put("last_write_time_date", new Date());
+
+ String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+
+ // Let's rewind
+ checkPointWriter.seek(0);
+ checkPointWriter.writeInt(jsonStr.length());
+ checkPointWriter.write(jsonStr.getBytes());
+
+ if (isClosed()) {
+ String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
+ ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+ }
+ } catch (Throwable t) {
+ String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
+ LOG, Level.ERROR);
+ }
+ }
+ }
+
+ @Override
+ public void lastCheckIn() {
+ if (lastCheckPointInputMarker != null) {
+ checkIn(lastCheckPointInputMarker);
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
+ lastCheckIn();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "input:source=" + getStringValue("source") + ", path=" +
+ (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
+ }
+}