You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/07/07 11:17:55 UTC
[21/31] ambari git commit: AMBARI-21399 Create property descriptions
for internal Log Feeder configs (mgergely)
AMBARI-21399 Create property descriptions for internal Log Feeder configs (mgergely)
Change-Id: I51bf4322184da06084c1b2af35fedd6ee19ab36e
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5c9bdbfe
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5c9bdbfe
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5c9bdbfe
Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 5c9bdbfeff7b5b78adf4d840fbd93c504f129830
Parents: 8d9fd45
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Thu Jul 6 11:08:48 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Thu Jul 6 11:08:48 2017 +0200
----------------------------------------------------------------------
.../logsearch/config/api/LogSearchConfig.java | 7 +-
.../config/api/LogSearchConfigFactory.java | 8 +-
.../config/api/LogSearchConfigClass1.java | 6 +-
.../config/api/LogSearchConfigClass2.java | 6 +-
.../config/api/LogSearchConfigFactoryTest.java | 6 +-
.../config/zookeeper/LogSearchConfigZK.java | 25 ++-----
.../org/apache/ambari/logfeeder/LogFeeder.java | 34 ++-------
.../ambari/logfeeder/common/ConfigHandler.java | 52 ++++++-------
.../logfeeder/common/LogFeederException.java | 31 ++++++++
.../logfeeder/common/LogfeederException.java | 31 --------
.../apache/ambari/logfeeder/filter/Filter.java | 6 +-
.../ambari/logfeeder/filter/FilterGrok.java | 10 +--
.../ambari/logfeeder/filter/FilterJSON.java | 6 +-
.../ambari/logfeeder/filter/FilterKeyValue.java | 6 +-
.../logfeeder/input/AbstractInputFile.java | 3 +-
.../apache/ambari/logfeeder/input/Input.java | 72 +++++++++++++++---
.../logfeeder/input/InputConfigUploader.java | 18 ++++-
.../ambari/logfeeder/input/InputManager.java | 45 +++++++-----
.../ambari/logfeeder/input/InputSimulate.java | 74 +++++++++++++++++--
.../loglevelfilter/LogLevelFilterHandler.java | 28 ++++++-
.../logfeeder/metrics/LogFeederAMSClient.java | 43 ++++++++++-
.../logfeeder/metrics/MetricsManager.java | 25 +------
.../ambari/logfeeder/output/OutputHDFSFile.java | 10 +--
.../ambari/logfeeder/output/OutputS3File.java | 2 +-
.../ambari/logfeeder/output/OutputSolr.java | 28 ++++++-
.../apache/ambari/logfeeder/util/FileUtil.java | 15 ----
.../logfeeder/util/LogFeederHDFSUtil.java | 77 ++++++++++++++++++++
.../ambari/logfeeder/util/LogFeederUtil.java | 41 +++++++++--
.../logfeeder/util/LogfeederHDFSUtil.java | 77 --------------------
.../apache/ambari/logfeeder/util/SSLUtil.java | 9 +++
.../ambari/logfeeder/filter/FilterJSONTest.java | 8 +-
.../logfeeder/input/InputManagerTest.java | 26 -------
.../logfeeder/metrics/MetricsManagerTest.java | 2 -
.../src/test/resources/logfeeder.properties | 1 -
.../configurer/LogSearchConfigConfigurer.java | 2 +-
35 files changed, 500 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index ad1f5d4..6c5cefd 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -43,9 +43,10 @@ public interface LogSearchConfig extends Closeable {
*
* @param component The component which will use the configuration.
* @param properties The properties of that component.
+ * @param clusterName The name of the cluster, only needs to be specified in LOGFEEDER mode (null for SERVER mode).
* @throws Exception
*/
- void init(Component component, Map<String, String> properties) throws Exception;
+ void init(Component component, Map<String, String> properties, String clusterName) throws Exception;
/**
* Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode.
@@ -134,7 +135,9 @@ public interface LogSearchConfig extends Closeable {
*
* @param inputConfigMonitor The input config monitor to call in case of an input config change.
* @param logLevelFilterMonitor The log level filter monitor to call in case of a log level filter change.
+ * @param clusterName The name of the cluster, only needs to be specified in LOGFEEDER mode (null for SERVER mode).
* @throws Exception
*/
- void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor) throws Exception;
+ void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor,
+ String clusterName) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
index 947e7e7..77b48eb 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -37,12 +37,13 @@ public class LogSearchConfigFactory {
* @param component The component of the Log Search Service to create the configuration for (SERVER/LOGFEEDER).
* @param properties The properties of the component for which the configuration is created. If the properties contain the
* "logsearch.config.class" entry than the class defined there would be used instead of the default class.
+ * @param clusterName The name of the cluster, only needs to be specified in LOGFEEDER mode (null for SERVER mode).
* @param defaultClass The default configuration class to use if not specified otherwise.
* @return The Log Search Configuration instance.
* @throws Exception Throws exception if the defined class does not implement LogSearchConfig, or doesn't have an empty
* constructor, or throws an exception in it's init method.
*/
- public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties,
+ public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties, String clusterName,
Class<? extends LogSearchConfig> defaultClass) throws Exception {
try {
LogSearchConfig logSearchConfig = null;
@@ -52,13 +53,14 @@ public class LogSearchConfigFactory {
if (LogSearchConfig.class.isAssignableFrom(clazz)) {
logSearchConfig = (LogSearchConfig) clazz.newInstance();
} else {
- throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + LogSearchConfig.class.getName());
+ throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " +
+ LogSearchConfig.class.getName());
}
} else {
logSearchConfig = defaultClass.newInstance();
}
- logSearchConfig.init(component, properties);
+ logSearchConfig.init(component, properties, clusterName);
return logSearchConfig;
} catch (Exception e) {
LOG.error("Could not initialize logsearch config.", e);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index 7309382..28844d5 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -30,7 +30,7 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
public class LogSearchConfigClass1 implements LogSearchConfig {
@Override
- public void init(Component component, Map<String, String> properties) {}
+ public void init(Component component, Map<String, String> properties, String clusterName) {}
@Override
public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
@@ -44,8 +44,8 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
@Override
- public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor)
- throws Exception {}
+ public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor,
+ String clusterName) throws Exception {}
@Override
public List<String> getServices(String clusterName) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index f83eeef..5934fa6 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -30,7 +30,7 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
public class LogSearchConfigClass2 implements LogSearchConfig {
@Override
- public void init(Component component, Map<String, String> properties) {}
+ public void init(Component component, Map<String, String> properties, String clusterName) {}
@Override
public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
@@ -44,8 +44,8 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
@Override
- public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor)
- throws Exception {}
+ public void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor,
+ String clusterName) throws Exception {}
@Override
public List<String> getServices(String clusterName) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
index 425694f..f990c5c 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
@@ -33,7 +33,7 @@ public class LogSearchConfigFactoryTest {
@Test
public void testDefaultConfig() throws Exception {
LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
- Collections.<String, String> emptyMap(), LogSearchConfigClass1.class);
+ Collections.<String, String> emptyMap(), null, LogSearchConfigClass1.class);
Assert.assertSame(config.getClass(), LogSearchConfigClass1.class);
}
@@ -43,7 +43,7 @@ public class LogSearchConfigFactoryTest {
Map<String, String> logsearchConfClassMap = new HashMap<>();
logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigClass2");
LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
- logsearchConfClassMap, LogSearchConfigClass1.class);
+ logsearchConfClassMap, null, LogSearchConfigClass1.class);
Assert.assertSame(config.getClass(), LogSearchConfigClass2.class);
}
@@ -53,6 +53,6 @@ public class LogSearchConfigFactoryTest {
Map<String, String> logsearchConfClassMap = new HashMap<>();
logsearchConfClassMap.put("logsearch.config.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass");
LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
- logsearchConfClassMap, LogSearchConfigClass1.class);
+ logsearchConfClassMap, null, LogSearchConfigClass1.class);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index 827101c..6d36203 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -72,14 +72,6 @@ public class LogSearchConfigZK implements LogSearchConfig {
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
@LogSearchPropertyDescription(
- name = "cluster.name",
- description = "Cluster name for Log Feeder. (added into zk path of the shipper configs)",
- examples = {"cl1"},
- sources = {"logfeeder.properties"}
- )
- private static final String CLUSTER_NAME_PROPERTY = "cluster.name";
-
- @LogSearchPropertyDescription(
name = "logsearch.config.zk_connect_string",
description = "ZooKeeper connection string.",
examples = {"localhost1:2181,localhost2:2181/znode"},
@@ -111,7 +103,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
private Gson gson;
@Override
- public void init(Component component, Map<String, String> properties) throws Exception {
+ public void init(Component component, Map<String, String> properties, String clusterName) throws Exception {
this.properties = properties;
LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY));
@@ -136,8 +128,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
}
-
- cache = new TreeCache(client, String.format("%s/%s", root, properties.get(CLUSTER_NAME_PROPERTY)));
+ cache = new TreeCache(client, String.format("%s/%s", root, clusterName));
}
gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
@@ -169,7 +160,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
@Override
public void monitorInputConfigChanges(final InputConfigMonitor inputConfigMonitor,
- final LogLevelFilterMonitor logLevelFilterMonitor) throws Exception {
+ final LogLevelFilterMonitor logLevelFilterMonitor, final String clusterName) throws Exception {
final JsonParser parser = new JsonParser();
final JsonArray globalConfigNode = new JsonArray();
for (String globalConfigJsonString : inputConfigMonitor.getGlobalConfigJsons()) {
@@ -177,6 +168,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
globalConfigNode.add(globalConfigJson.getAsJsonObject().get("global"));
}
+ createGlobalConfigNode(globalConfigNode, clusterName);
+
TreeCacheListener listener = new TreeCacheListener() {
private final Set<Type> nodeEvents = ImmutableSet.of(Type.NODE_ADDED, Type.NODE_UPDATED, Type.NODE_REMOVED);
@@ -189,7 +182,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
String nodeData = new String(event.getData().getData());
Type eventType = event.getType();
- String configPathStab = String.format("%s/%s/", root, properties.get(CLUSTER_NAME_PROPERTY));
+ String configPathStab = String.format("%s/%s/", root, clusterName);
if (event.getData().getPath().startsWith(configPathStab + "input/")) {
handleInputConfigChange(eventType, nodeName, nodeData);
@@ -271,12 +264,10 @@ public class LogSearchConfigZK implements LogSearchConfig {
};
cache.getListenable().addListener(listener);
cache.start();
-
- createGlobalConfigNode(globalConfigNode);
}
- private void createGlobalConfigNode(JsonArray globalConfigNode) {
- String globalConfigNodePath = String.format("%s/%s/global", root, properties.get(CLUSTER_NAME_PROPERTY));
+ private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) {
+ String globalConfigNodePath = String.format("%s/%s/global", root, clusterName);
String data = InputConfigGson.gson.toJson(globalConfigNode);
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index e7b6edc..59c2a22 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -33,7 +33,6 @@ import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
import org.apache.commons.io.FileUtils;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
-import org.apache.ambari.logfeeder.input.InputManager;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
@@ -57,11 +56,9 @@ public class LogFeeder {
private ConfigHandler configHandler = new ConfigHandler();
private LogSearchConfig config;
- private InputManager inputManager = new InputManager();
private MetricsManager metricsManager = new MetricsManager();
private long lastCheckPointCleanedMS = 0;
- private boolean isLogfeederCompleted = false;
private Thread statLoggerThread = null;
private LogFeeder(LogFeederCommandLine cli) {
@@ -72,7 +69,6 @@ public class LogFeeder {
try {
init();
monitor();
- waitOnAllDaemonThreads();
} catch (Throwable t) {
LOG.fatal("Caught exception in main.", t);
System.exit(1);
@@ -85,11 +81,11 @@ public class LogFeeder {
configHandler.init();
SSLUtil.ensureStorePasswords();
- config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,
- Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class);
+ config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()),
+ LogFeederUtil.getClusterName(), LogSearchConfigZK.class);
LogLevelFilterHandler.init(config);
InputConfigUploader.load(config);
- config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler());
+ config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName());
metricsManager.init();
@@ -117,8 +113,8 @@ public class LogFeeder {
}
private void monitor() throws Exception {
- JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
- ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
+ JVMShutdownHook logFeederJVMHook = new JVMShutdownHook();
+ ShutdownHookManager.get().addShutdownHook(logFeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
statLoggerThread = new Thread("statLogger") {
@@ -140,10 +136,6 @@ public class LogFeeder {
lastCheckPointCleanedMS = System.currentTimeMillis();
configHandler.cleanCheckPointFiles();
}
-
- if (isLogfeederCompleted) {
- break;
- }
}
}
@@ -163,20 +155,6 @@ public class LogFeeder {
}
}
- private void waitOnAllDaemonThreads() {
- if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) {
- inputManager.waitOnAllInputs();
- isLogfeederCompleted = true;
- if (statLoggerThread != null) {
- try {
- statLoggerThread.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
public void test() {
try {
LogManager.shutdown();
@@ -203,7 +181,7 @@ public class LogFeeder {
if (cli.isMonitor()) {
try {
- LogFeederUtil.loadProperties("logfeeder.properties");
+ LogFeederUtil.loadProperties();
} catch (Throwable t) {
LOG.warn("Could not load logfeeder properites");
System.exit(1);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 25669d9..5bf074c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -41,15 +41,14 @@ import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
@@ -62,9 +61,30 @@ import org.apache.log4j.Logger;
import com.google.gson.reflect.TypeToken;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class ConfigHandler implements InputConfigMonitor {
private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
+ @LogSearchPropertyDescription(
+ name = "logfeeder.config.files",
+ description = "Comma separated list of the config files containing global / output configurations.",
+ examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"},
+ defaultValue = "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files";
+
+ private static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.input_number",
+ description = "The number of the simulator instances to run with. 0 means no simulation.",
+ examples = {"10"},
+ defaultValue = DEFAULT_SIMULATE_INPUT_NUMBER + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number";
+
private final OutputManager outputManager = new OutputManager();
private final InputManager inputManager = new InputManager();
@@ -108,24 +128,10 @@ public class ConfigHandler implements InputConfigMonitor {
private List<String> getConfigFiles() {
List<String> configFiles = new ArrayList<>();
- String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
- LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
- if (logfeederConfigFilesProperty != null) {
- configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
- }
-
- String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
- if (StringUtils.isNotEmpty(inputConfigDir)) {
- File configDirFile = new File(inputConfigDir);
- List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
- for (File inputConfigFile : inputConfigFiles) {
- configFiles.add(inputConfigFile.getAbsolutePath());
- }
- }
-
- if (CollectionUtils.isEmpty(configFiles)) {
- String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
- configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
+ String logFeederConfigFilesProperty = LogFeederUtil.getStringProperty(CONFIG_FILES_PROPERTY);
+ LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
+ if (logFeederConfigFilesProperty != null) {
+ configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
}
return configFiles;
@@ -225,7 +231,7 @@ public class ConfigHandler implements InputConfigMonitor {
}
private void simulateIfNeeded() throws Exception {
- int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
+ int simulatedInputNumber = LogFeederUtil.getIntProperty(SIMULATE_INPUT_NUMBER_PROPERTY, DEFAULT_SIMULATE_INPUT_NUMBER);
if (simulatedInputNumber == 0)
return;
@@ -434,10 +440,6 @@ public class ConfigHandler implements InputConfigMonitor {
outputManager.addMetricsContainers(metricsList);
}
- public void waitOnAllInputs() {
- inputManager.waitOnAllInputs();
- }
-
public void close() {
inputManager.close();
outputManager.close();
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java
new file mode 100644
index 0000000..3653475
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+public class LogFeederException extends Exception {
+
+ public LogFeederException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+ public LogFeederException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java
deleted file mode 100644
index 8a07602..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogfeederException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.common;
-
-public class LogfeederException extends Exception {
-
- public LogfeederException(String message, Throwable throwable) {
- super(message, throwable);
- }
-
- public LogfeederException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index fd02497..8e8834b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.mapper.Mapper;
@@ -116,7 +116,7 @@ public abstract class Filter extends ConfigItem {
/**
* Deriving classes should implement this at the minimum
*/
- public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
// TODO: There is no transformation for string types.
if (nextFilter != null) {
nextFilter.apply(inputStr, inputMarker);
@@ -125,7 +125,7 @@ public abstract class Filter extends ConfigItem {
}
}
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
for (String fieldName : postFieldValueMappers.keySet()) {
Object value = jsonObj.get(fieldName);
if (value != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 50247e2..fc7a565 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -34,7 +34,7 @@ import java.util.regex.Pattern;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.exception.GrokException;
-import org.apache.ambari.logfeeder.common.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -160,7 +160,7 @@ public class FilterGrok extends Filter {
}
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
if (grokMessage == null) {
return;
}
@@ -195,7 +195,7 @@ public class FilterGrok extends Filter {
}
@Override
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
if (sourceField != null) {
savedInputMarker = inputMarker;
applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
@@ -205,7 +205,7 @@ public class FilterGrok extends Filter {
}
}
- private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogfeederException {
+ private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogFeederException {
String jsonStr = grokMessage.capture(inputStr);
boolean parseError = false;
@@ -260,7 +260,7 @@ public class FilterGrok extends Filter {
Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
try {
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
- } catch (LogfeederException e) {
+ } catch (LogFeederException e) {
LOG.error(e.getLocalizedMessage(), e.getCause());
}
strBuff = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index cfccdeb..1a2da0c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.filter;
import java.util.Map;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.common.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -29,13 +29,13 @@ import org.apache.ambari.logfeeder.util.LogFeederUtil;
public class FilterJSON extends Filter {
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
Map<String, Object> jsonMap = null;
try {
jsonMap = LogFeederUtil.toJSONObject(inputStr);
} catch (Exception e) {
LOG.error(e.getLocalizedMessage());
- throw new LogfeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause());
+ throw new LogFeederException("Json parsing failed for inputstr = " + inputStr ,e.getCause());
}
Double lineNumberD = (Double) jsonMap.get("line_number");
if (lineNumberD != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index f2a4186..670b1c3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import org.apache.ambari.logfeeder.common.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -58,12 +58,12 @@ public class FilterKeyValue extends Filter {
}
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogfeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
apply(LogFeederUtil.toJSONObject(inputStr), inputMarker);
}
@Override
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogfeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
if (sourceField == null) {
return;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index 2359256..9535260 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -68,7 +68,8 @@ public abstract class AbstractInputFile extends Input {
public void init() throws Exception {
LOG.info("init() called");
- checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", InputManager.DEFAULT_CHECKPOINT_EXTENSION);
+ checkPointExtension = LogFeederUtil.getStringProperty(InputManager.CHECKPOINT_EXTENSION_PROPERTY,
+ InputManager.DEFAULT_CHECKPOINT_EXTENSION);
// Let's close the file and set it to true after we start monitoring it
setClosed(true);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 49151e7..8050263 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -26,12 +26,13 @@ import java.util.Map;
import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -39,15 +40,62 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
import org.apache.commons.lang.BooleanUtils;
import org.apache.log4j.Priority;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public abstract class Input extends ConfigItem implements Runnable {
- private static final boolean DEFAULT_TAIL = true;
- private static final boolean DEFAULT_USE_EVENT_MD5 = false;
- private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
private static final boolean DEFAULT_CACHE_ENABLED = false;
- private static final boolean DEFAULT_CACHE_DEDUP_LAST = false;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.cache.enabled",
+ description = "Enables the usage of a cache to avoid duplications.",
+ examples = {"true"},
+ defaultValue = DEFAULT_CACHE_ENABLED + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled";
+
+ private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+ @LogSearchPropertyDescription(
+ name = "logfeeder.cache.key.field",
+ description = "The field whose value should be cached and checked for repetitions.",
+ examples = {"some_field_prone_to_repeating_value"},
+ defaultValue = DEFAULT_CACHE_KEY_FIELD,
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field";
+
private static final int DEFAULT_CACHE_SIZE = 100;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.cache.size",
+ description = "The number of log entries to cache in order to avoid duplications.",
+ examples = {"50"},
+ defaultValue = DEFAULT_CACHE_SIZE + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size";
+
+ private static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.cache.last.dedup.enabled",
+ description = "Enables filtering of directly repeating log entries irrespective of the time elapsed between them.",
+ examples = {"true"},
+ defaultValue = DEFAULT_CACHE_LAST_DEDUP_ENABLED + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled";
+
private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
- private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+ @LogSearchPropertyDescription(
+ name = "logfeeder.cache.dedup.interval",
+ description = "Maximum number of milliseconds between two identical messages to be filtered out.",
+ examples = {"500"},
+ defaultValue = DEFAULT_CACHE_DEDUP_INTERVAL + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval";
+
+ private static final boolean DEFAULT_TAIL = true;
+ private static final boolean DEFAULT_USE_EVENT_MD5 = false;
+ private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
protected InputDescriptor inputDescriptor;
@@ -183,7 +231,7 @@ public abstract class Input extends ConfigItem implements Runnable {
if (firstFilter != null) {
try {
firstFilter.apply(line, marker);
- } catch (LogfeederException e) {
+ } catch (LogFeederException e) {
LOG.error(e.getLocalizedMessage(), e);
}
} else {
@@ -246,25 +294,25 @@ public abstract class Input extends ConfigItem implements Runnable {
private void initCache() {
boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
? inputDescriptor.isCacheEnabled()
- : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
+ : LogFeederUtil.getBooleanProperty(CACHE_ENABLED_PROPERTY, DEFAULT_CACHE_ENABLED);
if (cacheEnabled) {
String cacheKeyField = inputDescriptor.getCacheKeyField() != null
? inputDescriptor.getCacheKeyField()
- : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
+ : LogFeederUtil.getStringProperty(CACHE_KEY_FIELD_PROPERTY, DEFAULT_CACHE_KEY_FIELD);
setCacheKeyField(cacheKeyField);
boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
? inputDescriptor.getCacheLastDedupEnabled()
- : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
+ : LogFeederUtil.getBooleanProperty(CACHE_LAST_DEDUP_ENABLED_PROPERTY, DEFAULT_CACHE_LAST_DEDUP_ENABLED);
int cacheSize = inputDescriptor.getCacheSize() != null
? inputDescriptor.getCacheSize()
- : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
+ : LogFeederUtil.getIntProperty(CACHE_SIZE_PROPERTY, DEFAULT_CACHE_SIZE);
long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
? inputDescriptor.getCacheDedupInterval()
- : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
+ : Long.parseLong(LogFeederUtil.getStringProperty(CACHE_DEDUP_INTERVAL_PROPERTY, String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index 8aec690..09fc3f5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -28,14 +28,25 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
import com.google.common.io.Files;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class InputConfigUploader extends Thread {
protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class);
+ @LogSearchPropertyDescription(
+ name = "logfeeder.config.dir",
+ description = "The directory where shipper configuration files are looked for.",
+ examples = {"/etc/ambari-logsearch-logfeeder/conf"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir";
+
private static final long SLEEP_BETWEEN_CHECK = 2000;
private final File configDir;
@@ -48,7 +59,6 @@ public class InputConfigUploader extends Thread {
private final Set<String> filesHandled = new HashSet<>();
private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json");
private final LogSearchConfig config;
- private final String clusterName = LogFeederUtil.getStringProperty("cluster.name");
public static void load(LogSearchConfig config) {
new InputConfigUploader(config).start();
@@ -58,7 +68,7 @@ public class InputConfigUploader extends Thread {
super("Input Config Loader");
setDaemon(true);
- this.configDir = new File(LogFeederUtil.getStringProperty("logfeeder.config.dir"));
+ this.configDir = new File(LogFeederUtil.getStringProperty(CONFIG_DIR_PROPERTY));
this.config = config;
}
@@ -74,8 +84,8 @@ public class InputConfigUploader extends Thread {
String serviceName = m.group(1);
String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
- if (!config.inputConfigExists(clusterName, serviceName)) {
- config.createInputConfig(clusterName, serviceName, inputConfig);
+ if (!config.inputConfigExists(LogFeederUtil.getClusterName(), serviceName)) {
+ config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig);
}
filesHandled.add(inputConfigFile.getAbsolutePath());
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
index 01a11ec..091015a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
@@ -36,16 +36,36 @@ import java.util.UUID;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.solr.common.util.Base64;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class InputManager {
private static final Logger LOG = Logger.getLogger(InputManager.class);
- private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
+ @LogSearchPropertyDescription(
+ name = "logfeeder.checkpoint.extension",
+ description = "The extension used for checkpoint files.",
+ examples = {"ckp"},
+ defaultValue = DEFAULT_CHECKPOINT_EXTENSION,
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.checkpoint.folder",
+ description = "The folder where checkpoint files are stored.",
+ examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder";
+
+ private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
private Map<String, List<Input>> inputs = new HashMap<>();
private Set<Input> notReadyList = new HashSet<Input>();
@@ -118,32 +138,21 @@ public class InputManager {
}
private void initCheckPointSettings() {
- checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", DEFAULT_CHECKPOINT_EXTENSION);
+ checkPointExtension = LogFeederUtil.getStringProperty(CHECKPOINT_EXTENSION_PROPERTY, DEFAULT_CHECKPOINT_EXTENSION);
LOG.info("Determining valid checkpoint folder");
boolean isCheckPointFolderValid = false;
// We need to keep track of the files we are reading.
- String checkPointFolder = LogFeederUtil.getStringProperty("logfeeder.checkpoint.folder");
+ String checkPointFolder = LogFeederUtil.getStringProperty(CHECKPOINT_FOLDER_PROPERTY);
if (!StringUtils.isEmpty(checkPointFolder)) {
checkPointFolderFile = new File(checkPointFolder);
isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
}
- if (!isCheckPointFolderValid) {
- // Let's try home folder
- String userHome = LogFeederUtil.getStringProperty("user.home");
- if (userHome != null) {
- checkPointFolderFile = new File(userHome, CHECKPOINT_SUBFOLDER_NAME);
- LOG.info("Checking if home folder can be used for checkpoints. Folder=" + checkPointFolderFile);
- isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
- }
- }
+
if (!isCheckPointFolderValid) {
// Let's use tmp folder
- String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir");
- if (tmpFolder == null) {
- tmpFolder = "/tmp";
- }
+ String tmpFolder = LogFeederUtil.getLogFeederTempDir();
checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME);
- LOG.info("Checking if tmps folder can be used for checkpoints. Folder=" + checkPointFolderFile);
+ LOG.info("Checking if tmp folder can be used for checkpoints. Folder=" + checkPointFolderFile);
isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
if (isCheckPointFolderValid) {
LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
@@ -153,6 +162,8 @@ public class InputManager {
if (isCheckPointFolderValid) {
LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
+ } else {
+ throw new IllegalStateException("Could not determine the checkpoint folder.");
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 5e7bdb3..f1002ae 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -34,6 +34,7 @@ import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.filter.FilterJSON;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
@@ -42,9 +43,70 @@ import org.apache.solr.common.util.Base64;
import com.google.common.base.Joiner;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class InputSimulate extends Input {
private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
+ private static final String DEFAULT_LOG_LEVEL = "WARN";
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.log_level",
+ description = "The log level to create the simulated log entries with.",
+ examples = {"INFO"},
+ defaultValue = DEFAULT_LOG_LEVEL,
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level";
+
+ private static final int DEFAULT_NUMBER_OF_WORDS = 1000;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.number_of_words",
+ description = "The size of the set of words that may be used to create the simulated log entries with.",
+ examples = {"100"},
+ defaultValue = DEFAULT_NUMBER_OF_WORDS + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words";
+
+ private static final int DEFAULT_MIN_LOG_WORDS = 5;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.min_log_words",
+ description = "The minimum number of words in a simulated log entry.",
+ examples = {"3"},
+ defaultValue = DEFAULT_MIN_LOG_WORDS + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words";
+
+ private static final int DEFAULT_MAX_LOG_WORDS = 10; // keeps the previous hard-coded default; 5 would lie below the 10..20 bounds passed to getIntProperty
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.max_log_words",
+ description = "The maximum number of words in a simulated log entry.",
+ examples = {"8"},
+ defaultValue = DEFAULT_MAX_LOG_WORDS + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words";
+
+ private static final int DEFAULT_SLEEP_MILLISECONDS = 10000;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.sleep_milliseconds",
+ description = "The milliseconds to sleep between creating two simulated log entries.",
+ examples = {"5000"},
+ defaultValue = DEFAULT_SLEEP_MILLISECONDS + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.simulate.log_ids",
+ description = "The comma separated list of log ids for which to create the simulated log entries.",
+ examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"},
+ defaultValue = "The log ids of the installed services in the cluster",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids";
+
private static final Map<String, String> typeToFilePath = new HashMap<>();
private static final List<String> inputTypes = new ArrayList<>();
public static void loadTypeToFilePath(List<InputDescriptor> inputList) {
@@ -75,11 +137,11 @@ public class InputSimulate extends Input {
public InputSimulate() throws Exception {
this.types = getSimulatedLogTypes();
- this.level = LogFeederUtil.getStringProperty("logfeeder.simulate.log_level", "WARN");
- this.numberOfWords = LogFeederUtil.getIntProperty("logfeeder.simulate.number_of_words", 1000, 50, 1000000);
- this.minLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.min_log_words", 5, 1, 10);
- this.maxLogWords = LogFeederUtil.getIntProperty("logfeeder.simulate.max_log_words", 10, 10, 20);
- this.sleepMillis = LogFeederUtil.getIntProperty("logfeeder.simulate.sleep_milliseconds", 10000);
+ this.level = LogFeederUtil.getStringProperty(LOG_LEVEL_PROPERTY, DEFAULT_LOG_LEVEL);
+ this.numberOfWords = LogFeederUtil.getIntProperty(NUMBER_OF_WORDS_PROPERTY, DEFAULT_NUMBER_OF_WORDS, 50, 1000000);
+ this.minLogWords = LogFeederUtil.getIntProperty(MIN_LOG_WORDS_PROPERTY, DEFAULT_MIN_LOG_WORDS, 1, 10);
+ this.maxLogWords = LogFeederUtil.getIntProperty(MAX_LOG_WORDS_PROPERTY, DEFAULT_MAX_LOG_WORDS, 10, 20);
+ this.sleepMillis = LogFeederUtil.getIntProperty(SLEEP_MILLISECONDS_PROPERTY, DEFAULT_SLEEP_MILLISECONDS);
this.host = "#" + hostNumber.incrementAndGet() + "-" + LogFeederUtil.hostName;
Filter filter = new FilterJSON();
@@ -89,7 +151,7 @@ public class InputSimulate extends Input {
}
private List<String> getSimulatedLogTypes() {
- String logsToSimulate = LogFeederUtil.getStringProperty("logfeeder.simulate.log_ids");
+ String logsToSimulate = LogFeederUtil.getStringProperty(LOG_IDS_PROPERTY);
return (logsToSimulate == null) ?
inputTypes :
Arrays.asList(logsToSimulate.split(","));
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
index 8a4d953..79bf5ea 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -32,14 +32,35 @@ import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class LogLevelFilterHandler implements LogLevelFilterMonitor {
private static final Logger LOG = Logger.getLogger(LogLevelFilterHandler.class);
+ private static final boolean DEFAULT_LOG_FILTER_ENABLE = false;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.log.filter.enable",
+ description = "Enables the filtering of the log entries by log level filters.",
+ examples = {"true"},
+ defaultValue = DEFAULT_LOG_FILTER_ENABLE + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.include.default.level",
+ description = "Comma separated list of the default log levels to be enabled by the filtering.",
+ examples = {"FATAL,ERROR,WARN"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level";
+
private static final String TIMEZONE = "GMT";
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
@@ -52,15 +73,14 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
};
private static LogSearchConfig config;
- private static String clusterName = LogFeederUtil.getStringProperty("cluster.name");
private static boolean filterEnabled;
private static List<String> defaultLogLevels;
private static Map<String, LogLevelFilter> filters = new HashMap<>();
public static void init(LogSearchConfig config_) {
config = config_;
- filterEnabled = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
- defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty("logfeeder.include.default.level").split(","));
+ filterEnabled = LogFeederUtil.getBooleanProperty(LOG_FILTER_ENABLE_PROPERTY, DEFAULT_LOG_FILTER_ENABLE);
+ defaultLogLevels = Arrays.asList(LogFeederUtil.getStringProperty(INCLUDE_DEFAULT_LEVEL_PROPERTY).split(","));
TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
}
@@ -100,7 +120,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
defaultFilter.setDefaultLevels(defaultLogLevels);
try {
- config.createLogLevelFilter(clusterName, logId, defaultFilter);
+ config.createLogLevelFilter(LogFeederUtil.getClusterName(), logId, defaultFilter);
filters.put(logId, defaultFilter);
} catch (Exception e) {
LOG.warn("Could not persist the default filter for log " + logId, e);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index 39526a5..fdad9a6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.metrics;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.SSLUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -28,6 +29,8 @@ import org.apache.log4j.Logger;
import com.google.common.base.Splitter;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
import java.util.Collection;
import java.util.List;
@@ -35,21 +38,53 @@ import java.util.List;
public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class);
+ @LogSearchPropertyDescription(
+ name = "logfeeder.metrics.collector.hosts",
+ description = "Comma separtaed list of metric collector hosts.",
+ examples = {"c6401.ambari.apache.org"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.metrics.collector.protocol",
+ description = "The protocol used by metric collectors.",
+ examples = {"http", "https"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.metrics.collector.port",
+ description = "The port used by metric collectors.",
+ examples = {"6188"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port";
+
+ @LogSearchPropertyDescription(
+ name = "logfeeder.metrics.collector.path",
+ description = "The path used by metric collectors.",
+ examples = {"/ws/v1/timeline/metrics"},
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path";
+
private final List<String> collectorHosts;
private final String collectorProtocol;
private final String collectorPort;
private final String collectorPath;
public LogFeederAMSClient() {
- String collectorHostsString = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.hosts");
+ String collectorHostsString = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_HOSTS_PROPERTY);
if (!StringUtils.isBlank(collectorHostsString)) {
collectorHostsString = collectorHostsString.trim();
LOG.info("AMS collector Hosts=" + collectorHostsString);
collectorHosts = Splitter.on(",").splitToList(collectorHostsString);
- collectorProtocol = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.protocol");
- collectorPort = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.port");
- collectorPath = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.path");
+ collectorProtocol = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PROTOCOL_PROPERTY);
+ collectorPort = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PORT_PROPERTY);
+ collectorPath = LogFeederUtil.getStringProperty(METRICS_COLLECTOR_PATH_PROPERTY);
} else {
collectorHosts = null;
collectorProtocol = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
index 942c0b4..1094852 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -19,7 +19,6 @@
package org.apache.ambari.logfeeder.metrics;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,7 +33,6 @@ public class MetricsManager {
private static final Logger LOG = Logger.getLogger(MetricsManager.class);
private boolean isMetricsEnabled = false;
- private String nodeHostName = null;
private String appId = "logfeeder";
private long lastPublishTimeMS = 0; // Let's do the first publish immediately
@@ -50,8 +48,7 @@ public class MetricsManager {
amsClient = new LogFeederAMSClient();
if (amsClient.getCollectorUri(null) != null) {
- findNodeHostName();
- if (nodeHostName == null) {
+ if (LogFeederUtil.hostName == null) {
isMetricsEnabled = false;
LOG.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
} else {
@@ -63,24 +60,6 @@ public class MetricsManager {
}
}
- private void findNodeHostName() {
- nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
- if (nodeHostName == null) {
- try {
- nodeHostName = InetAddress.getLocalHost().getHostName();
- } catch (Throwable e) {
- LOG.warn("Error getting hostname using InetAddress.getLocalHost().getHostName()", e);
- }
- }
- if (nodeHostName == null) {
- try {
- nodeHostName = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (Throwable e) {
- LOG.warn("Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", e);
- }
- }
- }
-
public boolean isMetricsEnabled() {
return isMetricsEnabled;
}
@@ -117,7 +96,7 @@ public class MetricsManager {
LOG.debug("Creating new metric obbject for " + metric.metricsName);
timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(metric.metricsName);
- timelineMetric.setHostName(nodeHostName);
+ timelineMetric.setHostName(LogFeederUtil.hostName);
timelineMetric.setAppId(appId);
timelineMetric.setStartTime(currMS);
timelineMetric.setType("Long");
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index 8f4b0b1..2b47a00 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -25,7 +25,7 @@ import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -87,7 +87,7 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
HashMap<String, String> contextParam = buildContextParam();
hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
LOG.info("hdfs Output dir=" + hdfsOutDir);
- String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+ String localFileDir = LogFeederUtil.getLogFeederTempDir() + "hdfs/service/";
logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
this.startHDFSCopyThread();
}
@@ -124,13 +124,13 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
Iterator<File> localFileIterator = localReadyFiles.iterator();
while (localFileIterator.hasNext()) {
File localFile = localFileIterator.next();
- fileSystem = LogfeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
+ fileSystem = LogFeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
if (fileSystem != null && localFile.exists()) {
String destFilePath = hdfsOutDir + "/" + localFile.getName();
String localPath = localFile.getAbsolutePath();
boolean overWrite = true;
boolean delSrc = true;
- boolean isCopied = LogfeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
+ boolean isCopied = LogFeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
overWrite, delSrc);
if (isCopied) {
LOG.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
@@ -179,7 +179,7 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
LOG.error(" Current thread : '" + Thread.currentThread().getName() +
"' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
}
- LogfeederHDFSUtil.closeFileSystem(fileSystem);
+ LogFeederHDFSUtil.closeFileSystem(fileSystem);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index 076d12d..9f41a15 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -205,7 +205,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
@VisibleForTesting
protected LogSpooler createSpooler(String filePath) {
- String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
+ String spoolDirectory = LogFeederUtil.getLogFeederTempDir() + "/s3/service";
LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
s3OutputConfiguration.getRolloverTimeThresholdSecs());
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index d37a3bb..162a7f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -51,9 +52,32 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
+
public class OutputSolr extends Output {
+
+ private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab";
+ @LogSearchPropertyDescription(
+ name = "logfeeder.solr.jaas.file",
+ description = "The jaas file used for solr.",
+ examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"},
+ defaultValue = DEFAULT_SOLR_JAAS_FILE,
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file";
+
private static final Logger LOG = Logger.getLogger(OutputSolr.class);
+ private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false;
+ @LogSearchPropertyDescription(
+ name = "logfeeder.solr.kerberos.enable",
+ description = "Enables using kerberos for accessing solr.",
+ examples = {"true"},
+ defaultValue = DEFAULT_SOLR_KERBEROS_ENABLE + "",
+ sources = {LOGFEEDER_PROPERTIES_FILE}
+ )
+ private static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable";
+
private static final int DEFAULT_MAX_BUFFER_SIZE = 5000;
private static final int DEFAULT_MAX_INTERVAL_MS = 3000;
private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
@@ -127,8 +151,8 @@ public class OutputSolr extends Output {
private void setupSecurity() {
- String jaasFile = LogFeederUtil.getStringProperty("logfeeder.solr.jaas.file", "/etc/security/keytabs/logsearch_solr.service.keytab");
- boolean securityEnabled = LogFeederUtil.getBooleanProperty("logfeeder.solr.kerberos.enable", false);
+ String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE);
+ boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE);
if (securityEnabled) {
System.setProperty("java.security.auth.login.config", jaasFile);
HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index 90d1df6..8ade992 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -27,11 +27,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
@@ -42,18 +39,6 @@ public class FileUtil {
private FileUtil() {
throw new UnsupportedOperationException();
}
-
- public static List<File> getAllFileFromDir(File directory, String extension, boolean checkInSubDir) {
- if (!directory.exists()) {
- LOG.error(directory.getAbsolutePath() + " is not exists ");
- } else if (!directory.isDirectory()) {
- LOG.error(directory.getAbsolutePath() + " is not Directory ");
- } else {
- return (List<File>) FileUtils.listFiles(directory, new String[]{extension}, checkInSubDir);
- }
- return new ArrayList<File>();
- }
-
public static Object getFileKey(File file) {
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5c9bdbfe/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
new file mode 100644
index 0000000..4248ae1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Static helpers for the HDFS operations used by Log Feeder: building a
+ * {@link FileSystem} handle, copying a local file into it, and closing it.
+ * Every helper logs failures instead of propagating them to the caller.
+ */
+public class LogFeederHDFSUtil {
+  private static final Logger LOG = Logger.getLogger(LogFeederHDFSUtil.class);
+
+  /** Not instantiable: the class only exposes static helpers. */
+  private LogFeederHDFSUtil() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Copies a local file to a destination path on the supplied file system.
+   *
+   * @param sourceFilepath path of the local file to copy
+   * @param destFilePath destination path on {@code fileSystem}
+   * @param fileSystem file system to copy into
+   * @param overwrite forwarded to {@code copyFromLocalFile} as its overwrite flag
+   * @param delSrc forwarded to {@code copyFromLocalFile} as its delete-source flag
+   * @return {@code true} if the copy call returned normally; {@code false} if it
+   *         threw any exception (the exception is logged, not rethrown)
+   */
+  public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
+      boolean delSrc) {
+    Path src = new Path(sourceFilepath);
+    Path dst = new Path(destFilePath);
+    boolean isCopied = false;
+    try {
+      LOG.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
+      fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
+      isCopied = true;
+    } catch (Exception e) {
+      LOG.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e);
+    }
+    return isCopied;
+  }
+
+  /**
+   * Obtains a {@link FileSystem} for {@code hdfs://hdfsHost:hdfsPort/}.
+   *
+   * @param hdfsHost host part of the HDFS URL
+   * @param hdfsPort port part of the HDFS URL
+   * @return the file system, or {@code null} if it could not be obtained
+   *         (the failure is logged)
+   */
+  public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+    try {
+      return FileSystem.get(buildHdfsConfiguration(hdfsHost, hdfsPort));
+    } catch (Exception e) {
+      LOG.error("Exception is buildFileSystem :", e);
+    }
+    return null;
+  }
+
+  /** Builds a configuration whose default file system is {@code hdfs://hdfsHost:hdfsPort/}. */
+  private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
+    String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+    Configuration configuration = new Configuration();
+    // NOTE(review): "fs.default.name" appears to be the legacy spelling of "fs.defaultFS";
+    // confirm against the Hadoop version in use before renaming the key.
+    configuration.set("fs.default.name", url);
+    return configuration;
+  }
+
+  /**
+   * Closes the given file system, logging (not rethrowing) any {@link IOException}.
+   *
+   * @param fileSystem file system to close; {@code null} is ignored
+   */
+  public static void closeFileSystem(FileSystem fileSystem) {
+    if (fileSystem != null) {
+      try {
+        fileSystem.close();
+      } catch (IOException e) {
+        // Log the exception itself: passing e.getCause() dropped this exception's
+        // stack trace and logged no throwable at all when there was no cause.
+        LOG.error(e.getLocalizedMessage(), e);
+      }
+    }
+  }
+}
\ No newline at end of file