You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2018/01/05 23:04:37 UTC
[41/50] [abbrv] ambari git commit: AMBARI-22639. Log Feeder refactor:
integrate with spring boot (oleewere)
AMBARI-22639. Log Feeder refactor: integrate with spring boot (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4a668f0b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4a668f0b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4a668f0b
Branch: refs/heads/branch-feature-AMBARI-20859
Commit: 4a668f0b69426f7a2c4379e0930fea739a1b6c0b
Parents: e99867a
Author: Oliver Szabo <ol...@gmail.com>
Authored: Tue Dec 12 22:34:09 2017 +0100
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Fri Jan 5 17:54:16 2018 -0500
----------------------------------------------------------------------
.../config/api/LogSearchConfigFactory.java | 49 +-
.../config/api/LogSearchConfigLogFeeder.java | 3 +-
.../ambari-logsearch-logfeeder/.gitignore | 1 +
.../ambari-logsearch-logfeeder/pom.xml | 62 ++-
.../org/apache/ambari/logfeeder/LogFeeder.java | 167 +------
.../ambari/logfeeder/LogFeederCommandLine.java | 28 +-
.../ambari/logfeeder/common/ConfigHandler.java | 47 +-
.../ambari/logfeeder/common/ConfigItem.java | 8 +-
.../logfeeder/common/LogEntryParseTester.java | 3 +-
.../logfeeder/common/LogFeederConstants.java | 50 ++
.../logfeeder/conf/ApplicationConfig.java | 107 ++++
.../logfeeder/conf/InputSimulateConfig.java | 154 ++++++
.../logfeeder/conf/LogEntryCacheConfig.java | 118 +++++
.../ambari/logfeeder/conf/LogFeederProps.java | 226 +++++++++
.../logfeeder/conf/LogFeederSecurityConfig.java | 189 +++++++
.../logfeeder/conf/MetricsCollectorConfig.java | 113 +++++
.../apache/ambari/logfeeder/filter/Filter.java | 7 +-
.../ambari/logfeeder/filter/FilterGrok.java | 7 +-
.../ambari/logfeeder/filter/FilterKeyValue.java | 5 +-
.../logfeeder/input/AbstractInputFile.java | 11 +-
.../apache/ambari/logfeeder/input/Input.java | 25 +-
.../logfeeder/input/InputConfigUploader.java | 82 +--
.../ambari/logfeeder/input/InputManager.java | 28 +-
.../ambari/logfeeder/input/InputSimulate.java | 44 +-
.../logfeeder/loglevelfilter/FilterLogData.java | 73 ---
.../loglevelfilter/LogLevelFilterHandler.java | 89 +++-
.../logfeeder/metrics/LogFeederAMSClient.java | 22 +-
.../logfeeder/metrics/MetricsManager.java | 16 +-
.../ambari/logfeeder/metrics/StatsLogger.java | 83 ++++
.../ambari/logfeeder/output/OutputFile.java | 5 +-
.../ambari/logfeeder/output/OutputHDFSFile.java | 8 +-
.../ambari/logfeeder/output/OutputKafka.java | 5 +-
.../ambari/logfeeder/output/OutputManager.java | 36 +-
.../ambari/logfeeder/output/OutputS3File.java | 8 +-
.../ambari/logfeeder/output/OutputSolr.java | 10 +-
.../logfeeder/util/LogFeederPropertiesUtil.java | 498 -------------------
.../apache/ambari/logfeeder/util/SSLUtil.java | 134 -----
.../shipper-conf/input.config-sample.json | 2 +-
.../src/main/resources/log4j.xml | 15 +-
.../src/main/resources/logfeeder.properties | 18 +-
.../ambari/logfeeder/filter/FilterGrokTest.java | 3 +-
.../ambari/logfeeder/filter/FilterJSONTest.java | 3 +-
.../logfeeder/filter/FilterKeyValueTest.java | 3 +-
.../ambari/logfeeder/input/InputFileTest.java | 11 +-
.../logfeeder/input/InputManagerTest.java | 12 +-
.../logconfig/LogConfigHandlerTest.java | 148 ------
.../logfeeder/metrics/MetricsManagerTest.java | 7 -
.../logfeeder/output/OutputKafkaTest.java | 7 +-
.../logfeeder/output/OutputManagerTest.java | 32 +-
.../logfeeder/output/OutputS3FileTest.java | 9 +-
.../ambari/logfeeder/output/OutputSolrTest.java | 10 +-
51 files changed, 1542 insertions(+), 1259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
index a84a97b..c74fad3 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -37,12 +37,13 @@ public class LogSearchConfigFactory {
* @param properties The properties of the component for which the configuration is created. If the properties contain the
* "logsearch.config.class" entry than the class defined there would be used instead of the default class.
* @param defaultClass The default configuration class to use if not specified otherwise.
+ * @param init If true, init is called on the created instance with the given properties; if false, the caller must initialize it.
* @return The Log Search Configuration instance.
* @throws Exception Throws exception if the defined class does not implement LogSearchConfigServer, or doesn't have an empty
* constructor, or throws an exception in it's init method.
*/
public static LogSearchConfigServer createLogSearchConfigServer(Map<String, String> properties,
- Class<? extends LogSearchConfigServer> defaultClass) throws Exception {
+ Class<? extends LogSearchConfigServer> defaultClass, boolean init) throws Exception {
try {
LogSearchConfigServer logSearchConfig = null;
String configClassName = properties.get("logsearch.config.server.class");
@@ -57,8 +58,9 @@ public class LogSearchConfigFactory {
} else {
logSearchConfig = defaultClass.newInstance();
}
-
- logSearchConfig.init(properties);
+ if (init) {
+ logSearchConfig.init(properties);
+ }
return logSearchConfig;
} catch (Exception e) {
LOG.error("Could not initialize logsearch config.", e);
@@ -74,12 +76,13 @@ public class LogSearchConfigFactory {
* "logsearch.config.class" entry than the class defined there would be used instead of the default class.
* @param clusterName The name of the cluster.
* @param defaultClass The default configuration class to use if not specified otherwise.
+ * @param init If true, init is called on the created instance with the given properties and cluster name; if false, the caller must initialize it.
* @return The Log Search Configuration instance.
* @throws Exception Throws exception if the defined class does not implement LogSearchConfigLogFeeder, or doesn't have an empty
* constructor, or throws an exception in it's init method.
*/
public static LogSearchConfigLogFeeder createLogSearchConfigLogFeeder(Map<String, String> properties, String clusterName,
- Class<? extends LogSearchConfigLogFeeder> defaultClass) throws Exception {
+ Class<? extends LogSearchConfigLogFeeder> defaultClass, boolean init) throws Exception {
try {
LogSearchConfigLogFeeder logSearchConfig = null;
String configClassName = properties.get("logsearch.config.logfeeder.class");
@@ -94,12 +97,46 @@ public class LogSearchConfigFactory {
} else {
logSearchConfig = defaultClass.newInstance();
}
-
- logSearchConfig.init(properties, clusterName);
+ if (init) {
+ logSearchConfig.init(properties, clusterName);
+ }
return logSearchConfig;
} catch (Exception e) {
LOG.error("Could not initialize logsearch config.", e);
throw e;
}
}
+
+ /**
+ * Creates a Log Search Configuration instance for the Log Search Server that implements
+ * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigServer}.
+ *
+ * @param properties The properties of the component for which the configuration is created. If the properties contain the
+ * "logsearch.config.server.class" entry then the class defined there is used instead of the default class.
+ * @param defaultClass The default configuration class to use if not specified otherwise.
+ * @return The Log Search Configuration instance.
+ * @throws Exception Throws exception if the defined class does not implement LogSearchConfigServer, or doesn't have an empty
+ * constructor, or throws an exception in its init method.
+ */
+ public static LogSearchConfigServer createLogSearchConfigServer(Map<String, String> properties,
+ Class<? extends LogSearchConfigServer> defaultClass) throws Exception {
+ return createLogSearchConfigServer(properties, defaultClass, true);
+ }
+
+ /**
+ * Creates a Log Search Configuration instance for the Log Feeder that implements
+ * {@link org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder}.
+ *
+ * @param properties The properties of the component for which the configuration is created. If the properties contain the
+ * "logsearch.config.logfeeder.class" entry then the class defined there is used instead of the default class.
+ * @param clusterName The name of the cluster.
+ * @param defaultClass The default configuration class to use if not specified otherwise.
+ * @return The Log Search Configuration instance.
+ * @throws Exception Throws exception if the defined class does not implement LogSearchConfigLogFeeder, or doesn't have an empty
+ * constructor, or throws an exception in its init method.
+ */
+ public static LogSearchConfigLogFeeder createLogSearchConfigLogFeeder(Map<String, String> properties, String clusterName,
+ Class<? extends LogSearchConfigLogFeeder> defaultClass) throws Exception {
+ return createLogSearchConfigLogFeeder(properties, clusterName, defaultClass, true);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
index 6ed36fd..1387515 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigLogFeeder.java
@@ -68,8 +68,7 @@ public interface LogSearchConfigLogFeeder extends LogSearchConfig {
/**
* Saves the properties of an Output Solr.
- *
- * @param type The type of the Output Solr.
+ *
* @param outputConfigMonitors The monitors which want to watch the output config changes.
* @throws Exception
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore b/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore
new file mode 100644
index 0000000..7b00482
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/.gitignore
@@ -0,0 +1 @@
+*.pid
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 01710bf..005af15 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -33,6 +33,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <spring.version>4.3.10.RELEASE</spring.version>
+ <spring-boot.version>1.5.6.RELEASE</spring-boot.version>
</properties>
<dependencies>
@@ -67,11 +69,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
- <dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
<version>0.1.4</version>
@@ -102,16 +99,6 @@
<version>18.0</version>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.20</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.20</version>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.2</version>
@@ -152,6 +139,10 @@
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
@@ -179,6 +170,32 @@
<artifactId>commons-io</artifactId>
<version>${common.io.version}</version>
</dependency>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ <version>${spring-boot.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-log4j</artifactId>
+ <version>1.3.8.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
</dependencies>
<build>
<finalName>LogFeeder</finalName>
@@ -225,6 +242,21 @@
</arguments>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring-boot.version}</version>
+ <configuration>
+ <classifier>exec</classifier>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<!-- copy-dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 5114743..2d31e5a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -16,146 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder;
+import com.google.gson.GsonBuilder;
+import org.apache.ambari.logfeeder.common.LogEntryParseTester;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.LogManager;
+import org.springframework.boot.Banner;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.system.ApplicationPidFileWriter;
+
import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
-import org.apache.ambari.logfeeder.common.LogEntryParseTester;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
-import org.apache.commons.io.FileUtils;
-import org.apache.ambari.logfeeder.input.InputConfigUploader;
-import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.metrics.MetricsManager;
-import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil;
-import org.apache.ambari.logfeeder.util.SSLUtil;
-import com.google.common.collect.Maps;
-import com.google.gson.GsonBuilder;
-
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
+@SpringBootApplication(
+ scanBasePackages = {"org.apache.ambari.logfeeder"}
+)
public class LogFeeder {
- private static final Logger LOG = Logger.getLogger(LogFeeder.class);
-
- private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
- private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
-
- private final LogFeederCommandLine cli;
-
- private ConfigHandler configHandler;
- private LogSearchConfigLogFeeder config;
-
- private MetricsManager metricsManager = new MetricsManager();
-
- private long lastCheckPointCleanedMS = 0;
- private Thread statLoggerThread = null;
-
- private LogFeeder(LogFeederCommandLine cli) {
- this.cli = cli;
- }
-
- public void run() {
- try {
- init();
- monitor();
- } catch (Throwable t) {
- LOG.fatal("Caught exception in main.", t);
- System.exit(1);
- }
- }
-
- private void init() throws Throwable {
- long startTime = System.currentTimeMillis();
-
- SSLUtil.ensureStorePasswords();
-
- config = LogSearchConfigFactory.createLogSearchConfigLogFeeder(Maps.fromProperties(LogFeederPropertiesUtil.getProperties()),
- LogFeederPropertiesUtil.getClusterName(), LogSearchConfigLogFeederZK.class);
- configHandler = new ConfigHandler(config);
- configHandler.init();
- LogLevelFilterHandler.init(config);
- InputConfigUploader.load(config);
- config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederPropertiesUtil.getClusterName());
-
- metricsManager.init();
-
- LOG.debug("==============");
-
- long endTime = System.currentTimeMillis();
- LOG.info("Took " + (endTime - startTime) + " ms to initialize");
- }
-
- private class JVMShutdownHook extends Thread {
-
- public void run() {
- try {
- LOG.info("Processing is shutting down.");
-
- configHandler.close();
- config.close();
- logStats();
-
- LOG.info("LogSearch is exiting.");
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
-
- private void monitor() throws Exception {
- JVMShutdownHook logFeederJVMHook = new JVMShutdownHook();
- ShutdownHookManager.get().addShutdownHook(logFeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
-
- statLoggerThread = new Thread("statLogger") {
-
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(30 * 1000);
- } catch (Throwable t) {
- // Ignore
- }
- try {
- logStats();
- } catch (Throwable t) {
- LOG.error("LogStats: Caught exception while logging stats.", t);
- }
-
- if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
- lastCheckPointCleanedMS = System.currentTimeMillis();
- configHandler.cleanCheckPointFiles();
- }
- }
- }
- };
- statLoggerThread.setDaemon(true);
- statLoggerThread.start();
-
- }
-
- private void logStats() {
- configHandler.logStats();
-
- if (metricsManager.isMetricsEnabled()) {
- List<MetricData> metricsList = new ArrayList<MetricData>();
- configHandler.addMetrics(metricsList);
- metricsManager.useMetrics(metricsList);
+ public static void main(String[] args) {
+ LogFeederCommandLine cli = new LogFeederCommandLine(args);
+ if (cli.isTest()) {
+ test(cli);
}
+ String pidFile = System.getenv("PID_FILE") == null ? "logfeeder.pid" : System.getenv("PID_FILE");
+ new SpringApplicationBuilder(LogFeeder.class)
+ .bannerMode(Banner.Mode.OFF)
+ .listeners(new ApplicationPidFileWriter(pidFile))
+ .run(args);
}
- public void test() {
+ private static void test(LogFeederCommandLine cli) {
try {
LogManager.shutdown();
String testLogEntry = cli.getTestLogEntry();
@@ -173,22 +68,4 @@ public class LogFeeder {
e.printStackTrace(System.out);
}
}
-
- public static void main(String[] args) {
- LogFeederCommandLine cli = new LogFeederCommandLine(args);
-
- LogFeeder logFeeder = new LogFeeder(cli);
-
- if (cli.isMonitor()) {
- try {
- LogFeederPropertiesUtil.loadProperties();
- } catch (Throwable t) {
- LOG.warn("Could not load logfeeder properites");
- System.exit(1);
- }
- logFeeder.run();
- } else if (cli.isTest()) {
- logFeeder.test();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
index d996f98..61e7a1e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederCommandLine.java
@@ -30,14 +30,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
public class LogFeederCommandLine {
private static final Logger LOG = LoggerFactory.getLogger(LogFeederCommandLine.class);
-
- private static final String MONITOR_COMMAND = "monitor";
private static final String TEST_COMMAND = "test";
private static final String TEST_LOG_ENTRY_OPTION = "test-log-entry";
@@ -60,11 +57,6 @@ public class LogFeederCommandLine {
.desc("Print commands")
.build();
- Option monitorOption = Option.builder("m")
- .longOpt(MONITOR_COMMAND)
- .desc("Monitor log files")
- .build();
-
Option testOption = Option.builder("t")
.longOpt(TEST_COMMAND)
.desc("Test if log entry is parseable")
@@ -95,7 +87,6 @@ public class LogFeederCommandLine {
.build();
options.addOption(helpOption);
- options.addOption(monitorOption);
options.addOption(testOption);
options.addOption(testLogEntryOption);
options.addOption(testShipperConfOption);
@@ -111,21 +102,14 @@ public class LogFeederCommandLine {
System.exit(0);
}
String command = "";
- if (cli.hasOption("m")) {
- command = MONITOR_COMMAND;
- } else if (cli.hasOption("t")) {
+ if (cli.hasOption("t")) {
command = TEST_COMMAND;
validateRequiredOptions(cli, command, testLogEntryOption, testShipperConfOption);
} else {
- List<String> commands = Arrays.asList(MONITOR_COMMAND, TEST_COMMAND);
- helpFormatter.printHelp(COMMAND_LINE_SYNTAX, options);
- LOG.error(String.format("One of the supported commands is required (%s)", StringUtils.join(commands, "|")));
- System.exit(1);
+ LOG.info("Start application in monitor mode ");
}
} catch (Exception e) {
- LOG.error("Error parsing command line parameters", e);
- helpFormatter.printHelp(COMMAND_LINE_SYNTAX, options);
- System.exit(1);
+ LOG.info("Error parsing command line parameters: {}. LogFeeder will be started in monitoring mode.", e.getMessage());
}
}
@@ -142,12 +126,8 @@ public class LogFeederCommandLine {
}
}
- public boolean isMonitor() {
- return cli.hasOption('m');
- }
-
public boolean isTest() {
- return cli.hasOption('t');
+ return cli != null && cli.hasOption('t');
}
public String getTestLogEntry() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 243b344..35c0e6a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -33,6 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputManager;
@@ -47,7 +49,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -62,16 +63,24 @@ import org.apache.log4j.Logger;
import com.google.gson.reflect.TypeToken;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
public class ConfigHandler implements InputConfigMonitor {
private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
private final LogSearchConfigLogFeeder logSearchConfig;
-
- private final OutputManager outputManager = new OutputManager();
- private final InputManager inputManager = new InputManager();
+
+ @Inject
+ private InputManager inputManager;
+ @Inject
+ private OutputManager outputManager;
+ @Inject
+ private LogFeederProps logFeederProps;
private final Map<String, Object> globalConfigs = new HashMap<>();
- private final List<String> globalConfigJsons = new ArrayList<String>();
+ private final List<String> globalConfigJsons = new ArrayList<>();
private final List<InputDescriptor> inputConfigList = new ArrayList<>();
private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
@@ -82,9 +91,11 @@ public class ConfigHandler implements InputConfigMonitor {
public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
this.logSearchConfig = logSearchConfig;
}
-
+
+ @PostConstruct
public void init() throws Exception {
loadConfigFiles();
+ logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName());
loadOutputs();
simulateIfNeeded();
@@ -114,7 +125,7 @@ public class ConfigHandler implements InputConfigMonitor {
private List<String> getConfigFiles() {
List<String> configFiles = new ArrayList<>();
- String logFeederConfigFilesProperty = LogFeederPropertiesUtil.getConfigFiles();
+ String logFeederConfigFilesProperty = logFeederProps.getConfigFiles();
LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
if (logFeederConfigFilesProperty != null) {
configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
@@ -217,7 +228,7 @@ public class ConfigHandler implements InputConfigMonitor {
}
private void simulateIfNeeded() throws Exception {
- int simulatedInputNumber = LogFeederPropertiesUtil.getSimulateInputNumber();
+ int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber();
if (simulatedInputNumber == 0)
return;
@@ -347,18 +358,15 @@ public class ConfigHandler implements InputConfigMonitor {
}
private void sortFilters() {
- Collections.sort(filterConfigList, new Comparator<FilterDescriptor>() {
- @Override
- public int compare(FilterDescriptor o1, FilterDescriptor o2) {
- Integer o1Sort = o1.getSortOrder();
- Integer o2Sort = o2.getSortOrder();
- if (o1Sort == null || o2Sort == null) {
- return 0;
- }
-
- return o1Sort - o2Sort;
+ Collections.sort(filterConfigList, (o1, o2) -> {
+ Integer o1Sort = o1.getSortOrder();
+ Integer o2Sort = o2.getSortOrder();
+ if (o1Sort == null || o2Sort == null) {
+ return 0;
}
- } );
+
+ return o1Sort - o2Sort;
+ });
}
private void assignOutputsToInputs(String serviceName) {
@@ -428,6 +436,7 @@ public class ConfigHandler implements InputConfigMonitor {
outputManager.addMetricsContainers(metricsList);
}
+ @PreDestroy
public void close() {
inputManager.close();
outputManager.close();
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
index 5c20a8e..30bd9fd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
@@ -21,6 +21,7 @@ package org.apache.ambari.logfeeder.common;
import java.util.List;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
@@ -30,6 +31,7 @@ public abstract class ConfigItem {
protected static final Logger LOG = Logger.getLogger(ConfigBlock.class);
private boolean drain = false;
+ private LogFeederProps logFeederProps;
public MetricData statMetric = new MetricData(getStatMetricName(), false);
public ConfigItem() {
@@ -59,7 +61,8 @@ public abstract class ConfigItem {
/**
* This method needs to be overwritten by deriving classes.
*/
- public void init() throws Exception {
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ this.logFeederProps = logFeederProps;
}
public abstract boolean isEnabled();
@@ -94,4 +97,7 @@ public abstract class ConfigItem {
this.drain = drain;
}
+ public LogFeederProps getLogFeederProps() {
+ return logFeederProps;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index ec29f69..1a701e1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.output.Output;
@@ -76,7 +77,7 @@ public class LogEntryParseTester {
ConfigHandler configHandler = new ConfigHandler(null);
Input input = configHandler.getTestInput(inputConfig, logId);
final Map<String, Object> result = new HashMap<>();
- input.getFirstFilter().init();
+ input.getFirstFilter().init(new LogFeederProps());
input.addOutput(new Output() {
@Override
public void write(String block, InputMarker inputMarker) throws Exception {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a7cccc6..b241831 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -38,4 +38,54 @@ public class LogFeederConstants {
public static final String S3_PATH_SEPARATOR = "/";
public static final String IN_MEMORY_TIMESTAMP = "in_memory_timestamp";
+
+ // Name of the Log Feeder properties file, followed by the keys of the general settings read from it.
+ public static final String LOGFEEDER_PROPERTIES_FILE = "logfeeder.properties";
+ public static final String CLUSTER_NAME_PROPERTY = "cluster.name";
+ public static final String TMP_DIR_PROPERTY = "logfeeder.tmp.dir";
+
+ // Property keys for the metrics collector endpoint (protocol, port, hosts, path).
+ public static final String METRICS_COLLECTOR_PROTOCOL_PROPERTY = "logfeeder.metrics.collector.protocol";
+ public static final String METRICS_COLLECTOR_PORT_PROPERTY = "logfeeder.metrics.collector.port";
+ public static final String METRICS_COLLECTOR_HOSTS_PROPERTY = "logfeeder.metrics.collector.hosts";
+ public static final String METRICS_COLLECTOR_PATH_PROPERTY = "logfeeder.metrics.collector.path";
+
+ // Property keys for log level filtering.
+ public static final String LOG_FILTER_ENABLE_PROPERTY = "logfeeder.log.filter.enable";
+ public static final String INCLUDE_DEFAULT_LEVEL_PROPERTY = "logfeeder.include.default.level";
+
+ // Property keys locating the shipper configuration directory and the global / output config files.
+ public static final String CONFIG_DIR_PROPERTY = "logfeeder.config.dir";
+ public static final String CONFIG_FILES_PROPERTY = "logfeeder.config.files";
+
+ // Property keys of the simulated input, each followed by the default used when the key is not set.
+ public static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number";
+ public static final int DEFAULT_SIMULATE_INPUT_NUMBER = 0;
+ public static final String SIMULATE_LOG_LEVEL_PROPERTY = "logfeeder.simulate.log_level";
+ public static final String DEFAULT_SIMULATE_LOG_LEVEL = "WARN";
+ public static final String SIMULATE_NUMBER_OF_WORDS_PROPERTY = "logfeeder.simulate.number_of_words";
+ public static final int DEFAULT_SIMULATE_NUMBER_OF_WORDS = 1000;
+ public static final String SIMULATE_MIN_LOG_WORDS_PROPERTY = "logfeeder.simulate.min_log_words";
+ public static final int DEFAULT_SIMULATE_MIN_LOG_WORDS = 5;
+ public static final String SIMULATE_MAX_LOG_WORDS_PROPERTY = "logfeeder.simulate.max_log_words";
+ // NOTE(review): same value as DEFAULT_SIMULATE_MIN_LOG_WORDS - confirm that min == max is the intended default.
+ public static final int DEFAULT_SIMULATE_MAX_LOG_WORDS = 5;
+ public static final String SIMULATE_SLEEP_MILLISECONDS_PROPERTY = "logfeeder.simulate.sleep_milliseconds";
+ public static final int DEFAULT_SIMULATE_SLEEP_MILLISECONDS = 10000;
+ public static final String SIMULATE_LOG_IDS_PROPERTY = "logfeeder.simulate.log_ids";
+
+ // Property keys and defaults for Kerberos-secured Solr access.
+ public static final String SOLR_KERBEROS_ENABLE_PROPERTY = "logfeeder.solr.kerberos.enable";
+ public static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false;
+ // NOTE(review): the default JAAS file path ends in ".keytab" - confirm it should not point to a JAAS configuration file.
+ public static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab";
+ public static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file";
+
+ // Property keys of the log entry de-duplication cache, each followed by its default.
+ public static final String CACHE_ENABLED_PROPERTY = "logfeeder.cache.enabled";
+ public static final boolean DEFAULT_CACHE_ENABLED = false;
+ public static final String CACHE_KEY_FIELD_PROPERTY = "logfeeder.cache.key.field";
+ public static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+ public static final String CACHE_SIZE_PROPERTY = "logfeeder.cache.size";
+ public static final int DEFAULT_CACHE_SIZE = 100;
+ public static final String CACHE_LAST_DEDUP_ENABLED_PROPERTY = "logfeeder.cache.last.dedup.enabled";
+ public static final boolean DEFAULT_CACHE_LAST_DEDUP_ENABLED = false;
+ public static final String CACHE_DEDUP_INTERVAL_PROPERTY = "logfeeder.cache.dedup.interval";
+ // Default de-duplication interval, in milliseconds.
+ public static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
+
+ // Property keys for the checkpoint files (folder and file extension) and the default extension.
+ public static final String CHECKPOINT_FOLDER_PROPERTY = "logfeeder.checkpoint.folder";
+ public static final String CHECKPOINT_EXTENSION_PROPERTY = "logfeeder.checkpoint.extension";
+ public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp";
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
new file mode 100644
index 0000000..cfb6c78
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import com.google.common.collect.Maps;
+import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.InputConfigUploader;
+import org.apache.ambari.logfeeder.input.InputManager;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
+import org.apache.ambari.logfeeder.metrics.MetricsManager;
+import org.apache.ambari.logfeeder.metrics.StatsLogger;
+import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+
+import javax.inject.Inject;
+
+/**
+ * Spring configuration of the Log Feeder application.
+ * <p>
+ * Registers {@code logfeeder.properties} from the classpath as a property source and declares the
+ * beans the application is wired from. Initialization order between the beans is expressed with
+ * {@code @DependsOn}, so the declarations below should not be reordered or renamed casually: the
+ * method names are the bean names referenced by those annotations.
+ */
+@Configuration
+@PropertySource(value = {
+ "classpath:" + LogFeederConstants.LOGFEEDER_PROPERTIES_FILE
+})
+public class ApplicationConfig {
+
+ // Supplies the raw properties and the cluster name used to build the Log Search config bean.
+ @Inject
+ private LogFeederProps logFeederProps;
+
+ /**
+ * Declares the placeholder configurer bean. The method is static, which is the form the Spring
+ * documentation recommends for bean factory post-processors declared through {@code @Bean}.
+ */
+ @Bean
+ public static PropertySourcesPlaceholderConfigurer propertyConfigurer() {
+ return new PropertySourcesPlaceholderConfigurer();
+ }
+
+ /** Declares the security configuration bean. */
+ @Bean
+ public LogFeederSecurityConfig logFeederSecurityConfig() {
+ return new LogFeederSecurityConfig();
+ }
+
+ /**
+ * Declares the config handler, built around the Log Search config bean.
+ *
+ * @throws Exception if creating the Log Search config fails
+ */
+ @Bean
+ @DependsOn("logSearchConfigLogFeeder")
+ public ConfigHandler configHandler() throws Exception {
+ return new ConfigHandler(logSearchConfigLogFeeder());
+ }
+
+ /**
+ * Declares the Log Search config bean, created by {@link LogSearchConfigFactory} from the loaded
+ * properties (converted to a map) and the cluster name. It is initialized after the security
+ * config bean.
+ * NOTE(review): the meaning of the {@code LogSearchConfigLogFeederZK.class} and {@code false}
+ * arguments is defined by the factory and is not visible here - confirm against
+ * LogSearchConfigFactory.
+ *
+ * @throws Exception if the factory fails to create the config
+ */
+ @Bean
+ @DependsOn("logFeederSecurityConfig")
+ public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception {
+ return LogSearchConfigFactory.createLogSearchConfigLogFeeder(
+ Maps.fromProperties(logFeederProps.getProperties()),
+ logFeederProps.getClusterName(),
+ LogSearchConfigLogFeederZK.class,false);
+ }
+
+ /** Declares the metrics manager bean. */
+ @Bean
+ public MetricsManager metricsManager() {
+ return new MetricsManager();
+ }
+
+ /**
+ * Declares the log level filter handler, built around the Log Search config bean and initialized
+ * after the config handler.
+ *
+ * @throws Exception if creating the Log Search config fails
+ */
+ @Bean
+ @DependsOn("configHandler")
+ public LogLevelFilterHandler logLevelFilterHandler() throws Exception {
+ return new LogLevelFilterHandler(logSearchConfigLogFeeder());
+ }
+
+ /**
+ * Declares the input config uploader, initialized only after the config handler, the Log Search
+ * config and the log level filter handler.
+ */
+ @Bean
+ @DependsOn({"configHandler", "logSearchConfigLogFeeder", "logLevelFilterHandler"})
+ public InputConfigUploader inputConfigUploader() {
+ return new InputConfigUploader();
+ }
+
+ /** Declares the stats logger, initialized after the input config uploader. */
+ @Bean
+ @DependsOn("inputConfigUploader")
+ public StatsLogger statsLogger() {
+ return new StatsLogger();
+ }
+
+ /** Declares the input manager bean. */
+ @Bean
+ public InputManager inputManager() {
+ return new InputManager();
+ }
+
+ /** Declares the output manager bean. */
+ @Bean
+ public OutputManager outputManager() {
+ return new OutputManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java
new file mode 100644
index 0000000..cf087f9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/InputSimulateConfig.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Settings of the simulated inputs of the Log Feeder.
+ * <p>
+ * Every field is filled from the property named in its {@code @Value} placeholder; the part of the
+ * placeholder after the colon is the value used when the property is not set. The
+ * {@code @LogSearchPropertyDescription} annotation on each field documents the same property.
+ */
+@Configuration
+public class InputSimulateConfig {
+
+  /** Number of simulator instances to run; 0 (the default) means no simulation. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_INPUT_NUMBER_PROPERTY,
+    description = "The number of the simulator instances to run with. 0 means no simulation.",
+    examples = {"10"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_INPUT_NUMBER + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  // The default is taken from the constant (as for the other fields) so it cannot drift from the documented one.
+  @Value("${" + LogFeederConstants.SIMULATE_INPUT_NUMBER_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_INPUT_NUMBER + "}")
+  private Integer simulateInputNumber;
+
+  /** Log level the simulated log entries are created with. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_LOG_LEVEL_PROPERTY,
+    description = "The log level to create the simulated log entries with.",
+    examples = {"INFO"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_LOG_LEVEL,
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_LOG_LEVEL_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_LOG_LEVEL + "}")
+  private String simulateLogLevel;
+
+  /** Size of the word set the simulated log entries are built from. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_NUMBER_OF_WORDS_PROPERTY,
+    description = "The size of the set of words that may be used to create the simulated log entries with.",
+    examples = {"100"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_NUMBER_OF_WORDS + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_NUMBER_OF_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_NUMBER_OF_WORDS + "}")
+  private Integer simulateNumberOfWords;
+
+  /** Minimum number of words in a simulated log entry. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_MIN_LOG_WORDS_PROPERTY,
+    description = "The minimum number of words in a simulated log entry.",
+    examples = {"3"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_MIN_LOG_WORDS + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_MIN_LOG_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_MIN_LOG_WORDS + "}")
+  private Integer simulateMinLogWords;
+
+  /** Maximum number of words in a simulated log entry. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_MAX_LOG_WORDS_PROPERTY,
+    description = "The maximum number of words in a simulated log entry.",
+    examples = {"8"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_MAX_LOG_WORDS + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_MAX_LOG_WORDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_MAX_LOG_WORDS + "}")
+  private Integer simulateMaxLogWords;
+
+  /** Milliseconds to sleep between creating two simulated log entries. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_SLEEP_MILLISECONDS_PROPERTY,
+    description = "The milliseconds to sleep between creating two simulated log entries.",
+    examples = {"5000"},
+    defaultValue = LogFeederConstants.DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_SLEEP_MILLISECONDS_PROPERTY + ":" + LogFeederConstants.DEFAULT_SIMULATE_SLEEP_MILLISECONDS + "}")
+  private Integer simulateSleepMilliseconds;
+
+  /** Comma separated list of log ids to simulate; injected as an empty string when the property is not set. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.SIMULATE_LOG_IDS_PROPERTY,
+    description = "The comma separated list of log ids for which to create the simulated log entries.",
+    examples = {"ambari_server,zookeeper,infra_solr,logsearch_app"},
+    defaultValue = "The log ids of the installed services in the cluster",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.SIMULATE_LOG_IDS_PROPERTY + ":}")
+  private String simulateLogIds;
+
+  public Integer getSimulateInputNumber() {
+    return simulateInputNumber;
+  }
+
+  public void setSimulateInputNumber(Integer simulateInputNumber) {
+    this.simulateInputNumber = simulateInputNumber;
+  }
+
+  public String getSimulateLogLevel() {
+    return simulateLogLevel;
+  }
+
+  public void setSimulateLogLevel(String simulateLogLevel) {
+    this.simulateLogLevel = simulateLogLevel;
+  }
+
+  public Integer getSimulateNumberOfWords() {
+    return simulateNumberOfWords;
+  }
+
+  public void setSimulateNumberOfWords(Integer simulateNumberOfWords) {
+    this.simulateNumberOfWords = simulateNumberOfWords;
+  }
+
+  public Integer getSimulateMinLogWords() {
+    return simulateMinLogWords;
+  }
+
+  public void setSimulateMinLogWords(Integer simulateMinLogWords) {
+    this.simulateMinLogWords = simulateMinLogWords;
+  }
+
+  public Integer getSimulateMaxLogWords() {
+    return simulateMaxLogWords;
+  }
+
+  public void setSimulateMaxLogWords(Integer simulateMaxLogWords) {
+    this.simulateMaxLogWords = simulateMaxLogWords;
+  }
+
+  public Integer getSimulateSleepMilliseconds() {
+    return simulateSleepMilliseconds;
+  }
+
+  public void setSimulateSleepMilliseconds(Integer simulateSleepMilliseconds) {
+    this.simulateSleepMilliseconds = simulateSleepMilliseconds;
+  }
+
+  public String getSimulateLogIds() {
+    return simulateLogIds;
+  }
+
+  public void setSimulateLogIds(String simulateLogIds) {
+    this.simulateLogIds = simulateLogIds;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
new file mode 100644
index 0000000..353bdc1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Settings of the log entry cache used to filter out duplicated log entries.
+ * <p>
+ * Every field is filled from the property named in its {@code @Value} placeholder; the part of the
+ * placeholder after the colon is the value used when the property is not set. The
+ * {@code @LogSearchPropertyDescription} annotation on each field documents the same property.
+ */
+@Configuration
+public class LogEntryCacheConfig {
+
+  /** Whether the de-duplication cache is used at all. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CACHE_ENABLED_PROPERTY,
+    description = "Enables the usage of a cache to avoid duplications.",
+    examples = {"true"},
+    defaultValue = LogFeederConstants.DEFAULT_CACHE_ENABLED + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CACHE_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_ENABLED + "}")
+  private boolean cacheEnabled;
+
+  /** Name of the field whose value is cached and checked for repetitions. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CACHE_KEY_FIELD_PROPERTY,
+    description = "The field which's value should be cached and should be checked for repetitions.",
+    examples = {"some_field_prone_to_repeating_value"},
+    defaultValue = LogFeederConstants.DEFAULT_CACHE_KEY_FIELD,
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CACHE_KEY_FIELD_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_KEY_FIELD + "}")
+  private String cacheKeyField;
+
+  /** Number of log entries kept in the cache. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CACHE_SIZE_PROPERTY,
+    description = "The number of log entries to cache in order to avoid duplications.",
+    examples = {"50"},
+    defaultValue = LogFeederConstants.DEFAULT_CACHE_SIZE + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CACHE_SIZE_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_SIZE + "}")
+  private Integer cacheSize;
+
+  /**
+   * Whether directly repeating entries are filtered regardless of the time between them.
+   * Kept as a primitive (like {@link #cacheEnabled}) because the accessors are primitive: a boxed
+   * field would make {@link #isCacheLastDedupEnabled()} throw a NullPointerException on an instance
+   * that was created without Spring populating it.
+   */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY,
+    description = "Enable filtering directly repeating log entries irrelevant of the time spent between them.",
+    examples = {"true"},
+    defaultValue = LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "}")
+  private boolean cacheLastDedupEnabled;
+
+  /**
+   * Maximum number of milliseconds between two identical messages for the later one to be filtered.
+   * NOTE(review): held as a String although the default constant is a long; the type is kept
+   * because the public accessors expose String - confirm whether callers parse it.
+   */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY,
+    description = "Maximum number of milliseconds between two identical messages to be filtered out.",
+    examples = {"500"},
+    defaultValue = LogFeederConstants.DEFAULT_CACHE_DEDUP_INTERVAL + "",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_DEDUP_INTERVAL + "}")
+  private String cacheDedupInterval;
+
+  public boolean isCacheEnabled() {
+    return cacheEnabled;
+  }
+
+  public void setCacheEnabled(boolean cacheEnabled) {
+    this.cacheEnabled = cacheEnabled;
+  }
+
+  public String getCacheKeyField() {
+    return cacheKeyField;
+  }
+
+  public void setCacheKeyField(String cacheKeyField) {
+    this.cacheKeyField = cacheKeyField;
+  }
+
+  public Integer getCacheSize() {
+    return cacheSize;
+  }
+
+  public void setCacheSize(Integer cacheSize) {
+    this.cacheSize = cacheSize;
+  }
+
+  public boolean isCacheLastDedupEnabled() {
+    return cacheLastDedupEnabled;
+  }
+
+  public void setCacheLastDedupEnabled(boolean cacheLastDedupEnabled) {
+    this.cacheLastDedupEnabled = cacheLastDedupEnabled;
+  }
+
+  public String getCacheDedupInterval() {
+    return cacheDedupInterval;
+  }
+
+  public void setCacheDedupInterval(String cacheDedupInterval) {
+    this.cacheDedupInterval = cacheDedupInterval;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
new file mode 100644
index 0000000..367d1cd
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.AbstractEnvironment;
+import org.springframework.core.env.Environment;
+import org.springframework.core.env.MapPropertySource;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.io.support.ResourcePropertySource;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+/**
+ * Central holder of the Log Feeder settings.
+ * <p>
+ * The simple settings are filled from the properties named in the {@code @Value} placeholders; the
+ * part of a placeholder after the colon is the value used when the property is not set. The cache,
+ * simulate and security settings are exposed through the injected config objects. After injection,
+ * {@link #init()} copies every property defined in {@code logfeeder.properties} into a
+ * {@link Properties} instance available through {@link #getProperties()}.
+ */
+@Configuration
+public class LogFeederProps {
+
+  @Inject
+  private Environment env;
+
+  /** Snapshot of the properties of logfeeder.properties; assigned only by {@link #init()}. */
+  private Properties properties;
+
+  /** Name of the cluster; the placeholder has no default, so the property must be set. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLUSTER_NAME_PROPERTY,
+    description = "The name of the cluster the Log Feeder program runs in.",
+    examples = {"cl1"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLUSTER_NAME_PROPERTY + "}")
+  private String clusterName;
+
+  /** Directory for temporary files; falls back to the {@code java.io.tmpdir} system property. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.TMP_DIR_PROPERTY,
+    description = "The tmp dir used for creating temporary files.",
+    examples = {"/tmp/"},
+    defaultValue = "java.io.tmpdir",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.TMP_DIR_PROPERTY + ":#{systemProperties['java.io.tmpdir']}}")
+  private String tmpDir;
+
+  /** Whether log entries are filtered by log level filters. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.LOG_FILTER_ENABLE_PROPERTY,
+    description = "Enables the filtering of the log entries by log level filters.",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  // The documented default is "false"; without it in the placeholder a missing property would
+  // be an unresolvable placeholder and break the start-up instead of disabling the filter.
+  @Value("${" + LogFeederConstants.LOG_FILTER_ENABLE_PROPERTY + ":false}")
+  private boolean logLevelFilterEnabled;
+
+  /**
+   * Default log levels enabled by the filtering, split on commas.
+   * NOTE(review): when the property is not set the expression splits an empty string, which
+   * presumably yields a list holding one empty element rather than an empty list - confirm that
+   * consumers cope with it.
+   */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.INCLUDE_DEFAULT_LEVEL_PROPERTY,
+    description = "Comma separated list of the default log levels to be enabled by the filtering.",
+    examples = {"FATAL,ERROR,WARN"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("#{'${" + LogFeederConstants.INCLUDE_DEFAULT_LEVEL_PROPERTY + ":}'.split(',')}")
+  private List<String> includeDefaultLogLevels;
+
+  /** Directory where the shipper configuration files are looked for. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CONFIG_DIR_PROPERTY,
+    description = "The directory where shipper configuration files are looked for.",
+    examples = {"/etc/ambari-logsearch-logfeeder/conf"},
+    defaultValue = "/etc/ambari-logsearch-logfeeder/conf",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CONFIG_DIR_PROPERTY + ":/etc/ambari-logsearch-logfeeder/conf}")
+  private String confDir;
+
+  /** Comma separated list of global / output config files; empty string when not set. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CONFIG_FILES_PROPERTY,
+    description = "Comma separated list of the config files containing global / output configurations.",
+    examples = {"global.json,output.json", "/etc/ambari-logsearch-logfeeder/conf/global.json"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CONFIG_FILES_PROPERTY + ":}")
+  private String configFiles;
+
+  /** Extension of the checkpoint files. */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CHECKPOINT_EXTENSION_PROPERTY,
+    description = "The extension used for checkpoint files.",
+    examples = {"ckp"},
+    defaultValue = LogFeederConstants.DEFAULT_CHECKPOINT_EXTENSION,
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CHECKPOINT_EXTENSION_PROPERTY + ":" + LogFeederConstants.DEFAULT_CHECKPOINT_EXTENSION + "}")
+  private String checkPointExtension;
+
+  /**
+   * Folder where the checkpoint files are stored.
+   * NOTE(review): the only public field of the class although accessors exist; left public in case
+   * it is accessed directly elsewhere - consider making it private.
+   */
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY,
+    description = "The folder where checkpoint files are stored.",
+    examples = {"/etc/ambari-logsearch-logfeeder/conf/checkpoints"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CHECKPOINT_FOLDER_PROPERTY + ":/etc/ambari-logsearch-logfeeder/conf/checkpoints}")
+  public String checkpointFolder;
+
+  @Inject
+  private LogEntryCacheConfig logEntryCacheConfig;
+
+  @Inject
+  private InputSimulateConfig inputSimulateConfig;
+
+  @Inject
+  private LogFeederSecurityConfig logFeederSecurityConfig;
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  /**
+   * @return the properties collected by {@link #init()}; {@code null} if {@code init()} has not
+   *         run (for example on an instance created with {@code new})
+   */
+  public Properties getProperties() {
+    return properties;
+  }
+
+  public String getTmpDir() {
+    return tmpDir;
+  }
+
+  public boolean isLogLevelFilterEnabled() {
+    return logLevelFilterEnabled;
+  }
+
+  public List<String> getIncludeDefaultLogLevels() {
+    return includeDefaultLogLevels;
+  }
+
+  public String getConfDir() {
+    return confDir;
+  }
+
+  public void setConfDir(String confDir) {
+    this.confDir = confDir;
+  }
+
+  public String getConfigFiles() {
+    return configFiles;
+  }
+
+  public void setConfigFiles(String configFiles) {
+    this.configFiles = configFiles;
+  }
+
+  public LogEntryCacheConfig getLogEntryCacheConfig() {
+    return logEntryCacheConfig;
+  }
+
+  public void setLogEntryCacheConfig(LogEntryCacheConfig logEntryCacheConfig) {
+    this.logEntryCacheConfig = logEntryCacheConfig;
+  }
+
+  public InputSimulateConfig getInputSimulateConfig() {
+    return inputSimulateConfig;
+  }
+
+  public void setInputSimulateConfig(InputSimulateConfig inputSimulateConfig) {
+    this.inputSimulateConfig = inputSimulateConfig;
+  }
+
+  public LogFeederSecurityConfig getLogFeederSecurityConfig() {
+    return logFeederSecurityConfig;
+  }
+
+  public void setLogFeederSecurityConfig(LogFeederSecurityConfig logFeederSecurityConfig) {
+    this.logFeederSecurityConfig = logFeederSecurityConfig;
+  }
+
+  public String getCheckPointExtension() {
+    return checkPointExtension;
+  }
+
+  public void setCheckPointExtension(String checkPointExtension) {
+    this.checkPointExtension = checkPointExtension;
+  }
+
+  public String getCheckpointFolder() {
+    return checkpointFolder;
+  }
+
+  public void setCheckpointFolder(String checkpointFolder) {
+    this.checkpointFolder = checkpointFolder;
+  }
+
+  /**
+   * Collects the properties defined in {@code logfeeder.properties} into {@link #properties}.
+   * <p>
+   * The property names come from the property source registered for the classpath resource; the
+   * values are looked up through the {@link Environment}, not read from that source directly.
+   *
+   * @throws IllegalArgumentException if no property source is registered for logfeeder.properties
+   */
+  @PostConstruct
+  public void init() {
+    properties = new Properties();
+    MutablePropertySources propSrcs = ((AbstractEnvironment) env).getPropertySources();
+    // The name under which the classpath resource is looked up among the property sources.
+    ResourcePropertySource propertySource = (ResourcePropertySource) propSrcs.get("class path resource [" +
+      LogFeederConstants.LOGFEEDER_PROPERTIES_FILE + "]");
+    if (propertySource != null) {
+      Stream.of(propertySource)
+        .map(MapPropertySource::getPropertyNames)
+        .flatMap(Arrays::<String>stream)
+        .forEach(propName -> properties.setProperty(propName, env.getProperty(propName)));
+    } else {
+      throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
new file mode 100644
index 0000000..8a45753
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.nio.charset.Charset;
+
+ /**
+  * Security related settings of Log Feeder.
+  * <p>
+  * Holds the credential store provider path, the Solr JAAS file and the Solr Kerberos flag, exposes the
+  * {@code javax.net.ssl.*} JVM system properties, and after construction makes sure that a password
+  * system property is set for every key store / trust store whose location system property is set.
+  */
+ public class LogFeederSecurityConfig {
+
+   private static final Logger LOG = LoggerFactory.getLogger(LogFeederSecurityConfig.class);
+
+   // names of the JVM system properties read (and, for the passwords, possibly written) by this class
+   private static final String KEYSTORE_LOCATION_ARG = "javax.net.ssl.keyStore";
+   private static final String TRUSTSTORE_LOCATION_ARG = "javax.net.ssl.trustStore";
+   private static final String KEYSTORE_TYPE_ARG = "javax.net.ssl.keyStoreType";
+   private static final String TRUSTSTORE_TYPE_ARG = "javax.net.ssl.trustStoreType";
+   private static final String KEYSTORE_PASSWORD_ARG = "javax.net.ssl.keyStorePassword";
+   private static final String TRUSTSTORE_PASSWORD_ARG = "javax.net.ssl.trustStorePassword";
+   // aliases under which the passwords are looked up in the credential store
+   private static final String KEYSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_keystore_password";
+   private static final String TRUSTSTORE_PASSWORD_PROPERTY_NAME = "logfeeder_truststore_password";
+   // password file names, resolved relative to LOGFEEDER_CERT_DEFAULT_FOLDER
+   private static final String KEYSTORE_PASSWORD_FILE = "ks_pass.txt";
+   private static final String TRUSTSTORE_PASSWORD_FILE = "ts_pass.txt";
+
+   private static final String LOGFEEDER_CERT_DEFAULT_FOLDER = "/etc/ambari-logsearch-portal/conf/keys";
+   private static final String LOGFEEDER_STORE_DEFAULT_PASSWORD = "bigdata";
+
+   private static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path";
+
+   @LogSearchPropertyDescription(
+     name = CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY,
+     description = "The jceks file that provides passwords.",
+     examples = {"jceks://file/etc/ambari-logsearch-logfeeder/conf/logfeeder.jceks"},
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${"+ CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY + ":}")
+   private String credentialStoreProviderPath;
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.SOLR_JAAS_FILE_PROPERTY,
+     description = "The jaas file used for solr.",
+     examples = {"/etc/ambari-logsearch-logfeeder/conf/logfeeder_jaas.conf"},
+     defaultValue = LogFeederConstants.DEFAULT_SOLR_JAAS_FILE,
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${" + LogFeederConstants.SOLR_JAAS_FILE_PROPERTY + ":" + LogFeederConstants.DEFAULT_SOLR_JAAS_FILE + "}")
+   private String solrJaasFile;
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.SOLR_KERBEROS_ENABLE_PROPERTY,
+     description = "Enables using kerberos for accessing solr.",
+     examples = {"true"},
+     defaultValue = LogFeederConstants.DEFAULT_SOLR_KERBEROS_ENABLE + "",
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${"+ LogFeederConstants.SOLR_KERBEROS_ENABLE_PROPERTY + ":" + LogFeederConstants.DEFAULT_SOLR_KERBEROS_ENABLE + "}")
+   private Boolean solrKerberosEnabled;
+
+   /** @return the value of the {@code javax.net.ssl.keyStore} system property, or {@code null} if unset */
+   public String getKeyStoreLocation() {
+     return System.getProperty(KEYSTORE_LOCATION_ARG);
+   }
+
+   /** @return the value of the {@code javax.net.ssl.keyStoreType} system property, or {@code null} if unset */
+   public String getKeyStoreType() {
+     return System.getProperty(KEYSTORE_TYPE_ARG);
+   }
+
+   /** @return the value of the {@code javax.net.ssl.keyStorePassword} system property, or {@code null} if unset */
+   public String getKeyStorePassword() {
+     return System.getProperty(KEYSTORE_PASSWORD_ARG);
+   }
+
+   /** @return the value of the {@code javax.net.ssl.trustStore} system property, or {@code null} if unset */
+   public String getTrustStoreLocation() {
+     return System.getProperty(TRUSTSTORE_LOCATION_ARG);
+   }
+
+   /** @return the value of the {@code javax.net.ssl.trustStoreType} system property, or {@code null} if unset */
+   public String getTrustStoreType() {
+     return System.getProperty(TRUSTSTORE_TYPE_ARG);
+   }
+
+   /** @return the value of the {@code javax.net.ssl.trustStorePassword} system property, or {@code null} if unset */
+   public String getTrustStorePassword() {
+     return System.getProperty(TRUSTSTORE_PASSWORD_ARG);
+   }
+
+   public String getCredentialStoreProviderPath() {
+     return credentialStoreProviderPath;
+   }
+
+   public void setCredentialStoreProviderPath(String credentialStoreProviderPath) {
+     this.credentialStoreProviderPath = credentialStoreProviderPath;
+   }
+
+   public String getSolrJaasFile() {
+     return solrJaasFile;
+   }
+
+   public void setSolrJaasFile(String solrJaasFile) {
+     this.solrJaasFile = solrJaasFile;
+   }
+
+   /**
+    * Tells whether Kerberos is enabled for accessing Solr.
+    *
+    * @return {@code true} only if the flag is {@code true}; an unset ({@code null}) flag counts as disabled
+    */
+   public boolean isSolrKerberosEnabled() {
+     // Boolean.TRUE.equals avoids the NullPointerException that auto-unboxing a null flag would raise
+     return Boolean.TRUE.equals(solrKerberosEnabled);
+   }
+
+   public void setSolrKerberosEnabled(Boolean solrKerberosEnabled) {
+     this.solrKerberosEnabled = solrKerberosEnabled;
+   }
+
+   /**
+    * Makes sure the key store and trust store password system properties are set whenever the
+    * corresponding store location system property is set.
+    */
+   @PostConstruct
+   public void ensureStorePasswords() {
+     ensureStorePassword(KEYSTORE_LOCATION_ARG, KEYSTORE_PASSWORD_ARG, KEYSTORE_PASSWORD_PROPERTY_NAME, KEYSTORE_PASSWORD_FILE);
+     ensureStorePassword(TRUSTSTORE_LOCATION_ARG, TRUSTSTORE_PASSWORD_ARG, TRUSTSTORE_PASSWORD_PROPERTY_NAME, TRUSTSTORE_PASSWORD_FILE);
+   }
+
+   /**
+    * Sets the password system property of one store if its location is configured but its password is not.
+    *
+    * @param locationArg  name of the system property holding the store location
+    * @param pwdArg       name of the system property that has to hold the store password
+    * @param propertyName alias of the password in the credential store
+    * @param fileName     name of the password file inside the default certificate folder
+    */
+   private void ensureStorePassword(String locationArg, String pwdArg, String propertyName, String fileName) {
+     if (StringUtils.isNotEmpty(System.getProperty(locationArg)) && StringUtils.isEmpty(System.getProperty(pwdArg))) {
+       String password = getPassword(propertyName, fileName);
+       System.setProperty(pwdArg, password);
+     }
+   }
+
+   /**
+    * Resolves a store password, trying the credential store first, then the password file, and finally
+    * falling back to the built-in default password.
+    *
+    * @param propertyName alias of the password in the credential store
+    * @param fileName     name of the password file inside the default certificate folder
+    * @return the resolved password, never {@code null}
+    */
+   private String getPassword(String propertyName, String fileName) {
+     String credentialStorePassword = getPasswordFromCredentialStore(propertyName);
+     if (credentialStorePassword != null) {
+       return credentialStorePassword;
+     }
+
+     String filePassword = getPasswordFromFile(fileName);
+     if (filePassword != null) {
+       return filePassword;
+     }
+
+     return LOGFEEDER_STORE_DEFAULT_PASSWORD;
+   }
+
+   /**
+    * Looks up a password in the credential store configured by {@code credentialStoreProviderPath}.
+    *
+    * @param propertyName alias of the password in the credential store
+    * @return the password, or {@code null} if no provider path is configured, the password is empty or
+    *         missing, or the lookup failed
+    */
+   private String getPasswordFromCredentialStore(String propertyName) {
+     try {
+       if (StringUtils.isEmpty(credentialStoreProviderPath)) {
+         return null;
+       }
+
+       org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+       config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath);
+       char[] passwordChars = config.getPassword(propertyName);
+       return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null;
+     } catch (Exception e) {
+       // best effort: the caller tries the password file next, so only warn, but keep the cause in the log
+       LOG.warn("Could not load password {} from credential store, falling back to password file or default password",
+         propertyName, e);
+       return null;
+     }
+   }
+
+   /**
+    * Reads a password from a file in the default certificate folder. If the file does not exist, it is
+    * created with the built-in default password as its content.
+    *
+    * @param fileName name of the password file inside the default certificate folder
+    * @return the password without a trailing line terminator, or {@code null} if the file could not be
+    *         read or written
+    */
+   private String getPasswordFromFile(String fileName) {
+     try {
+       File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName);
+       if (!pwdFile.exists()) {
+         // NOTE(review): this stores the built-in default password on disk in clear text - confirm it is intended
+         FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, Charset.defaultCharset());
+         return LOGFEEDER_STORE_DEFAULT_PASSWORD;
+       } else {
+         // a hand-edited file usually ends with a line terminator, which is not part of the password
+         return StringUtils.chomp(FileUtils.readFileToString(pwdFile, Charset.defaultCharset()));
+       }
+     } catch (Exception e) {
+       LOG.warn("Exception occurred during read/write password file for keystore/truststore.", e);
+       return null;
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
new file mode 100644
index 0000000..4b3c6fb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/MetricsCollectorConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+import com.google.common.base.Splitter;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+
+ /**
+  * Connection settings of the metrics collectors: hosts, protocol, port and path.
+  * <p>
+  * The hosts are configured as one comma separated string, which is split into {@code hosts} after
+  * construction.
+  */
+ @Configuration
+ public class MetricsCollectorConfig {
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.METRICS_COLLECTOR_HOSTS_PROPERTY,
+     description = "Comma separated list of metric collector hosts.",
+     examples = {"c6401.ambari.apache.org,c6402.ambari.apache.org"},
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${" + LogFeederConstants.METRICS_COLLECTOR_HOSTS_PROPERTY + ":}")
+   private String hostsString;
+
+   // parsed form of hostsString; null when no host is configured
+   private List<String> hosts;
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.METRICS_COLLECTOR_PROTOCOL_PROPERTY,
+     description = "The protocol used by metric collectors.",
+     examples = {"http", "https"},
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PROTOCOL_PROPERTY + ":#{NULL}}")
+   private String protocol;
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.METRICS_COLLECTOR_PORT_PROPERTY,
+     description = "The port used by metric collectors.",
+     examples = {"6188"},
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PORT_PROPERTY + ":#{NULL}}")
+   private String port;
+
+   @LogSearchPropertyDescription(
+     name = LogFeederConstants.METRICS_COLLECTOR_PATH_PROPERTY,
+     description = "The path used by metric collectors.",
+     examples = {"/ws/v1/timeline/metrics"},
+     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+   )
+   @Value("${" + LogFeederConstants.METRICS_COLLECTOR_PATH_PROPERTY + ":#{NULL}}")
+   private String path;
+
+   /** @return the parsed collector hosts, or {@code null} if none is configured */
+   public List<String> getHosts() {
+     return hosts;
+   }
+
+   public void setHosts(List<String> hosts) {
+     this.hosts = hosts;
+   }
+
+   public String getProtocol() {
+     return protocol;
+   }
+
+   public String getPort() {
+     return port;
+   }
+
+   public void setPort(String port) {
+     this.port = port;
+   }
+
+   public String getPath() {
+     return path;
+   }
+
+   public void setPath(String path) {
+     this.path = path;
+   }
+
+   /** @return the raw, comma separated hosts string as it was configured */
+   public String getHostsString() {
+     return hostsString;
+   }
+
+   /**
+    * Splits the configured hosts string into {@code hosts}.
+    * <p>
+    * Whitespace around the host names is removed and empty entries are skipped, so values such as
+    * {@code "host1, host2,"} do not produce host names with spaces or empty host names. If no host
+    * remains, {@code hosts} is set to {@code null}, the same as for a blank hosts string.
+    */
+   @PostConstruct
+   public void init() {
+     hosts = null;
+     if (StringUtils.isNotBlank(hostsString)) {
+       List<String> parsedHosts = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(hostsString);
+       if (!parsedHosts.isEmpty()) {
+         hosts = parsedHosts;
+       }
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index 8e8834b..a06b348 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.ambari.logfeeder.common.ConfigItem;
import org.apache.ambari.logfeeder.common.LogFeederException;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.mapper.Mapper;
@@ -56,12 +57,12 @@ public abstract class Filter extends ConfigItem {
}
@Override
- public void init() throws Exception {
- super.init();
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ super.init(logFeederProps);
initializePostMapValues();
if (nextFilter != null) {
- nextFilter.init();
+ nextFilter.init(logFeederProps);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index fc7a565..f0ef31b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -35,6 +35,7 @@ import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.exception.GrokException;
import org.apache.ambari.logfeeder.common.LogFeederException;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -73,13 +74,13 @@ public class FilterGrok extends Filter {
private MetricData grokErrorMetric = new MetricData("filter.error.grok", false);
@Override
- public void init() throws Exception {
- super.init();
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ super.init(logFeederProps);
try {
messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
- sourceField = ((FilterGrokDescriptor)filterDescriptor).getSourceField();
+ sourceField = filterDescriptor.getSourceField();
removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index 8e5aee8..adcf0a4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.regex.Pattern;
import org.apache.ambari.logfeeder.common.LogFeederException;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
@@ -41,8 +42,8 @@ public class FilterKeyValue extends Filter {
private MetricData errorMetric = new MetricData("filter.error.keyvalue", false);
@Override
- public void init() throws Exception {
- super.init();
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ super.init(logFeederProps);
sourceField = filterDescriptor.getSourceField();
valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
index b021c37..cf295c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
@@ -28,8 +28,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
import org.apache.commons.lang.ObjectUtils;
@@ -55,6 +55,8 @@ public abstract class AbstractInputFile extends Input {
private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>();
+ private LogFeederProps logFeederProps;
+
@Override
protected String getStatMetricName() {
return "input.files.read_lines";
@@ -66,10 +68,11 @@ public abstract class AbstractInputFile extends Input {
}
@Override
- public void init() throws Exception {
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ this.logFeederProps = logFeederProps;
LOG.info("init() called");
- checkPointExtension = LogFeederPropertiesUtil.getCheckPointExtension();
+ checkPointExtension = logFeederProps.getCheckPointExtension();
// Let's close the file and set it to true after we start monitoring it
setClosed(true);
@@ -86,7 +89,7 @@ public abstract class AbstractInputFile extends Input {
LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
- super.init();
+ super.init(logFeederProps);
}
protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4a668f0b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 972011d..7b9dcd4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.common.ConfigItem;
import org.apache.ambari.logfeeder.common.LogFeederException;
@@ -31,7 +33,6 @@ import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.LogFeederPropertiesUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -49,7 +50,7 @@ public abstract class Input extends ConfigItem implements Runnable {
protected InputManager inputManager;
protected OutputManager outputManager;
- private List<Output> outputList = new ArrayList<Output>();
+ private List<Output> outputList = new ArrayList<>();
private Thread thread;
private String type;
@@ -128,15 +129,15 @@ public abstract class Input extends ConfigItem implements Runnable {
}
@Override
- public void init() throws Exception {
- super.init();
- initCache();
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ super.init(logFeederProps);
+ initCache(logFeederProps.getLogEntryCacheConfig());
tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
if (firstFilter != null) {
- firstFilter.init();
+ firstFilter.init(logFeederProps);
}
}
@@ -239,28 +240,28 @@ public abstract class Input extends ConfigItem implements Runnable {
}
}
- private void initCache() {
+ private void initCache(LogEntryCacheConfig cacheConfig) {
boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
? inputDescriptor.isCacheEnabled()
- : LogFeederPropertiesUtil.isCacheEnabled();
+ : cacheConfig.isCacheEnabled();
if (cacheEnabled) {
String cacheKeyField = inputDescriptor.getCacheKeyField() != null
? inputDescriptor.getCacheKeyField()
- : LogFeederPropertiesUtil.getCacheKeyField();
+ : cacheConfig.getCacheKeyField();
setCacheKeyField(cacheKeyField);
int cacheSize = inputDescriptor.getCacheSize() != null
? inputDescriptor.getCacheSize()
- : LogFeederPropertiesUtil.getCacheSize();
+ : cacheConfig.getCacheSize();
boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
? inputDescriptor.getCacheLastDedupEnabled()
- : LogFeederPropertiesUtil.isCacheLastDedupEnabled();
+ : cacheConfig.isCacheLastDedupEnabled();
long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
? inputDescriptor.getCacheDedupInterval()
- : Long.parseLong(LogFeederPropertiesUtil.getCacheDedupInterval());
+ : Long.parseLong(cacheConfig.getCacheDedupInterval());
setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
}