You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/04/12 13:02:36 UTC
[5/5] ambari git commit: AMBARI-20578 Log Search Configuration API
(mgergely)
AMBARI-20578 Log Search Configuration API (mgergely)
Change-Id: I2415e9402fa002dedb566cfebada4cf34ef1d4a6
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0ac0ba42
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0ac0ba42
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0ac0ba42
Branch: refs/heads/trunk
Commit: 0ac0ba424e31db38704d8e7a59ac60b853094cda
Parents: 754d6c8
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Apr 12 15:02:14 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Wed Apr 12 15:02:14 2017 +0200
----------------------------------------------------------------------
.../ambari-logsearch-config-api/.gitignore | 1 +
.../ambari-logsearch-config-api/pom.xml | 57 ++
.../config/api/InputConfigMonitor.java | 41 ++
.../logsearch/config/api/LogSearchConfig.java | 90 +++
.../config/api/LogSearchConfigFactory.java | 68 +++
.../config/api/LogSearchConfigClass1.java | 55 ++
.../config/api/LogSearchConfigClass2.java | 55 ++
.../config/api/LogSearchConfigFactoryTest.java | 58 ++
.../config/api/NonLogSearchConfigClass.java | 23 +
.../src/test/resources/log4j.xml | 34 ++
.../.gitignore | 1 +
.../ambari-logsearch-config-zookeeper/pom.xml | 74 +++
.../config/zookeeper/LogSearchConfigZK.java | 213 ++++++++
.../ambari-logsearch-logfeeder/pom.xml | 66 ++-
.../org/apache/ambari/logfeeder/LogFeeder.java | 384 +------------
.../ambari/logfeeder/common/ConfigHandler.java | 428 +++++++++++++++
.../logfeeder/input/InputConfigUploader.java | 94 ++++
.../ambari/logfeeder/input/InputManager.java | 304 ++++++-----
.../ambari/logfeeder/input/InputSimulate.java | 22 +-
.../ambari/logfeeder/output/OutputManager.java | 7 -
.../ambari/logfeeder/output/OutputS3File.java | 4 +-
.../ambari/logfeeder/util/LogFeederUtil.java | 3 +
.../apache/ambari/logfeeder/util/SSLUtil.java | 5 +-
.../src/main/resources/log4j.xml | 6 +-
.../ambari/logfeeder/input/InputFileTest.java | 3 +-
.../logfeeder/input/InputManagerTest.java | 70 +--
.../logfeeder/output/OutputManagerTest.java | 11 +-
.../ambari-logsearch-server/pom.xml | 18 +-
.../org/apache/ambari/logsearch/LogSearch.java | 1 +
.../logsearch/common/PropertiesHelper.java | 3 +
.../conf/global/LogSearchConfigState.java | 35 ++
.../ambari/logsearch/configurer/Configurer.java | 23 +
.../configurer/LogSearchConfigConfigurer.java | 69 +++
.../configurer/LogfeederFilterConfigurer.java | 2 +-
.../configurer/SolrAuditAliasConfigurer.java | 2 +-
.../configurer/SolrCollectionConfigurer.java | 2 +-
.../logsearch/configurer/SolrConfigurer.java | 23 -
.../ambari/logsearch/doc/DocConstants.java | 6 +
.../logsearch/manager/ShipperConfigManager.java | 81 +++
.../logsearch/rest/ShipperConfigResource.java | 73 +++
.../ambari-logsearch-web/.gitignore | 1 +
.../test-config/logfeeder/logfeeder.properties | 12 +-
.../test-config/logsearch/logsearch.properties | 2 +
ambari-logsearch/pom.xml | 2 +
.../server/upgrade/UpgradeCatalog300.java | 35 ++
.../configuration/accumulo-logsearch-conf.xml | 124 -----
.../templates/input.config-accumulo.json.j2 | 92 ++++
.../configuration/infra-logsearch-conf.xml | 80 ---
.../templates/input.config-ambari-infra.json.j2 | 48 ++
.../0.1.0/configuration/ams-logsearch-conf.xml | 201 -------
.../input.config-ambari-metrics.json.j2 | 169 ++++++
.../configuration/atlas-logsearch-conf.xml | 80 ---
.../templates/input.config-atlas.json.j2 | 48 ++
.../configuration/falcon-logsearch-conf.xml | 80 ---
.../templates/input.config-falcon.json.j2 | 48 ++
.../configuration/flume-logsearch-conf.xml | 80 ---
.../templates/input.config-flume.json.j2 | 53 ++
.../configuration/hbase-logsearch-conf.xml | 111 ----
.../templates/input.config-hbase.json.j2 | 79 +++
.../configuration/hdfs-logsearch-conf.xml | 248 ---------
.../package/templates/input.config-hdfs.json.j2 | 216 ++++++++
.../configuration/hive-logsearch-conf.xml | 117 ----
.../package/templates/input.config-hive.json.j2 | 85 +++
.../configuration/kafka-logsearch-conf.xml | 124 -----
.../templates/input.config-kafka.json.j2 | 92 ++++
.../configuration/knox-logsearch-conf.xml | 93 ----
.../package/templates/input.config-knox.json.j2 | 60 ++
.../configuration/logfeeder-ambari-config.xml | 1 +
.../logfeeder-custom-logsearch-conf.xml | 46 --
.../LOGSEARCH/0.5.0/package/scripts/params.py | 44 +-
.../0.5.0/package/scripts/setup_logfeeder.py | 15 +-
.../templates/HadoopServiceConfig.json.j2 | 545 ++++++++++++++++---
.../templates/input.config-logsearch.json.j2 | 6 +-
.../configuration/oozie-logsearch-conf.xml | 80 ---
.../templates/input.config-oozie.json.j2 | 48 ++
.../configuration/ranger-logsearch-conf.xml | 111 ----
.../templates/input.config-ranger.json.j2 | 79 +++
.../configuration/ranger-kms-logsearch-conf.xml | 80 ---
.../templates/input.config-ranger-kms.json.j2 | 48 ++
.../configuration/spark-logsearch-conf.xml | 98 ----
.../templates/input.config-spark.json.j2 | 66 +++
.../configuration/spark2-logsearch-conf.xml | 98 ----
.../templates/input.config-spark2.json.j2 | 66 +++
.../configuration/storm-logsearch-conf.xml | 110 ----
.../templates/input.config-storm.json.j2 | 78 +++
.../mapred-logsearch-conf.xml | 80 ---
.../configuration/yarn-logsearch-conf.xml | 104 ----
.../templates/input.config-mapreduce2.json.j2 | 48 ++
.../package/templates/input.config-yarn.json.j2 | 72 +++
.../configuration/zeppelin-logsearch-conf.xml | 80 ---
.../templates/input.config-zeppelin.json.j2 | 48 ++
.../configuration/zookeeper-logsearch-conf.xml | 76 ---
.../templates/input.config-zookeeper.json.j2 | 46 ++
.../2.0.6/hooks/after-INSTALL/scripts/params.py | 14 +
.../scripts/shared_initialization.py | 17 +
.../configuration/hbase-logsearch-conf.xml | 111 ----
.../templates/input.config-hbase.json.j2 | 79 +++
.../configuration/hdfs-logsearch-conf.xml | 248 ---------
.../package/templates/input.config-hdfs.json.j2 | 216 ++++++++
.../server/upgrade/UpgradeCatalog300Test.java | 87 +++
.../stacks/2.4/LOGSEARCH/test_logfeeder.py | 28 +-
.../stacks/2.4/LOGSEARCH/test_logsearch.py | 1 +
102 files changed, 4451 insertions(+), 3418 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/.gitignore
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/.gitignore b/ambari-logsearch/ambari-logsearch-config-api/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
new file mode 100644
index 0000000..e9abed0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>ambari-logsearch</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-logsearch-config-api</artifactId>
+ <packaging>jar</packaging>
+ <name>Ambari Logsearch Config Api</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
new file mode 100644
index 0000000..df26920
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+/**
+ * Monitors input configuration changes.
+ */
+public interface InputConfigMonitor {
+ /**
+ * Notification of a new input configuration.
+ *
+ * @param serviceName The name of the service for which the input configuration was created.
+ * @param inputConfig The input configuration.
+ * @throws Exception
+ */
+ void loadInputConfigs(String serviceName, String inputConfig) throws Exception;
+
+ /**
+ * Notification of the removal of an input configuration.
+ *
+ * @param serviceName The name of the service of which's input configuration was removed.
+ */
+ void removeInputs(String serviceName);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
new file mode 100644
index 0000000..0bb0b78
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes.
+ */
+public interface LogSearchConfig extends Closeable {
+ /**
+ * Enumeration of the components of the Log Search service.
+ */
+ public enum Component {
+ SERVER, LOGFEEDER;
+ }
+
+ /**
+ * Initialization of the configuration.
+ *
+ * @param component The component which will use the configuration.
+ * @param properties The properties of that component.
+ * @throws Exception
+ */
+ void init(Component component, Map<String, String> properties) throws Exception;
+
+ /**
+ * Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode.
+ *
+ * @param clusterName The name of the cluster which's services are required.
+ * @return List of the service names.
+ */
+ List<String> getServices(String clusterName);
+
+ /**
+ * Checks if input configuration exists.
+ *
+ * @param clusterName The name of the cluster where the service is looked for.
+ * @param serviceName The name of the service looked for.
+ * @return If input configuration exists for the service.
+ * @throws Exception
+ */
+ boolean inputConfigExists(String clusterName, String serviceName) throws Exception;
+
+ /**
+ * Returns the input configuration of a service in a cluster. Will be used only in SERVER mode.
+ *
+ * @param clusterName The name of the cluster where the service is looked for.
+ * @param serviceName The name of the service looked for.
+ * @return The input configuration for the service if it exists, null otherwise.
+ */
+ String getInputConfig(String clusterName, String serviceName);
+
+ /**
+ * Uploads the input configuration for a service in a cluster.
+ *
+ * @param clusterName The name of the cluster where the service is.
+ * @param serviceName The name of the service of which's input configuration is uploaded.
+ * @param inputConfig The input configuration of the service.
+ * @throws Exception
+ */
+ void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception;
+
+ /**
+ * Starts the monitoring of the input configurations, asynchronously. Will be used only in LOGFEEDER mode.
+ *
+ * @param configMonitor The input config monitor to call in case of a config change.
+ * @throws Exception
+ */
+ void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
new file mode 100644
index 0000000..6ef4b90
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Factory class for LogSearchConfig.
+ */
+public class LogSearchConfigFactory {
+ private static final Logger LOG = Logger.getLogger(LogSearchConfigFactory.class);
+
+ /**
+ * Creates a Log Search Configuration instance that implements {@link org.apache.ambari.logsearch.config.api.LogSearchConfig}.
+ *
+ * @param component The component of the Log Search Service to create the configuration for (SERVER/LOGFEEDER).
+ * @param properties The properties of the component for which the configuration is created. If the properties contain the
+ * "logsearch.config.class" entry than the class defined there would be used instead of the default class.
+ * @param defaultClass The default configuration class to use if not specified otherwise.
+ * @return The Log Search Configuration instance.
+ * @throws Exception Throws exception if the defined class does not implement LogSearchConfig, or doesn't have an empty
+ * constructor, or throws an exception in it's init method.
+ */
+ public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties,
+ Class<? extends LogSearchConfig> defaultClass) throws Exception {
+ try {
+ LogSearchConfig logSearchConfig = null;
+ String configClassName = properties.get("logsearch.config.class");
+ if (!StringUtils.isBlank(configClassName)) {
+ Class<?> clazz = Class.forName(configClassName);
+ if (LogSearchConfig.class.isAssignableFrom(clazz)) {
+ logSearchConfig = (LogSearchConfig) clazz.newInstance();
+ } else {
+ throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + LogSearchConfig.class.getName());
+ }
+ } else {
+ logSearchConfig = defaultClass.newInstance();
+ }
+
+ logSearchConfig.init(component, properties);
+ return logSearchConfig;
+ } catch (Exception e) {
+ LOG.fatal("Could not initialize logsearch config.", e);
+ throw e;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
new file mode 100644
index 0000000..969eb30
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+
+public class LogSearchConfigClass1 implements LogSearchConfig {
+ @Override
+ public void init(Component component, Map<String, String> properties) {}
+
+ @Override
+ public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+ @Override
+ public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+
+ @Override
+ public List<String> getServices(String clusterName) {
+ return null;
+ }
+
+ @Override
+ public String getInputConfig(String clusterName, String serviceName) {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
new file mode 100644
index 0000000..664ecc9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+
+public class LogSearchConfigClass2 implements LogSearchConfig {
+ @Override
+ public void init(Component component, Map<String, String> properties) {}
+
+ @Override
+ public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+ @Override
+ public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+
+ @Override
+ public List<String> getServices(String clusterName) {
+ return null;
+ }
+
+ @Override
+ public String getInputConfig(String clusterName, String serviceName) {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
new file mode 100644
index 0000000..8e7154e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.Collections;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import junit.framework.Assert;
+
+public class LogSearchConfigFactoryTest {
+
+ @Test
+ public void testDefaultConfig() throws Exception {
+ LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+ Collections.<String, String> emptyMap(), LogSearchConfigClass1.class);
+
+ Assert.assertSame(config.getClass(), LogSearchConfigClass1.class);
+ }
+
+ @Test
+ public void testCustomConfig() throws Exception {
+ LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+ ImmutableMap.of("logsearch.config.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigClass2"),
+ LogSearchConfigClass1.class);
+
+ Assert.assertSame(config.getClass(), LogSearchConfigClass2.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNonConfigClass() throws Exception {
+ LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+ ImmutableMap.of("logsearch.config.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"),
+ LogSearchConfigClass1.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
new file mode 100644
index 0000000..9564f33
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+public class NonLogSearchConfigClass {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
new file mode 100644
index 0000000..6d968f9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" />
+ <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> -->
+ </layout>
+ </appender>
+
+ <root>
+ <priority value="warn" />
+ <appender-ref ref="console" />
+ </root>
+
+</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore b/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
new file mode 100644
index 0000000..4ed8eba
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>ambari-logsearch</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-logsearch-config-zookeeper</artifactId>
+ <packaging>jar</packaging>
+ <name>Ambari Logsearch Config Zookeeper</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-logsearch-config-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.4</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.12.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>2.12.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.12.0</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
new file mode 100644
index 0000000..738fde2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+import com.google.common.base.Splitter;
+
+public class LogSearchConfigZK implements LogSearchConfig {
+ private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class);
+
+ private static final int SESSION_TIMEOUT = 15000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+ private static final String DEFAULT_ZK_ROOT = "/logsearch";
+ private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
+
+ private static final String CLUSTER_NAME_PROPERTY = "cluster.name";
+ private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
+ private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";
+ private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";
+
+ private Map<String, String> properties;
+ private String root;
+ private CuratorFramework client;
+ private TreeCache cache;
+
+ @Override
+ public void init(Component component, Map<String, String> properties) throws Exception {
+ this.properties = properties;
+
+ LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY));
+ client = CuratorFrameworkFactory.builder()
+ .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY))
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .connectionTimeoutMs(CONNECTION_TIMEOUT)
+ .sessionTimeoutMs(SESSION_TIMEOUT)
+ .build();
+ client.start();
+
+ root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
+
+ if (component == Component.SERVER) {
+ if (client.checkExists().forPath(root) == null) {
+ client.create().creatingParentContainersIfNeeded().forPath(root);
+ }
+ cache = new TreeCache(client, root);
+ cache.start();
+ } else {
+ while (client.checkExists().forPath(root) == null) {
+ LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
+ Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
+ }
+
+ cache = new TreeCache(client, String.format("%s/%s", root, properties.get(CLUSTER_NAME_PROPERTY)));
+ }
+ }
+
+ @Override
+ public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+ String nodePath = root + "/" + clusterName + "/input/" + serviceName;
+ return cache.getCurrentData(nodePath) != null;
+ }
+
+ @Override
+ public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
+ String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName);
+ client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
+ LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName);
+ }
+
+ private List<ACL> getAcls() {
+ String aclStr = properties.get(ZK_ACLS_PROPERTY);
+ if (StringUtils.isBlank(aclStr)) {
+ return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+ }
+
+ List<ACL> acls = new ArrayList<>();
+ List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+ for (String unparcedAcl : aclStrList) {
+ String[] parts = unparcedAcl.split(":");
+ if (parts.length == 3) {
+ acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+ }
+ }
+ return acls;
+ }
+
+ private Integer parsePermission(String permission) {
+ int permissionCode = 0;
+ for (char each : permission.toLowerCase().toCharArray()) {
+ switch (each) {
+ case 'r':
+ permissionCode |= ZooDefs.Perms.READ;
+ break;
+ case 'w':
+ permissionCode |= ZooDefs.Perms.WRITE;
+ break;
+ case 'c':
+ permissionCode |= ZooDefs.Perms.CREATE;
+ break;
+ case 'd':
+ permissionCode |= ZooDefs.Perms.DELETE;
+ break;
+ case 'a':
+ permissionCode |= ZooDefs.Perms.ADMIN;
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported permission: " + permission);
+ }
+ }
+ return permissionCode;
+ }
+
+ @Override
+ public void monitorInputConfigChanges(final InputConfigMonitor configMonitor) throws Exception {
+ TreeCacheListener listener = new TreeCacheListener() {
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+ if (!event.getData().getPath().startsWith(String.format("%s/%s/input/", root, properties.get(CLUSTER_NAME_PROPERTY)))) {
+ return;
+ }
+
+ String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
+ String nodeData = new String(event.getData().getData());
+ switch (event.getType()) {
+ case NODE_ADDED:
+ LOG.info("Node added under input ZK node: " + nodeName);
+ addInputs(nodeName, nodeData);
+ break;
+ case NODE_UPDATED:
+ LOG.info("Node updated under input ZK node: " + nodeName);
+ removeInputs(nodeName);
+ addInputs(nodeName, nodeData);
+ break;
+ case NODE_REMOVED:
+ LOG.info("Node removed from input ZK node: " + nodeName);
+ removeInputs(nodeName);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void removeInputs(String serviceName) {
+ configMonitor.removeInputs(serviceName);
+ }
+
+ private void addInputs(String serviceName, String inputConfig) {
+ try {
+ configMonitor.loadInputConfigs(serviceName, inputConfig);
+ } catch (Exception e) {
+ LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
+ }
+ }
+ };
+ cache.getListenable().addListener(listener);
+ cache.start();
+ }
+
+ @Override
+ public List<String> getServices(String clusterName) {
+ String parentPath = String.format("%s/%s/input", root, clusterName);
+ Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath);
+ return new ArrayList<String>(serviceNodes.keySet());
+ }
+
+ @Override
+ public String getInputConfig(String clusterName, String serviceName) {
+ ChildData childData = cache.getCurrentData(String.format("%s/%s/input/%s", root, clusterName, serviceName));
+ return childData == null ? null : new String(childData.getData());
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing ZooKeeper Connection");
+ client.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 25e4306..5d6f8b6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -44,6 +44,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-logsearch-config-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
@@ -88,7 +93,6 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -125,9 +129,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
- <version>1.11.5</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -135,26 +139,40 @@
<version>1.11</version>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-iam</artifactId>
- <version>1.11.5</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>${common.io.version}</version>
- </dependency>
- </dependencies>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-iam</artifactId>
+ <version>1.11.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${common.io.version}</version>
+ </dependency>
+ </dependencies>
<build>
<finalName>LogFeeder</finalName>
<pluginManagement>
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index a47c71f..074fedb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -19,60 +19,37 @@
package org.apache.ambari.logfeeder;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.lang.reflect.Type;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
+import org.apache.ambari.logfeeder.input.InputConfigUploader;
import org.apache.ambari.logfeeder.input.InputManager;
-import org.apache.ambari.logfeeder.input.InputSimulate;
import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.SSLUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.gson.reflect.TypeToken;
-
public class LogFeeder {
private static final Logger LOG = Logger.getLogger(LogFeeder.class);
private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
- private OutputManager outputManager = new OutputManager();
+ private ConfigHandler configHandler = new ConfigHandler();
+ private LogSearchConfig config;
+
private InputManager inputManager = new InputManager();
private MetricsManager metricsManager = new MetricsManager();
- public static Map<String, Object> globalConfigs = new HashMap<>();
-
- private List<Map<String, Object>> inputConfigList = new ArrayList<>();
- private List<Map<String, Object>> filterConfigList = new ArrayList<>();
- private List<Map<String, Object>> outputConfigList = new ArrayList<>();
-
private long lastCheckPointCleanedMS = 0;
private boolean isLogfeederCompleted = false;
private Thread statLoggerThread = null;
@@ -91,329 +68,23 @@ public class LogFeeder {
}
private void init() throws Throwable {
- Date startTime = new Date();
+ long startTime = System.currentTimeMillis();
- loadConfigFiles();
- addSimulatedInputs();
- mergeAllConfigs();
-
+ configHandler.init();
+ LogConfigHandler.handleConfig();
SSLUtil.ensureStorePasswords();
- outputManager.init();
- inputManager.init();
- metricsManager.init();
+ config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,
+ Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class);
+ InputConfigUploader.load(config);
+ config.monitorInputConfigChanges(configHandler);
- LogConfigHandler.handleConfig();
+ metricsManager.init();
LOG.debug("==============");
- Date endTime = new Date();
- LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
- }
-
- private void loadConfigFiles() throws Exception {
- List<String> configFiles = getConfigFiles();
- for (String configFileName : configFiles) {
- LOG.info("Going to load config file:" + configFileName);
- configFileName = configFileName.replace("\\ ", "%20");
- File configFile = new File(configFileName);
- if (configFile.exists() && configFile.isFile()) {
- LOG.info("Config file exists in path." + configFile.getAbsolutePath());
- loadConfigsUsingFile(configFile);
- } else {
- LOG.info("Trying to load config file from classloader: " + configFileName);
- loadConfigsUsingClassLoader(configFileName);
- LOG.info("Loaded config file from classloader: " + configFileName);
- }
- }
- }
-
- private List<String> getConfigFiles() {
- List<String> configFiles = new ArrayList<>();
-
- String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
- LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
- if (logfeederConfigFilesProperty != null) {
- configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
- }
-
- String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
- if (StringUtils.isNotEmpty(inputConfigDir)) {
- File configDirFile = new File(inputConfigDir);
- List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
- for (File inputConfigFile : inputConfigFiles) {
- configFiles.add(inputConfigFile.getAbsolutePath());
- }
- }
-
- if (CollectionUtils.isEmpty(configFiles)) {
- String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
- configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
- }
-
- return configFiles;
- }
-
- private void loadConfigsUsingFile(File configFile) throws Exception {
- try {
- String configData = FileUtils.readFileToString(configFile);
- loadConfigs(configData);
- } catch (Exception t) {
- LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
- throw t;
- }
- }
-
- private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
- try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
- String configData = IOUtils.toString(fis);
- loadConfigs(configData);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void loadConfigs(String configData) throws Exception {
- Type type = new TypeToken<Map<String, Object>>() {}.getType();
- Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
-
- // Get the globals
- for (String key : configMap.keySet()) {
- switch (key) {
- case "global" :
- globalConfigs.putAll((Map<String, Object>) configMap.get(key));
- break;
- case "input" :
- List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
- inputConfigList.addAll(inputConfig);
- break;
- case "filter" :
- List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
- filterConfigList.addAll(filterConfig);
- break;
- case "output" :
- List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
- outputConfigList.addAll(outputConfig);
- break;
- default :
- LOG.warn("Unknown config key: " + key);
- }
- }
- }
-
- private void addSimulatedInputs() {
- int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
- if (simulatedInputNumber == 0)
- return;
-
- InputSimulate.loadTypeToFilePath(inputConfigList);
- inputConfigList.clear();
-
- for (int i = 0; i < simulatedInputNumber; i++) {
- HashMap<String, Object> mapList = new HashMap<String, Object>();
- mapList.put("source", "simulate");
- mapList.put("rowtype", "service");
- inputConfigList.add(mapList);
- }
- }
-
- private void mergeAllConfigs() {
- loadOutputs();
- loadInputs();
- loadFilters();
-
- assignOutputsToInputs();
- }
-
- private void loadOutputs() {
- for (Map<String, Object> map : outputConfigList) {
- if (map == null) {
- continue;
- }
- mergeBlocks(globalConfigs, map);
-
- String value = (String) map.get("destination");
- if (StringUtils.isEmpty(value)) {
- LOG.error("Output block doesn't have destination element");
- continue;
- }
- Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
- if (output == null) {
- LOG.error("Output object could not be found");
- continue;
- }
- output.setDestination(value);
- output.loadConfig(map);
-
- // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
- if (output.getBooleanValue("is_enabled", true)) {
- output.logConfigs(Level.INFO);
- outputManager.add(output);
- } else {
- LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
- }
- }
- }
-
- private void loadInputs() {
- for (Map<String, Object> map : inputConfigList) {
- if (map == null) {
- continue;
- }
- mergeBlocks(globalConfigs, map);
-
- String value = (String) map.get("source");
- if (StringUtils.isEmpty(value)) {
- LOG.error("Input block doesn't have source element");
- continue;
- }
- Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
- if (input == null) {
- LOG.error("Input object could not be found");
- continue;
- }
- input.setType(value);
- input.loadConfig(map);
-
- if (input.isEnabled()) {
- input.setOutputManager(outputManager);
- input.setInputManager(inputManager);
- inputManager.add(input);
- input.logConfigs(Level.INFO);
- } else {
- LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
- }
- }
- }
-
- private void loadFilters() {
- sortFilters();
-
- List<Input> toRemoveInputList = new ArrayList<Input>();
- for (Input input : inputManager.getInputList()) {
- for (Map<String, Object> map : filterConfigList) {
- if (map == null) {
- continue;
- }
- mergeBlocks(globalConfigs, map);
-
- String value = (String) map.get("filter");
- if (StringUtils.isEmpty(value)) {
- LOG.error("Filter block doesn't have filter element");
- continue;
- }
- Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
- if (filter == null) {
- LOG.error("Filter object could not be found");
- continue;
- }
- filter.loadConfig(map);
- filter.setInput(input);
-
- if (filter.isEnabled()) {
- filter.setOutputManager(outputManager);
- input.addFilter(filter);
- filter.logConfigs(Level.INFO);
- } else {
- LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
- }
- }
-
- if (input.getFirstFilter() == null) {
- toRemoveInputList.add(input);
- }
- }
-
- for (Input toRemoveInput : toRemoveInputList) {
- LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
- inputManager.removeInput(toRemoveInput);
- }
- }
-
- private void sortFilters() {
- Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
-
- @Override
- public int compare(Map<String, Object> o1, Map<String, Object> o2) {
- Object o1Sort = o1.get("sort_order");
- Object o2Sort = o2.get("sort_order");
- if (o1Sort == null || o2Sort == null) {
- return 0;
- }
-
- int o1Value = parseSort(o1, o1Sort);
- int o2Value = parseSort(o2, o2Sort);
-
- return o1Value - o2Value;
- }
-
- private int parseSort(Map<String, Object> map, Object o) {
- if (!(o instanceof Number)) {
- try {
- return (new Double(Double.parseDouble(o.toString()))).intValue();
- } catch (Throwable t) {
- LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
- + ", map=" + map.toString());
- return 0;
- }
- } else {
- return ((Number) o).intValue();
- }
- }
- });
- }
-
- private void assignOutputsToInputs() {
- Set<Output> usedOutputSet = new HashSet<Output>();
- for (Input input : inputManager.getInputList()) {
- for (Output output : outputManager.getOutputs()) {
- if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
- usedOutputSet.add(output);
- input.addOutput(output);
- }
- }
- }
-
- // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
- for (Output output : InputSimulate.getSimulateOutputs()) {
- outputManager.add(output);
- usedOutputSet.add(output);
- }
-
- outputManager.retainUsedOutputs(usedOutputSet);
- }
-
- @SuppressWarnings("unchecked")
- private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
- for (String key : fromMap.keySet()) {
- Object objValue = fromMap.get(key);
- if (objValue == null) {
- continue;
- }
- if (objValue instanceof Map) {
- Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
-
- Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
- if (localFields == null) {
- localFields = new HashMap<String, Object>();
- toMap.put(key, localFields);
- }
-
- if (globalFields != null) {
- for (String fieldKey : globalFields.keySet()) {
- if (!localFields.containsKey(fieldKey)) {
- localFields.put(fieldKey, globalFields.get(fieldKey));
- }
- }
- }
- }
- }
-
- // Let's add the rest of the top level fields if missing
- for (String key : fromMap.keySet()) {
- if (!toMap.containsKey(key)) {
- toMap.put(key, fromMap.get(key));
- }
- }
+ long endTime = System.currentTimeMillis();
+ LOG.info("Took " + (endTime - startTime) + " ms to initialize");
}
private class JVMShutdownHook extends Thread {
@@ -422,10 +93,8 @@ public class LogFeeder {
try {
LOG.info("Processing is shutting down.");
- inputManager.close();
- outputManager.close();
- inputManager.checkInAll();
-
+ configHandler.close();
+ config.close();
logStats();
LOG.info("LogSearch is exiting.");
@@ -436,7 +105,6 @@ public class LogFeeder {
}
private void monitor() throws Exception {
- inputManager.monitor();
JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
@@ -458,7 +126,7 @@ public class LogFeeder {
if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
lastCheckPointCleanedMS = System.currentTimeMillis();
- inputManager.cleanCheckPointFiles();
+ configHandler.cleanCheckPointFiles();
}
if (isLogfeederCompleted) {
@@ -474,13 +142,11 @@ public class LogFeeder {
}
private void logStats() {
- inputManager.logStats();
- outputManager.logStats();
+ configHandler.logStats();
if (metricsManager.isMetricsEnabled()) {
List<MetricData> metricsList = new ArrayList<MetricData>();
- inputManager.addMetricsContainers(metricsList);
- outputManager.addMetricsContainers(metricsList);
+ configHandler.addMetrics(metricsList);
metricsManager.useMetrics(metricsList);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
new file mode 100644
index 0000000..effe980
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputManager;
+import org.apache.ambari.logfeeder.input.InputSimulate;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.reflect.TypeToken;
+
+public class ConfigHandler implements InputConfigMonitor {
+ private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
+
+ private final OutputManager outputManager = new OutputManager();
+ private final InputManager inputManager = new InputManager();
+
+ public static Map<String, Object> globalConfigs = new HashMap<>();
+
+ private final List<Map<String, Object>> inputConfigList = new ArrayList<>();
+ private final List<Map<String, Object>> filterConfigList = new ArrayList<>();
+ private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
+
+ private boolean simulateMode = false;
+
+ public ConfigHandler() {}
+
+ public void init() throws Exception {
+ loadConfigFiles();
+ loadOutputs();
+ simulateIfNeeded();
+
+ inputManager.init();
+ outputManager.init();
+ }
+
+ private void loadConfigFiles() throws Exception {
+ List<String> configFiles = getConfigFiles();
+ for (String configFileName : configFiles) {
+ LOG.info("Going to load config file:" + configFileName);
+ configFileName = configFileName.replace("\\ ", "%20");
+ File configFile = new File(configFileName);
+ if (configFile.exists() && configFile.isFile()) {
+ LOG.info("Config file exists in path." + configFile.getAbsolutePath());
+ loadConfigsUsingFile(configFile);
+ } else {
+ LOG.info("Trying to load config file from classloader: " + configFileName);
+ loadConfigsUsingClassLoader(configFileName);
+ LOG.info("Loaded config file from classloader: " + configFileName);
+ }
+ }
+ }
+
+ private List<String> getConfigFiles() {
+ List<String> configFiles = new ArrayList<>();
+
+ String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
+ LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
+ if (logfeederConfigFilesProperty != null) {
+ configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
+ }
+
+ String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+ if (StringUtils.isNotEmpty(inputConfigDir)) {
+ File configDirFile = new File(inputConfigDir);
+ List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
+ for (File inputConfigFile : inputConfigFiles) {
+ configFiles.add(inputConfigFile.getAbsolutePath());
+ }
+ }
+
+ if (CollectionUtils.isEmpty(configFiles)) {
+ String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
+ configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
+ }
+
+ return configFiles;
+ }
+
+ private void loadConfigsUsingFile(File configFile) throws Exception {
+ try {
+ String configData = FileUtils.readFileToString(configFile, Charset.defaultCharset());
+ loadConfigs(configData);
+ } catch (Exception t) {
+ LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
+ throw t;
+ }
+ }
+
+ private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
+ try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
+ String configData = IOUtils.toString(fis, Charset.defaultCharset());
+ loadConfigs(configData);
+ }
+ }
+
+ @Override
+ public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception {
+ inputConfigList.clear();
+ filterConfigList.clear();
+
+ loadConfigs(inputConfigData);
+
+ if (simulateMode) {
+ InputSimulate.loadTypeToFilePath(inputConfigList);
+ } else {
+ loadInputs(serviceName);
+ loadFilters(serviceName);
+ assignOutputsToInputs(serviceName);
+
+ inputManager.startInputs(serviceName);
+ }
+ }
+
+ @Override
+ public void removeInputs(String serviceName) {
+ inputManager.removeInputsForService(serviceName);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void loadConfigs(String configData) throws Exception {
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
+
+ // Get the globals
+ for (String key : configMap.keySet()) {
+ switch (key) {
+ case "global" :
+ globalConfigs.putAll((Map<String, Object>) configMap.get(key));
+ break;
+ case "input" :
+ List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
+ inputConfigList.addAll(inputConfig);
+ break;
+ case "filter" :
+ List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
+ filterConfigList.addAll(filterConfig);
+ break;
+ case "output" :
+ List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
+ outputConfigList.addAll(outputConfig);
+ break;
+ default :
+ LOG.warn("Unknown config key: " + key);
+ }
+ }
+ }
+
+ private void simulateIfNeeded() throws Exception {
+ int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
+ if (simulatedInputNumber == 0)
+ return;
+
+ List<Map<String, Object>> simulateInputConfigList = new ArrayList<>();
+ for (int i = 0; i < simulatedInputNumber; i++) {
+ HashMap<String, Object> mapList = new HashMap<String, Object>();
+ mapList.put("source", "simulate");
+ mapList.put("rowtype", "service");
+ simulateInputConfigList.add(mapList);
+ }
+
+ Map<String, List<Map<String, Object>>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList);
+ String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap);
+ loadInputConfigs("Simulation", simulateInputConfig);
+
+ simulateMode = true;
+ }
+
+ private void loadOutputs() {
+ for (Map<String, Object> map : outputConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalConfigs, map);
+
+ String value = (String) map.get("destination");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Output block doesn't have destination element");
+ continue;
+ }
+ Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
+ if (output == null) {
+ LOG.error("Output object could not be found");
+ continue;
+ }
+ output.setDestination(value);
+ output.loadConfig(map);
+
+ // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
+ if (output.getBooleanValue("is_enabled", true)) {
+ output.logConfigs(Level.INFO);
+ outputManager.add(output);
+ } else {
+ LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
+ }
+ }
+ }
+
+ private void loadInputs(String serviceName) {
+ for (Map<String, Object> map : inputConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalConfigs, map);
+
+ String value = (String) map.get("source");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Input block doesn't have source element");
+ continue;
+ }
+ Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
+ if (input == null) {
+ LOG.error("Input object could not be found");
+ continue;
+ }
+ input.setType(value);
+ input.loadConfig(map);
+
+ if (input.isEnabled()) {
+ input.setOutputManager(outputManager);
+ input.setInputManager(inputManager);
+ inputManager.add(serviceName, input);
+ input.logConfigs(Level.INFO);
+ } else {
+ LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
+ }
+ }
+ }
+
+ private void loadFilters(String serviceName) {
+ sortFilters();
+
+ List<Input> toRemoveInputList = new ArrayList<Input>();
+ for (Input input : inputManager.getInputList(serviceName)) {
+ for (Map<String, Object> map : filterConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalConfigs, map);
+
+ String value = (String) map.get("filter");
+ if (StringUtils.isEmpty(value)) {
+ LOG.error("Filter block doesn't have filter element");
+ continue;
+ }
+ Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
+ if (filter == null) {
+ LOG.error("Filter object could not be found");
+ continue;
+ }
+ filter.loadConfig(map);
+ filter.setInput(input);
+
+ if (filter.isEnabled()) {
+ filter.setOutputManager(outputManager);
+ input.addFilter(filter);
+ filter.logConfigs(Level.INFO);
+ } else {
+ LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
+ }
+ }
+
+ if (input.getFirstFilter() == null) {
+ toRemoveInputList.add(input);
+ }
+ }
+
+ for (Input toRemoveInput : toRemoveInputList) {
+ LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
+ inputManager.removeInput(toRemoveInput);
+ }
+ }
+
+ private void sortFilters() {
+ Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
+
+ @Override
+ public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+ Object o1Sort = o1.get("sort_order");
+ Object o2Sort = o2.get("sort_order");
+ if (o1Sort == null || o2Sort == null) {
+ return 0;
+ }
+
+ int o1Value = parseSort(o1, o1Sort);
+ int o2Value = parseSort(o2, o2Sort);
+
+ return o1Value - o2Value;
+ }
+
+ private int parseSort(Map<String, Object> map, Object o) {
+ if (!(o instanceof Number)) {
+ try {
+ return (new Double(Double.parseDouble(o.toString()))).intValue();
+ } catch (Throwable t) {
+ LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
+ + ", map=" + map.toString());
+ return 0;
+ }
+ } else {
+ return ((Number) o).intValue();
+ }
+ }
+ });
+ }
+
+ private void assignOutputsToInputs(String serviceName) {
+ Set<Output> usedOutputSet = new HashSet<Output>();
+ for (Input input : inputManager.getInputList(serviceName)) {
+ for (Output output : outputManager.getOutputs()) {
+ if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+ usedOutputSet.add(output);
+ input.addOutput(output);
+ }
+ }
+ }
+
+ // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
+ for (Output output : InputSimulate.getSimulateOutputs()) {
+ outputManager.add(output);
+ usedOutputSet.add(output);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
+ for (String key : fromMap.keySet()) {
+ Object objValue = fromMap.get(key);
+ if (objValue == null) {
+ continue;
+ }
+ if (objValue instanceof Map) {
+ Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
+
+ Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
+ if (localFields == null) {
+ localFields = new HashMap<String, Object>();
+ toMap.put(key, localFields);
+ }
+
+ if (globalFields != null) {
+ for (String fieldKey : globalFields.keySet()) {
+ if (!localFields.containsKey(fieldKey)) {
+ localFields.put(fieldKey, globalFields.get(fieldKey));
+ }
+ }
+ }
+ }
+ }
+
+ // Let's add the rest of the top level fields if missing
+ for (String key : fromMap.keySet()) {
+ if (!toMap.containsKey(key)) {
+ toMap.put(key, fromMap.get(key));
+ }
+ }
+ }
+
+ public void cleanCheckPointFiles() {
+ inputManager.cleanCheckPointFiles();
+ }
+
+ public void logStats() {
+ inputManager.logStats();
+ outputManager.logStats();
+ }
+
+ public void addMetrics(List<MetricData> metricsList) {
+ inputManager.addMetricsContainers(metricsList);
+ outputManager.addMetricsContainers(metricsList);
+ }
+
+ public void waitOnAllInputs() {
+ inputManager.waitOnAllInputs();
+ }
+
+ public void close() {
+ inputManager.close();
+ outputManager.close();
+ inputManager.checkInAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
new file mode 100644
index 0000000..b70fbd1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
+
+import com.google.common.io.Files;
+
+public class InputConfigUploader extends Thread {
+ protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class);
+
+ private static final long SLEEP_BETWEEN_CHECK = 2000;
+
+ private final File configDir;
+ private final FilenameFilter inputConfigFileFilter = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("input.config-") && name.endsWith(".json");
+ }
+ };
+ private final Set<String> filesHandled = new HashSet<>();
+ private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json");
+ private final LogSearchConfig config;
+ private final String clusterName = LogFeederUtil.getStringProperty("cluster.name");
+
+ public static void load(LogSearchConfig config) {
+ new InputConfigUploader(config).start();
+ }
+
+ private InputConfigUploader(LogSearchConfig config) {
+ super("Input Config Loader");
+ setDaemon(true);
+
+ this.configDir = new File(LogFeederUtil.getStringProperty("logfeeder.config.dir"));
+ this.config = config;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ File[] inputConfigFiles = configDir.listFiles(inputConfigFileFilter);
+ for (File inputConfigFile : inputConfigFiles) {
+ if (!filesHandled.contains(inputConfigFile.getAbsolutePath())) {
+ try {
+ Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+ m.find();
+ String serviceName = m.group(1);
+ String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
+
+ if (!config.inputConfigExists(clusterName, serviceName)) {
+ config.setInputConfig(clusterName, serviceName, inputConfig);
+ }
+ filesHandled.add(inputConfigFile.getAbsolutePath());
+ } catch (Exception e) {
+ LOG.warn("Error handling file " + inputConfigFile.getAbsolutePath(), e);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(SLEEP_BETWEEN_CHECK);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted during sleep", e);
+ }
+ }
+ }
+}