You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mg...@apache.org on 2017/04/12 13:02:36 UTC

[5/5] ambari git commit: AMBARI-20578 Log Search Configuration API (mgergely)

AMBARI-20578 Log Search Configuration API (mgergely)

Change-Id: I2415e9402fa002dedb566cfebada4cf34ef1d4a6


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0ac0ba42
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0ac0ba42
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0ac0ba42

Branch: refs/heads/trunk
Commit: 0ac0ba424e31db38704d8e7a59ac60b853094cda
Parents: 754d6c8
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Wed Apr 12 15:02:14 2017 +0200
Committer: Miklos Gergely <mg...@hortonworks.com>
Committed: Wed Apr 12 15:02:14 2017 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-config-api/.gitignore      |   1 +
 .../ambari-logsearch-config-api/pom.xml         |  57 ++
 .../config/api/InputConfigMonitor.java          |  41 ++
 .../logsearch/config/api/LogSearchConfig.java   |  90 +++
 .../config/api/LogSearchConfigFactory.java      |  68 +++
 .../config/api/LogSearchConfigClass1.java       |  55 ++
 .../config/api/LogSearchConfigClass2.java       |  55 ++
 .../config/api/LogSearchConfigFactoryTest.java  |  58 ++
 .../config/api/NonLogSearchConfigClass.java     |  23 +
 .../src/test/resources/log4j.xml                |  34 ++
 .../.gitignore                                  |   1 +
 .../ambari-logsearch-config-zookeeper/pom.xml   |  74 +++
 .../config/zookeeper/LogSearchConfigZK.java     | 213 ++++++++
 .../ambari-logsearch-logfeeder/pom.xml          |  66 ++-
 .../org/apache/ambari/logfeeder/LogFeeder.java  | 384 +------------
 .../ambari/logfeeder/common/ConfigHandler.java  | 428 +++++++++++++++
 .../logfeeder/input/InputConfigUploader.java    |  94 ++++
 .../ambari/logfeeder/input/InputManager.java    | 304 ++++++-----
 .../ambari/logfeeder/input/InputSimulate.java   |  22 +-
 .../ambari/logfeeder/output/OutputManager.java  |   7 -
 .../ambari/logfeeder/output/OutputS3File.java   |   4 +-
 .../ambari/logfeeder/util/LogFeederUtil.java    |   3 +
 .../apache/ambari/logfeeder/util/SSLUtil.java   |   5 +-
 .../src/main/resources/log4j.xml                |   6 +-
 .../ambari/logfeeder/input/InputFileTest.java   |   3 +-
 .../logfeeder/input/InputManagerTest.java       |  70 +--
 .../logfeeder/output/OutputManagerTest.java     |  11 +-
 .../ambari-logsearch-server/pom.xml             |  18 +-
 .../org/apache/ambari/logsearch/LogSearch.java  |   1 +
 .../logsearch/common/PropertiesHelper.java      |   3 +
 .../conf/global/LogSearchConfigState.java       |  35 ++
 .../ambari/logsearch/configurer/Configurer.java |  23 +
 .../configurer/LogSearchConfigConfigurer.java   |  69 +++
 .../configurer/LogfeederFilterConfigurer.java   |   2 +-
 .../configurer/SolrAuditAliasConfigurer.java    |   2 +-
 .../configurer/SolrCollectionConfigurer.java    |   2 +-
 .../logsearch/configurer/SolrConfigurer.java    |  23 -
 .../ambari/logsearch/doc/DocConstants.java      |   6 +
 .../logsearch/manager/ShipperConfigManager.java |  81 +++
 .../logsearch/rest/ShipperConfigResource.java   |  73 +++
 .../ambari-logsearch-web/.gitignore             |   1 +
 .../test-config/logfeeder/logfeeder.properties  |  12 +-
 .../test-config/logsearch/logsearch.properties  |   2 +
 ambari-logsearch/pom.xml                        |   2 +
 .../server/upgrade/UpgradeCatalog300.java       |  35 ++
 .../configuration/accumulo-logsearch-conf.xml   | 124 -----
 .../templates/input.config-accumulo.json.j2     |  92 ++++
 .../configuration/infra-logsearch-conf.xml      |  80 ---
 .../templates/input.config-ambari-infra.json.j2 |  48 ++
 .../0.1.0/configuration/ams-logsearch-conf.xml  | 201 -------
 .../input.config-ambari-metrics.json.j2         | 169 ++++++
 .../configuration/atlas-logsearch-conf.xml      |  80 ---
 .../templates/input.config-atlas.json.j2        |  48 ++
 .../configuration/falcon-logsearch-conf.xml     |  80 ---
 .../templates/input.config-falcon.json.j2       |  48 ++
 .../configuration/flume-logsearch-conf.xml      |  80 ---
 .../templates/input.config-flume.json.j2        |  53 ++
 .../configuration/hbase-logsearch-conf.xml      | 111 ----
 .../templates/input.config-hbase.json.j2        |  79 +++
 .../configuration/hdfs-logsearch-conf.xml       | 248 ---------
 .../package/templates/input.config-hdfs.json.j2 | 216 ++++++++
 .../configuration/hive-logsearch-conf.xml       | 117 ----
 .../package/templates/input.config-hive.json.j2 |  85 +++
 .../configuration/kafka-logsearch-conf.xml      | 124 -----
 .../templates/input.config-kafka.json.j2        |  92 ++++
 .../configuration/knox-logsearch-conf.xml       |  93 ----
 .../package/templates/input.config-knox.json.j2 |  60 ++
 .../configuration/logfeeder-ambari-config.xml   |   1 +
 .../logfeeder-custom-logsearch-conf.xml         |  46 --
 .../LOGSEARCH/0.5.0/package/scripts/params.py   |  44 +-
 .../0.5.0/package/scripts/setup_logfeeder.py    |  15 +-
 .../templates/HadoopServiceConfig.json.j2       | 545 ++++++++++++++++---
 .../templates/input.config-logsearch.json.j2    |   6 +-
 .../configuration/oozie-logsearch-conf.xml      |  80 ---
 .../templates/input.config-oozie.json.j2        |  48 ++
 .../configuration/ranger-logsearch-conf.xml     | 111 ----
 .../templates/input.config-ranger.json.j2       |  79 +++
 .../configuration/ranger-kms-logsearch-conf.xml |  80 ---
 .../templates/input.config-ranger-kms.json.j2   |  48 ++
 .../configuration/spark-logsearch-conf.xml      |  98 ----
 .../templates/input.config-spark.json.j2        |  66 +++
 .../configuration/spark2-logsearch-conf.xml     |  98 ----
 .../templates/input.config-spark2.json.j2       |  66 +++
 .../configuration/storm-logsearch-conf.xml      | 110 ----
 .../templates/input.config-storm.json.j2        |  78 +++
 .../mapred-logsearch-conf.xml                   |  80 ---
 .../configuration/yarn-logsearch-conf.xml       | 104 ----
 .../templates/input.config-mapreduce2.json.j2   |  48 ++
 .../package/templates/input.config-yarn.json.j2 |  72 +++
 .../configuration/zeppelin-logsearch-conf.xml   |  80 ---
 .../templates/input.config-zeppelin.json.j2     |  48 ++
 .../configuration/zookeeper-logsearch-conf.xml  |  76 ---
 .../templates/input.config-zookeeper.json.j2    |  46 ++
 .../2.0.6/hooks/after-INSTALL/scripts/params.py |  14 +
 .../scripts/shared_initialization.py            |  17 +
 .../configuration/hbase-logsearch-conf.xml      | 111 ----
 .../templates/input.config-hbase.json.j2        |  79 +++
 .../configuration/hdfs-logsearch-conf.xml       | 248 ---------
 .../package/templates/input.config-hdfs.json.j2 | 216 ++++++++
 .../server/upgrade/UpgradeCatalog300Test.java   |  87 +++
 .../stacks/2.4/LOGSEARCH/test_logfeeder.py      |  28 +-
 .../stacks/2.4/LOGSEARCH/test_logsearch.py      |   1 +
 102 files changed, 4451 insertions(+), 3418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/.gitignore
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/.gitignore b/ambari-logsearch/ambari-logsearch-config-api/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/pom.xml b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
new file mode 100644
index 0000000..e9abed0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent>
+    <artifactId>ambari-logsearch</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.0.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>ambari-logsearch-config-api</artifactId>
+  <packaging>jar</packaging>
+  <name>Ambari Logsearch Config Api</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
new file mode 100644
index 0000000..df26920
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/InputConfigMonitor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+/**
+ * Monitors input configuration changes. 
+ */
+public interface InputConfigMonitor {
+  /**
+   * Notification of a new input configuration.
+   * 
+   * @param serviceName The name of the service for which the input configuration was created.
+   * @param inputConfig The input configuration.
+   * @throws Exception
+   */
+  void loadInputConfigs(String serviceName, String inputConfig) throws Exception;
+  
+  /**
+   * Notification of the removal of an input configuration.
+   * 
+   * @param serviceName The name of the service of which's input configuration was removed.
+   */
+  void removeInputs(String serviceName);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
new file mode 100644
index 0000000..0bb0b78
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Log Search Configuration, which uploads, retrieves configurations, and monitors it's changes.
+ */
+public interface LogSearchConfig extends Closeable {
+  /**
+   * Enumeration of the components of the Log Search service.
+   */
+  public enum Component {
+    SERVER, LOGFEEDER;
+  }
+  
+  /**
+   * Initialization of the configuration.
+   * 
+   * @param component The component which will use the configuration.
+   * @param properties The properties of that component.
+   * @throws Exception
+   */
+  void init(Component component, Map<String, String> properties) throws Exception;
+  
+  /**
+   * Returns all the service names with input configurations of a cluster. Will be used only in SERVER mode.
+   * 
+   * @param clusterName The name of the cluster which's services are required.
+   * @return List of the service names.
+   */
+  List<String> getServices(String clusterName);
+  
+  /**
+   * Checks if input configuration exists.
+   * 
+   * @param clusterName The name of the cluster where the service is looked for.
+   * @param serviceName The name of the service looked for.
+   * @return If input configuration exists for the service.
+   * @throws Exception
+   */
+  boolean inputConfigExists(String clusterName, String serviceName) throws Exception;
+  
+  /**
+   * Returns the input configuration of a service in a cluster. Will be used only in SERVER mode.
+   * 
+   * @param clusterName The name of the cluster where the service is looked for.
+   * @param serviceName The name of the service looked for.
+   * @return The input configuration for the service if it exists, null otherwise.
+   */
+  String getInputConfig(String clusterName, String serviceName);
+  
+  /**
+   * Uploads the input configuration for a service in a cluster.
+   * 
+   * @param clusterName The name of the cluster where the service is.
+   * @param serviceName The name of the service of which's input configuration is uploaded.
+   * @param inputConfig The input configuration of the service.
+   * @throws Exception
+   */
+  void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception;
+  
+  /**
+   * Starts the monitoring of the input configurations, asynchronously. Will be used only in LOGFEEDER mode.
+   * 
+   * @param configMonitor The input config monitor to call in case of a config change.
+   * @throws Exception
+   */
+  void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
new file mode 100644
index 0000000..6ef4b90
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Factory class for LogSearchConfig.
+ */
+public class LogSearchConfigFactory {
+  private static final Logger LOG = Logger.getLogger(LogSearchConfigFactory.class);
+
+  /**
+   * Creates a Log Search Configuration instance that implements {@link org.apache.ambari.logsearch.config.api.LogSearchConfig}.
+   * 
+   * @param component The component of the Log Search Service to create the configuration for (SERVER/LOGFEEDER).
+   * @param properties The properties of the component for which the configuration is created. If the properties contain the
+   *                  "logsearch.config.class" entry than the class defined there would be used instead of the default class.
+   * @param defaultClass The default configuration class to use if not specified otherwise.
+   * @return The Log Search Configuration instance.
+   * @throws Exception Throws exception if the defined class does not implement LogSearchConfig, or doesn't have an empty
+   *                   constructor, or throws an exception in it's init method.
+   */
+  public static LogSearchConfig createLogSearchConfig(Component component, Map<String, String> properties,
+      Class<? extends LogSearchConfig> defaultClass) throws Exception {
+    try {
+      LogSearchConfig logSearchConfig = null;
+      String configClassName = properties.get("logsearch.config.class");
+      if (!StringUtils.isBlank(configClassName)) {
+        Class<?> clazz = Class.forName(configClassName);
+        if (LogSearchConfig.class.isAssignableFrom(clazz)) {
+          logSearchConfig = (LogSearchConfig) clazz.newInstance();
+        } else {
+          throw new IllegalArgumentException("Class " + configClassName + " does not implement the interface " + LogSearchConfig.class.getName());
+        }
+      } else {
+        logSearchConfig = defaultClass.newInstance();
+      }
+      
+      logSearchConfig.init(component, properties);
+      return logSearchConfig;
+    } catch (Exception e) {
+      LOG.fatal("Could not initialize logsearch config.", e);
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
new file mode 100644
index 0000000..969eb30
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+
+public class LogSearchConfigClass1 implements LogSearchConfig {
+  @Override
+  public void init(Component component, Map<String, String> properties) {}
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+  @Override
+  public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public String getInputConfig(String clusterName, String serviceName) {
+    return null;
+  }
+  
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
new file mode 100644
index 0000000..664ecc9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+
+public class LogSearchConfigClass2 implements LogSearchConfig {
+  @Override
+  public void init(Component component, Map<String, String> properties) {}
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {}
+
+  @Override
+  public void monitorInputConfigChanges(InputConfigMonitor configMonitor) throws Exception {}
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public String getInputConfig(String clusterName, String serviceName) {
+    return null;
+  }
+  
+  @Override
+  public void close() {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
new file mode 100644
index 0000000..8e7154e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigFactoryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import java.util.Collections;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import junit.framework.Assert;
+
+public class LogSearchConfigFactoryTest {
+
+  @Test
+  public void testDefaultConfig() throws Exception {
+    LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+        Collections.<String, String> emptyMap(), LogSearchConfigClass1.class);
+    
+    Assert.assertSame(config.getClass(), LogSearchConfigClass1.class);
+  }
+
+  @Test
+  public void testCustomConfig() throws Exception {
+    LogSearchConfig config = LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+        ImmutableMap.of("logsearch.config.class", "org.apache.ambari.logsearch.config.api.LogSearchConfigClass2"),
+        LogSearchConfigClass1.class);
+    
+    Assert.assertSame(config.getClass(), LogSearchConfigClass2.class);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testNonConfigClass() throws Exception {
+    LogSearchConfigFactory.createLogSearchConfig(Component.SERVER,
+        ImmutableMap.of("logsearch.config.class", "org.apache.ambari.logsearch.config.api.NonLogSearchConfigClass"),
+        LogSearchConfigClass1.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
new file mode 100644
index 0000000..9564f33
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/NonLogSearchConfigClass.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+public class NonLogSearchConfigClass {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
new file mode 100644
index 0000000..6d968f9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/resources/log4j.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out" />
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" />
+      <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> -->
+    </layout>
+  </appender>
+
+  <root>
+    <priority value="warn" />
+    <appender-ref ref="console" />
+  </root>
+
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore b/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
new file mode 100644
index 0000000..4ed8eba
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent>
+    <artifactId>ambari-logsearch</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.0.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>ambari-logsearch-config-zookeeper</artifactId>
+  <packaging>jar</packaging>
+  <name>Ambari Logsearch Config Zookeeper</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-logsearch-config-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.4</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.9</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>2.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>2.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>2.12.0</version>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
new file mode 100644
index 0000000..738fde2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+import com.google.common.base.Splitter;
+
+public class LogSearchConfigZK implements LogSearchConfig {
+  private static final Logger LOG = Logger.getLogger(LogSearchConfigZK.class);
+
+  private static final int SESSION_TIMEOUT = 15000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+  private static final String DEFAULT_ZK_ROOT = "/logsearch";
+  private static final long WAIT_FOR_ROOT_SLEEP_SECONDS = 10;
+
+  private static final String CLUSTER_NAME_PROPERTY = "cluster.name";
+  private static final String ZK_CONNECT_STRING_PROPERTY = "logsearch.config.zk_connect_string";
+  private static final String ZK_ACLS_PROPERTY = "logsearch.config.zk_acls";
+  private static final String ZK_ROOT_NODE_PROPERTY = "logsearch.config.zk_root";
+
+  private Map<String, String> properties;
+  private String root;
+  private CuratorFramework client;
+  private TreeCache cache;
+
+  @Override
+  public void init(Component component, Map<String, String> properties) throws Exception {
+    this.properties = properties;
+    
+    LOG.info("Connecting to ZooKeeper at " + properties.get(ZK_CONNECT_STRING_PROPERTY));
+    client = CuratorFrameworkFactory.builder()
+        .connectString(properties.get(ZK_CONNECT_STRING_PROPERTY))
+        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+        .connectionTimeoutMs(CONNECTION_TIMEOUT)
+        .sessionTimeoutMs(SESSION_TIMEOUT)
+        .build();
+    client.start();
+
+    root = MapUtils.getString(properties, ZK_ROOT_NODE_PROPERTY, DEFAULT_ZK_ROOT);
+
+    if (component == Component.SERVER) {
+      if (client.checkExists().forPath(root) == null) {
+        client.create().creatingParentContainersIfNeeded().forPath(root);
+      }
+      cache = new TreeCache(client, root);
+      cache.start();
+    } else {
+      while (client.checkExists().forPath(root) == null) {
+        LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
+        Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
+      }
+
+      cache = new TreeCache(client, String.format("%s/%s", root, properties.get(CLUSTER_NAME_PROPERTY)));
+    }
+  }
+
+  @Override
+  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+    String nodePath = root + "/" + clusterName + "/input/" + serviceName;
+    return cache.getCurrentData(nodePath) != null;
+  }
+
+  @Override
+  public void setInputConfig(String clusterName, String serviceName, String inputConfig) throws Exception {
+    String nodePath = String.format("%s/%s/input/%s", root, clusterName, serviceName);
+    client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, inputConfig.getBytes());
+    LOG.info("Set input config for the service " + serviceName + " for cluster " + clusterName);
+  }
+
+  private List<ACL> getAcls() {
+    String aclStr = properties.get(ZK_ACLS_PROPERTY);
+    if (StringUtils.isBlank(aclStr)) {
+      return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    }
+
+    List<ACL> acls = new ArrayList<>();
+    List<String> aclStrList = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(aclStr);
+    for (String unparcedAcl : aclStrList) {
+      String[] parts = unparcedAcl.split(":");
+      if (parts.length == 3) {
+        acls.add(new ACL(parsePermission(parts[2]), new Id(parts[0], parts[1])));
+      }
+    }
+    return acls;
+  }
+
+  private Integer parsePermission(String permission) {
+    int permissionCode = 0;
+    for (char each : permission.toLowerCase().toCharArray()) {
+      switch (each) {
+        case 'r':
+          permissionCode |= ZooDefs.Perms.READ;
+          break;
+        case 'w':
+          permissionCode |= ZooDefs.Perms.WRITE;
+          break;
+        case 'c':
+          permissionCode |= ZooDefs.Perms.CREATE;
+          break;
+        case 'd':
+          permissionCode |= ZooDefs.Perms.DELETE;
+          break;
+        case 'a':
+          permissionCode |= ZooDefs.Perms.ADMIN;
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported permission: " + permission);
+      }
+    }
+    return permissionCode;
+  }
+
+  @Override
+  public void monitorInputConfigChanges(final InputConfigMonitor configMonitor) throws Exception {
+    TreeCacheListener listener = new TreeCacheListener() {
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+        if (!event.getData().getPath().startsWith(String.format("%s/%s/input/", root, properties.get(CLUSTER_NAME_PROPERTY)))) {
+          return;
+        }
+        
+        String nodeName = ZKPaths.getNodeFromPath(event.getData().getPath());
+        String nodeData = new String(event.getData().getData());
+        switch (event.getType()) {
+          case NODE_ADDED:
+            LOG.info("Node added under input ZK node: " + nodeName);
+            addInputs(nodeName, nodeData);
+            break;
+          case NODE_UPDATED:
+            LOG.info("Node updated under input ZK node: " + nodeName);
+            removeInputs(nodeName);
+            addInputs(nodeName, nodeData);
+            break;
+          case NODE_REMOVED:
+            LOG.info("Node removed from input ZK node: " + nodeName);
+            removeInputs(nodeName);
+            break;
+          default:
+            break;
+        }
+      }
+
+      private void removeInputs(String serviceName) {
+        configMonitor.removeInputs(serviceName);
+      }
+
+      private void addInputs(String serviceName, String inputConfig) {
+        try {
+          configMonitor.loadInputConfigs(serviceName, inputConfig);
+        } catch (Exception e) {
+          LOG.error("Could not load input configuration for service " + serviceName + ":\n" + inputConfig, e);
+        }
+      }
+    };
+    cache.getListenable().addListener(listener);
+    cache.start();
+  }
+
+  @Override
+  public List<String> getServices(String clusterName) {
+    String parentPath = String.format("%s/%s/input", root, clusterName);
+    Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath);
+    return new ArrayList<String>(serviceNodes.keySet());
+  }
+
+  @Override
+  public String getInputConfig(String clusterName, String serviceName) {
+    ChildData childData = cache.getCurrentData(String.format("%s/%s/input/%s", root, clusterName, serviceName));
+    return childData == null ? null : new String(childData.getData());
+  }
+
+  @Override
+  public void close() {
+    LOG.info("Closing ZooKeeper Connection");
+    client.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 25e4306..5d6f8b6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -44,6 +44,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-logsearch-config-zookeeper</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
@@ -88,7 +93,6 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
-
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -125,9 +129,9 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-    <groupId>com.amazonaws</groupId>
-    <artifactId>aws-java-sdk-s3</artifactId>
-    <version>1.11.5</version>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>1.11.5</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -135,26 +139,40 @@
       <version>1.11</version>
     </dependency>
     <dependency>
-    <groupId>com.amazonaws</groupId>
-    <artifactId>aws-java-sdk-iam</artifactId>
-    <version>1.11.5</version>
-  </dependency>
-   <dependency>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-common</artifactId>
-    <version>${hadoop.version}</version>
-  </dependency>
-  <dependency>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-hdfs</artifactId>
-    <version>${hadoop.version}</version>
-  </dependency>
-  <dependency>
-    <groupId>commons-io</groupId>
-    <artifactId>commons-io</artifactId>
-    <version>${common.io.version}</version>
-  </dependency>
- </dependencies>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-iam</artifactId>
+      <version>1.11.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-framework</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-recipes</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${common.io.version}</version>
+    </dependency>
+  </dependencies>
   <build>
     <finalName>LogFeeder</finalName>
     <pluginManagement>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index a47c71f..074fedb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -19,60 +19,37 @@
 
 package org.apache.ambari.logfeeder;
 
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig.Component;
+import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigZK;
+import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManager;
-import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.FileUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.SSLUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.google.gson.reflect.TypeToken;
-
 public class LogFeeder {
   private static final Logger LOG = Logger.getLogger(LogFeeder.class);
 
   private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
   private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
 
-  private OutputManager outputManager = new OutputManager();
+  private ConfigHandler configHandler = new ConfigHandler();
+  private LogSearchConfig config;
+  
   private InputManager inputManager = new InputManager();
   private MetricsManager metricsManager = new MetricsManager();
 
-  public static Map<String, Object> globalConfigs = new HashMap<>();
-
-  private List<Map<String, Object>> inputConfigList = new ArrayList<>();
-  private List<Map<String, Object>> filterConfigList = new ArrayList<>();
-  private List<Map<String, Object>> outputConfigList = new ArrayList<>();
-  
   private long lastCheckPointCleanedMS = 0;
   private boolean isLogfeederCompleted = false;
   private Thread statLoggerThread = null;
@@ -91,329 +68,23 @@ public class LogFeeder {
   }
 
   private void init() throws Throwable {
-    Date startTime = new Date();
+    long startTime = System.currentTimeMillis();
 
-    loadConfigFiles();
-    addSimulatedInputs();
-    mergeAllConfigs();
-    
+    configHandler.init();
+    LogConfigHandler.handleConfig();
     SSLUtil.ensureStorePasswords();
     
-    outputManager.init();
-    inputManager.init();
-    metricsManager.init();
+    config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER,
+        Maps.fromProperties(LogFeederUtil.getProperties()), LogSearchConfigZK.class);
+    InputConfigUploader.load(config);
+    config.monitorInputConfigChanges(configHandler);
     
-    LogConfigHandler.handleConfig();
+    metricsManager.init();
     
     LOG.debug("==============");
     
-    Date endTime = new Date();
-    LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
-  }
-
-  private void loadConfigFiles() throws Exception {
-    List<String> configFiles = getConfigFiles();
-    for (String configFileName : configFiles) {
-      LOG.info("Going to load config file:" + configFileName);
-      configFileName = configFileName.replace("\\ ", "%20");
-      File configFile = new File(configFileName);
-      if (configFile.exists() && configFile.isFile()) {
-        LOG.info("Config file exists in path." + configFile.getAbsolutePath());
-        loadConfigsUsingFile(configFile);
-      } else {
-        LOG.info("Trying to load config file from classloader: " + configFileName);
-        loadConfigsUsingClassLoader(configFileName);
-        LOG.info("Loaded config file from classloader: " + configFileName);
-      }
-    }
-  }
-
-  private List<String> getConfigFiles() {
-    List<String> configFiles = new ArrayList<>();
-    
-    String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
-    LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
-    if (logfeederConfigFilesProperty != null) {
-      configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
-    }
-
-    String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
-    if (StringUtils.isNotEmpty(inputConfigDir)) {
-      File configDirFile = new File(inputConfigDir);
-      List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
-      for (File inputConfigFile : inputConfigFiles) {
-        configFiles.add(inputConfigFile.getAbsolutePath());
-      }
-    }
-    
-    if (CollectionUtils.isEmpty(configFiles)) {
-      String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
-      configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
-    }
-    
-    return configFiles;
-  }
-
-  private void loadConfigsUsingFile(File configFile) throws Exception {
-    try {
-      String configData = FileUtils.readFileToString(configFile);
-      loadConfigs(configData);
-    } catch (Exception t) {
-      LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
-      throw t;
-    }
-  }
-
-  private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
-    try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
-      String configData = IOUtils.toString(fis);
-      loadConfigs(configData);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private void loadConfigs(String configData) throws Exception {
-    Type type = new TypeToken<Map<String, Object>>() {}.getType();
-    Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
-
-    // Get the globals
-    for (String key : configMap.keySet()) {
-      switch (key) {
-        case "global" :
-          globalConfigs.putAll((Map<String, Object>) configMap.get(key));
-          break;
-        case "input" :
-          List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
-          inputConfigList.addAll(inputConfig);
-          break;
-        case "filter" :
-          List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
-          filterConfigList.addAll(filterConfig);
-          break;
-        case "output" :
-          List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
-          outputConfigList.addAll(outputConfig);
-          break;
-        default :
-          LOG.warn("Unknown config key: " + key);
-      }
-    }
-  }
-  
-  private void addSimulatedInputs() {
-    int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
-    if (simulatedInputNumber == 0)
-      return;
-    
-    InputSimulate.loadTypeToFilePath(inputConfigList);
-    inputConfigList.clear();
-    
-    for (int i = 0; i < simulatedInputNumber; i++) {
-      HashMap<String, Object> mapList = new HashMap<String, Object>();
-      mapList.put("source", "simulate");
-      mapList.put("rowtype", "service");
-      inputConfigList.add(mapList);
-    }
-  }
-
-  private void mergeAllConfigs() {
-    loadOutputs();
-    loadInputs();
-    loadFilters();
-    
-    assignOutputsToInputs();
-  }
-
-  private void loadOutputs() {
-    for (Map<String, Object> map : outputConfigList) {
-      if (map == null) {
-        continue;
-      }
-      mergeBlocks(globalConfigs, map);
-
-      String value = (String) map.get("destination");
-      if (StringUtils.isEmpty(value)) {
-        LOG.error("Output block doesn't have destination element");
-        continue;
-      }
-      Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
-      if (output == null) {
-        LOG.error("Output object could not be found");
-        continue;
-      }
-      output.setDestination(value);
-      output.loadConfig(map);
-
-      // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
-      if (output.getBooleanValue("is_enabled", true)) {
-        output.logConfigs(Level.INFO);
-        outputManager.add(output);
-      } else {
-        LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
-      }
-    }
-  }
-
-  private void loadInputs() {
-    for (Map<String, Object> map : inputConfigList) {
-      if (map == null) {
-        continue;
-      }
-      mergeBlocks(globalConfigs, map);
-
-      String value = (String) map.get("source");
-      if (StringUtils.isEmpty(value)) {
-        LOG.error("Input block doesn't have source element");
-        continue;
-      }
-      Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
-      if (input == null) {
-        LOG.error("Input object could not be found");
-        continue;
-      }
-      input.setType(value);
-      input.loadConfig(map);
-
-      if (input.isEnabled()) {
-        input.setOutputManager(outputManager);
-        input.setInputManager(inputManager);
-        inputManager.add(input);
-        input.logConfigs(Level.INFO);
-      } else {
-        LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
-      }
-    }
-  }
-
-  private void loadFilters() {
-    sortFilters();
-
-    List<Input> toRemoveInputList = new ArrayList<Input>();
-    for (Input input : inputManager.getInputList()) {
-      for (Map<String, Object> map : filterConfigList) {
-        if (map == null) {
-          continue;
-        }
-        mergeBlocks(globalConfigs, map);
-
-        String value = (String) map.get("filter");
-        if (StringUtils.isEmpty(value)) {
-          LOG.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
-        if (filter == null) {
-          LOG.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(map);
-        filter.setInput(input);
-
-        if (filter.isEnabled()) {
-          filter.setOutputManager(outputManager);
-          input.addFilter(filter);
-          filter.logConfigs(Level.INFO);
-        } else {
-          LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
-        }
-      }
-      
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
-      inputManager.removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters() {
-    Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
-
-      @Override
-      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
-        Object o1Sort = o1.get("sort_order");
-        Object o2Sort = o2.get("sort_order");
-        if (o1Sort == null || o2Sort == null) {
-          return 0;
-        }
-        
-        int o1Value = parseSort(o1, o1Sort);
-        int o2Value = parseSort(o2, o2Sort);
-        
-        return o1Value - o2Value;
-      }
-
-      private int parseSort(Map<String, Object> map, Object o) {
-        if (!(o instanceof Number)) {
-          try {
-            return (new Double(Double.parseDouble(o.toString()))).intValue();
-          } catch (Throwable t) {
-            LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
-              + ", map=" + map.toString());
-            return 0;
-          }
-        } else {
-          return ((Number) o).intValue();
-        }
-      }
-    });
-  }
-
-  private void assignOutputsToInputs() {
-    Set<Output> usedOutputSet = new HashSet<Output>();
-    for (Input input : inputManager.getInputList()) {
-      for (Output output : outputManager.getOutputs()) {
-        if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
-          usedOutputSet.add(output);
-          input.addOutput(output);
-        }
-      }
-    }
-    
-    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
-    for (Output output : InputSimulate.getSimulateOutputs()) {
-      outputManager.add(output);
-      usedOutputSet.add(output);
-    }
-    
-    outputManager.retainUsedOutputs(usedOutputSet);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
-    for (String key : fromMap.keySet()) {
-      Object objValue = fromMap.get(key);
-      if (objValue == null) {
-        continue;
-      }
-      if (objValue instanceof Map) {
-        Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
-
-        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
-        if (localFields == null) {
-          localFields = new HashMap<String, Object>();
-          toMap.put(key, localFields);
-        }
-
-        if (globalFields != null) {
-          for (String fieldKey : globalFields.keySet()) {
-            if (!localFields.containsKey(fieldKey)) {
-              localFields.put(fieldKey, globalFields.get(fieldKey));
-            }
-          }
-        }
-      }
-    }
-
-    // Let's add the rest of the top level fields if missing
-    for (String key : fromMap.keySet()) {
-      if (!toMap.containsKey(key)) {
-        toMap.put(key, fromMap.get(key));
-      }
-    }
+    long endTime = System.currentTimeMillis();
+    LOG.info("Took " + (endTime - startTime) + " ms to initialize");
   }
 
   private class JVMShutdownHook extends Thread {
@@ -422,10 +93,8 @@ public class LogFeeder {
       try {
         LOG.info("Processing is shutting down.");
 
-        inputManager.close();
-        outputManager.close();
-        inputManager.checkInAll();
-
+        configHandler.close();
+        config.close();
         logStats();
 
         LOG.info("LogSearch is exiting.");
@@ -436,7 +105,6 @@ public class LogFeeder {
   }
 
   private void monitor() throws Exception {
-    inputManager.monitor();
     JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
     ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
     
@@ -458,7 +126,7 @@ public class LogFeeder {
 
           if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
             lastCheckPointCleanedMS = System.currentTimeMillis();
-            inputManager.cleanCheckPointFiles();
+            configHandler.cleanCheckPointFiles();
           }
 
           if (isLogfeederCompleted) {
@@ -474,13 +142,11 @@ public class LogFeeder {
   }
 
   private void logStats() {
-    inputManager.logStats();
-    outputManager.logStats();
+    configHandler.logStats();
 
     if (metricsManager.isMetricsEnabled()) {
       List<MetricData> metricsList = new ArrayList<MetricData>();
-      inputManager.addMetricsContainers(metricsList);
-      outputManager.addMetricsContainers(metricsList);
+      configHandler.addMetrics(metricsList);
       metricsManager.useMetrics(metricsList);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
new file mode 100644
index 0000000..effe980
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.common;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputManager;
+import org.apache.ambari.logfeeder.input.InputSimulate;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
+import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.reflect.TypeToken;
+
+public class ConfigHandler implements InputConfigMonitor {
+  private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
+
+  private final OutputManager outputManager = new OutputManager();
+  private final InputManager inputManager = new InputManager();
+
+  public static Map<String, Object> globalConfigs = new HashMap<>();
+
+  private final List<Map<String, Object>> inputConfigList = new ArrayList<>();
+  private final List<Map<String, Object>> filterConfigList = new ArrayList<>();
+  private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
+  
+  private boolean simulateMode = false;
+  
+  public ConfigHandler() {}
+  
+  public void init() throws Exception {
+    loadConfigFiles();
+    loadOutputs();
+    simulateIfNeeded();
+    
+    inputManager.init();
+    outputManager.init();
+  }
+  
+  private void loadConfigFiles() throws Exception {
+    List<String> configFiles = getConfigFiles();
+    for (String configFileName : configFiles) {
+      LOG.info("Going to load config file:" + configFileName);
+      configFileName = configFileName.replace("\\ ", "%20");
+      File configFile = new File(configFileName);
+      if (configFile.exists() && configFile.isFile()) {
+        LOG.info("Config file exists in path." + configFile.getAbsolutePath());
+        loadConfigsUsingFile(configFile);
+      } else {
+        LOG.info("Trying to load config file from classloader: " + configFileName);
+        loadConfigsUsingClassLoader(configFileName);
+        LOG.info("Loaded config file from classloader: " + configFileName);
+      }
+    }
+  }
+
+  private List<String> getConfigFiles() {
+    List<String> configFiles = new ArrayList<>();
+    
+    String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
+    LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
+    if (logfeederConfigFilesProperty != null) {
+      configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
+    }
+
+    String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+    if (StringUtils.isNotEmpty(inputConfigDir)) {
+      File configDirFile = new File(inputConfigDir);
+      List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
+      for (File inputConfigFile : inputConfigFiles) {
+        configFiles.add(inputConfigFile.getAbsolutePath());
+      }
+    }
+    
+    if (CollectionUtils.isEmpty(configFiles)) {
+      String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
+      configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
+    }
+    
+    return configFiles;
+  }
+
+  private void loadConfigsUsingFile(File configFile) throws Exception {
+    try {
+      String configData = FileUtils.readFileToString(configFile, Charset.defaultCharset());
+      loadConfigs(configData);
+    } catch (Exception t) {
+      LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
+      throw t;
+    }
+  }
+
+  private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
+    try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
+      String configData = IOUtils.toString(fis, Charset.defaultCharset());
+      loadConfigs(configData);
+    }
+  }
+  
+  @Override
+  public void loadInputConfigs(String serviceName, String inputConfigData) throws Exception {
+    inputConfigList.clear();
+    filterConfigList.clear();
+    
+    loadConfigs(inputConfigData);
+    
+    if (simulateMode) {
+      InputSimulate.loadTypeToFilePath(inputConfigList);
+    } else {
+      loadInputs(serviceName);
+      loadFilters(serviceName);
+      assignOutputsToInputs(serviceName);
+      
+      inputManager.startInputs(serviceName);
+    }
+  }
+
+  @Override
+  public void removeInputs(String serviceName) {
+    inputManager.removeInputsForService(serviceName);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void loadConfigs(String configData) throws Exception {
+    Type type = new TypeToken<Map<String, Object>>() {}.getType();
+    Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
+
+    // Get the globals
+    for (String key : configMap.keySet()) {
+      switch (key) {
+        case "global" :
+          globalConfigs.putAll((Map<String, Object>) configMap.get(key));
+          break;
+        case "input" :
+          List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
+          inputConfigList.addAll(inputConfig);
+          break;
+        case "filter" :
+          List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
+          filterConfigList.addAll(filterConfig);
+          break;
+        case "output" :
+          List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
+          outputConfigList.addAll(outputConfig);
+          break;
+        default :
+          LOG.warn("Unknown config key: " + key);
+      }
+    }
+  }
+  
+  private void simulateIfNeeded() throws Exception {
+    int simulatedInputNumber = LogFeederUtil.getIntProperty("logfeeder.simulate.input_number", 0);
+    if (simulatedInputNumber == 0)
+      return;
+    
+    List<Map<String, Object>> simulateInputConfigList = new ArrayList<>();
+    for (int i = 0; i < simulatedInputNumber; i++) {
+      HashMap<String, Object> mapList = new HashMap<String, Object>();
+      mapList.put("source", "simulate");
+      mapList.put("rowtype", "service");
+      simulateInputConfigList.add(mapList);
+    }
+    
+    Map<String, List<Map<String, Object>>> simulateInputConfigMap = ImmutableMap.of("input", simulateInputConfigList);
+    String simulateInputConfig = LogFeederUtil.getGson().toJson(simulateInputConfigMap);
+    loadInputConfigs("Simulation", simulateInputConfig);
+    
+    simulateMode = true;
+  }
+
+  private void loadOutputs() {
+    for (Map<String, Object> map : outputConfigList) {
+      if (map == null) {
+        continue;
+      }
+      mergeBlocks(globalConfigs, map);
+
+      String value = (String) map.get("destination");
+      if (StringUtils.isEmpty(value)) {
+        LOG.error("Output block doesn't have destination element");
+        continue;
+      }
+      Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
+      if (output == null) {
+        LOG.error("Output object could not be found");
+        continue;
+      }
+      output.setDestination(value);
+      output.loadConfig(map);
+
+      // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
+      if (output.getBooleanValue("is_enabled", true)) {
+        output.logConfigs(Level.INFO);
+        outputManager.add(output);
+      } else {
+        LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
+      }
+    }
+  }
+
+  private void loadInputs(String serviceName) {
+    for (Map<String, Object> map : inputConfigList) {
+      if (map == null) {
+        continue;
+      }
+      mergeBlocks(globalConfigs, map);
+
+      String value = (String) map.get("source");
+      if (StringUtils.isEmpty(value)) {
+        LOG.error("Input block doesn't have source element");
+        continue;
+      }
+      Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
+      if (input == null) {
+        LOG.error("Input object could not be found");
+        continue;
+      }
+      input.setType(value);
+      input.loadConfig(map);
+
+      if (input.isEnabled()) {
+        input.setOutputManager(outputManager);
+        input.setInputManager(inputManager);
+        inputManager.add(serviceName, input);
+        input.logConfigs(Level.INFO);
+      } else {
+        LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
+      }
+    }
+  }
+
+  private void loadFilters(String serviceName) {
+    sortFilters();
+
+    List<Input> toRemoveInputList = new ArrayList<Input>();
+    for (Input input : inputManager.getInputList(serviceName)) {
+      for (Map<String, Object> map : filterConfigList) {
+        if (map == null) {
+          continue;
+        }
+        mergeBlocks(globalConfigs, map);
+
+        String value = (String) map.get("filter");
+        if (StringUtils.isEmpty(value)) {
+          LOG.error("Filter block doesn't have filter element");
+          continue;
+        }
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
+        if (filter == null) {
+          LOG.error("Filter object could not be found");
+          continue;
+        }
+        filter.loadConfig(map);
+        filter.setInput(input);
+
+        if (filter.isEnabled()) {
+          filter.setOutputManager(outputManager);
+          input.addFilter(filter);
+          filter.logConfigs(Level.INFO);
+        } else {
+          LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
+        }
+      }
+      
+      if (input.getFirstFilter() == null) {
+        toRemoveInputList.add(input);
+      }
+    }
+
+    for (Input toRemoveInput : toRemoveInputList) {
+      LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
+      inputManager.removeInput(toRemoveInput);
+    }
+  }
+
+  private void sortFilters() {
+    Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
+
+      @Override
+      public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+        Object o1Sort = o1.get("sort_order");
+        Object o2Sort = o2.get("sort_order");
+        if (o1Sort == null || o2Sort == null) {
+          return 0;
+        }
+        
+        int o1Value = parseSort(o1, o1Sort);
+        int o2Value = parseSort(o2, o2Sort);
+        
+        return o1Value - o2Value;
+      }
+
+      private int parseSort(Map<String, Object> map, Object o) {
+        if (!(o instanceof Number)) {
+          try {
+            return (new Double(Double.parseDouble(o.toString()))).intValue();
+          } catch (Throwable t) {
+            LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
+              + ", map=" + map.toString());
+            return 0;
+          }
+        } else {
+          return ((Number) o).intValue();
+        }
+      }
+    });
+  }
+
+  private void assignOutputsToInputs(String serviceName) {
+    Set<Output> usedOutputSet = new HashSet<Output>();
+    for (Input input : inputManager.getInputList(serviceName)) {
+      for (Output output : outputManager.getOutputs()) {
+        if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
+          usedOutputSet.add(output);
+          input.addOutput(output);
+        }
+      }
+    }
+    
+    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
+    for (Output output : InputSimulate.getSimulateOutputs()) {
+      outputManager.add(output);
+      usedOutputSet.add(output);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
+    for (String key : fromMap.keySet()) {
+      Object objValue = fromMap.get(key);
+      if (objValue == null) {
+        continue;
+      }
+      if (objValue instanceof Map) {
+        Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
+
+        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
+        if (localFields == null) {
+          localFields = new HashMap<String, Object>();
+          toMap.put(key, localFields);
+        }
+
+        if (globalFields != null) {
+          for (String fieldKey : globalFields.keySet()) {
+            if (!localFields.containsKey(fieldKey)) {
+              localFields.put(fieldKey, globalFields.get(fieldKey));
+            }
+          }
+        }
+      }
+    }
+
+    // Let's add the rest of the top level fields if missing
+    for (String key : fromMap.keySet()) {
+      if (!toMap.containsKey(key)) {
+        toMap.put(key, fromMap.get(key));
+      }
+    }
+  }
+
+  public void cleanCheckPointFiles() {
+    inputManager.cleanCheckPointFiles();
+  }
+
+  public void logStats() {
+    inputManager.logStats();
+    outputManager.logStats();
+  }
+  
+  public void addMetrics(List<MetricData> metricsList) {
+    inputManager.addMetricsContainers(metricsList);
+    outputManager.addMetricsContainers(metricsList);
+  }
+
+  public void waitOnAllInputs() {
+    inputManager.waitOnAllInputs();
+  }
+
+  public void close() {
+    inputManager.close();
+    outputManager.close();
+    inputManager.checkInAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0ac0ba42/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
new file mode 100644
index 0000000..b70fbd1
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Logger;
+
+import com.google.common.io.Files;
+
+public class InputConfigUploader extends Thread {
+  protected static final Logger LOG = Logger.getLogger(InputConfigUploader.class);
+
+  private static final long SLEEP_BETWEEN_CHECK = 2000;
+
+  private final File configDir;
+  private final FilenameFilter inputConfigFileFilter = new FilenameFilter() {
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith("input.config-") && name.endsWith(".json");
+    }
+  };
+  private final Set<String> filesHandled = new HashSet<>();
+  private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json");
+  private final LogSearchConfig config;
+  private final String clusterName = LogFeederUtil.getStringProperty("cluster.name");
+  
+  public static void load(LogSearchConfig config) {
+    new InputConfigUploader(config).start();
+  }
+  
+  private InputConfigUploader(LogSearchConfig config) {
+    super("Input Config Loader");
+    setDaemon(true);
+    
+    this.configDir = new File(LogFeederUtil.getStringProperty("logfeeder.config.dir"));
+    this.config = config;
+  }
+  
+  @Override
+  public void run() {
+    while (true) {
+      File[] inputConfigFiles = configDir.listFiles(inputConfigFileFilter);
+      for (File inputConfigFile : inputConfigFiles) {
+        if (!filesHandled.contains(inputConfigFile.getAbsolutePath())) {
+          try {
+            Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+            m.find();
+            String serviceName = m.group(1);
+            String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
+            
+            if (!config.inputConfigExists(clusterName, serviceName)) {
+              config.setInputConfig(clusterName, serviceName, inputConfig);
+            }
+            filesHandled.add(inputConfigFile.getAbsolutePath());
+          } catch (Exception e) {
+            LOG.warn("Error handling file " + inputConfigFile.getAbsolutePath(), e);
+          }
+        }
+      }
+      
+      try {
+        Thread.sleep(SLEEP_BETWEEN_CHECK);
+      } catch (InterruptedException e) {
+        LOG.debug("Interrupted during sleep", e);
+      }
+    }
+  }
+}