You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by GitBox <gi...@apache.org> on 2018/11/01 12:33:32 UTC

[GitHub] oleewere closed pull request #17: AMBARI-24833. Create cloud input/output skeleton.

oleewere closed pull request #17: AMBARI-24833. Create cloud input/output skeleton.
URL: https://github.com/apache/ambari-logsearch/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 6228637afc..9ee4533485 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -71,6 +71,7 @@
   private LRUCache cache;
   private String cacheKeyField;
   private boolean initDefaultFields;
+  private boolean cloudInput = false;
   private MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
 
   /**
@@ -400,4 +401,17 @@ public boolean isInitDefaultFields() {
   public void setInitDefaultFields(boolean initDefaultFields) {
     this.initDefaultFields = initDefaultFields;
   }
+
+  public boolean isCloudInput() {
+    return cloudInput;
+  }
+
+  public void setCloudInput(boolean cloudInput) {
+    this.cloudInput = cloudInput;
+  }
+
+  public String getCloudModeSuffix() {
+    String mode = isCloudInput() ? "cloud": "default";
+    return "mode=" + mode;
+  }
 }
diff --git a/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch-logfeeder/pom.xml
index 94af44f3f8..71cf853c18 100644
--- a/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch-logfeeder/pom.xml
@@ -33,8 +33,8 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <spring.version>5.1.1.RELEASE</spring.version>
-    <spring-boot.version>2.0.6.RELEASE</spring-boot.version>
+    <spring.version>5.1.2.RELEASE</spring.version>
+    <spring-boot.version>2.1.0.RELEASE</spring-boot.version>
   </properties>
 
   <dependencies>
@@ -96,7 +96,7 @@
     <dependency>
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
-      <version>3.6</version>
+      <version>4.0.1</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index b4a2a26b43..c4b9835d07 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -27,9 +27,11 @@
 
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputFileMarker;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logfeeder.output.OutputManagerImpl;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -89,8 +91,6 @@ public LogEntryParseTester(String logEntry, String shipperConfig, List<String> g
   @SuppressWarnings("unchecked")
   public Map<String, Object> parse() throws Exception {
     InputConfig inputConfig = getInputConfig();
-    ConfigHandler configHandler = new ConfigHandler(null);
-    configHandler.setInputManager(new InputManagerImpl());
     OutputManagerImpl outputManager = new OutputManagerImpl();
     LogFeederProps logFeederProps = new LogFeederProps();
     LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig();
@@ -101,8 +101,11 @@ public LogEntryParseTester(String logEntry, String shipperConfig, List<String> g
     LogLevelFilterHandler logLevelFilterHandler = new LogLevelFilterHandler(null);
     logLevelFilterHandler.setLogFeederProps(logFeederProps);
     outputManager.setLogLevelFilterHandler(logLevelFilterHandler);
-    configHandler.setOutputManager(outputManager);
-    Input input = configHandler.getTestInput(inputConfig, logId);
+    DefaultInputConfigHandler configHandler = new DefaultInputConfigHandler();
+    InputConfigManager inputConfigManager = new InputConfigManager(
+      null, new InputManagerImpl(), outputManager, configHandler,logFeederProps, true
+    );
+    Input input = inputConfigManager.getTestInput(inputConfig, logId);
     input.init(logFeederProps);
     final Map<String, Object> result = new HashMap<>();
     input.getFirstFilter().init(logFeederProps);
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index 1d56924f06..a9790b26a0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -23,7 +23,9 @@
   public static final String ALL = "all";
   public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
   public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
-  
+
+  public static final String CLOUD_PREFIX = "cl-";
+
   // solr fields
   public static final String SOLR_LEVEL = "level";
   public static final String SOLR_COMPONENT = "type";
@@ -107,4 +109,6 @@
   public static final String SOLR_ZK_CONNECTION_STRING = "logfeeder.solr.zk_connect_string";
   public static final String SOLR_URLS = "logfeeder.solr.urls";
 
+  public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
+
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index 086ad70acd..881b856012 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,20 @@
 
 import com.google.common.collect.Maps;
 import org.apache.ambari.logfeeder.common.LogFeederSolrClientFactory;
+import org.apache.ambari.logfeeder.conf.condition.CloudStorageCondition;
+import org.apache.ambari.logfeeder.conf.condition.NonCloudStorageCondition;
 import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.manager.operations.impl.CloudStorageInputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputConfigUploader;
 import org.apache.ambari.logfeeder.input.InputManagerImpl;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
+import org.apache.ambari.logfeeder.output.cloud.CloudStorageOutputManager;
 import org.apache.ambari.logfeeder.plugin.manager.CheckpointManager;
 import org.apache.ambari.logfeeder.input.file.checkpoint.FileCheckpointManager;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.manager.operations.impl.DefaultInputConfigHandler;
 import org.apache.ambari.logfeeder.metrics.MetricsManager;
 import org.apache.ambari.logfeeder.metrics.StatsLogger;
 import org.apache.ambari.logfeeder.output.OutputManagerImpl;
@@ -44,6 +50,7 @@
 import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
 import org.apache.solr.client.solrj.SolrClient;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.context.annotation.PropertySource;
@@ -71,14 +78,43 @@ public LogFeederSecurityConfig logFeederSecurityConfig() {
     return new LogFeederSecurityConfig();
   }
 
+  @Bean
+  public CheckpointManager checkpointHandler() {
+    return new FileCheckpointManager();
+  }
+
+  @Bean
+  public DockerContainerRegistry containerRegistry() {
+    if (logFeederProps.isDockerContainerRegistryEnabled()) {
+      return DockerContainerRegistry.getInstance(logFeederProps.getProperties());
+    } else {
+      return null;
+    }
+  }
+
+  @Bean
+  public MetricsManager metricsManager() {
+    return new MetricsManager();
+  }
+
+  // Non-cloud configurations
+
+  @Bean
+  @Conditional(NonCloudStorageCondition.class)
+  public StatsLogger statsLogger() throws Exception {
+    return new StatsLogger("statsLogger", inputConfigManager());
+  }
+
   @Bean
   @DependsOn({"logSearchConfigLogFeeder", "propertyConfigurer"})
-  public ConfigHandler configHandler() throws Exception {
-    return new ConfigHandler(logSearchConfigLogFeeder());
+  @Conditional(NonCloudStorageCondition.class)
+  public DefaultInputConfigHandler inputConfigHandler() throws Exception {
+    return new DefaultInputConfigHandler();
   }
 
   @Bean
   @DependsOn("logFeederSecurityConfig")
+  @Conditional(NonCloudStorageCondition.class)
   public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception {
     if (logFeederProps.isUseLocalConfigs()) {
       LogSearchConfigLogFeeder logfeederConfig = LogSearchConfigFactory.createLogSearchConfigLogFeeder(
@@ -96,6 +132,7 @@ public LogSearchConfigLogFeeder logSearchConfigLogFeeder() throws Exception {
   }
 
   @Bean
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterManager logLevelFilterManager() throws Exception {
     if (logFeederProps.isSolrFilterStorage()) {
       SolrClient solrClient = new LogFeederSolrClientFactory().createSolrClient(
@@ -107,13 +144,14 @@ public LogLevelFilterManager logLevelFilterManager() throws Exception {
         map.put(name, logFeederProps.getProperties().getProperty(name));
       }
       return new LogLevelFilterManagerZK(map);
-    } else { // no default filter manager
+    } else {
       return null;
     }
   }
 
   @Bean
   @DependsOn("logLevelFilterHandler")
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterUpdater logLevelFilterUpdater() throws Exception {
     if (logFeederProps.isSolrFilterStorage() && logFeederProps.isSolrFilterMonitor()) {
       LogLevelFilterUpdater logLevelFilterUpdater = new LogLevelFilterUpdaterSolr(
@@ -121,56 +159,92 @@ public LogLevelFilterUpdater logLevelFilterUpdater() throws Exception {
         30, (LogLevelFilterManagerSolr) logLevelFilterManager(), logFeederProps.getClusterName());
       logLevelFilterUpdater.start();
       return logLevelFilterUpdater;
-    } else { // no default filter updater
-      return null;
     }
-  }
-  @Bean
-  public MetricsManager metricsManager() {
-    return new MetricsManager();
+    return null;
   }
 
   @Bean
-  @DependsOn("configHandler")
+  @Conditional(NonCloudStorageCondition.class)
   public LogLevelFilterHandler logLevelFilterHandler() throws Exception {
     return new LogLevelFilterHandler(logSearchConfigLogFeeder());
   }
 
   @Bean
-  @DependsOn({"configHandler", "logSearchConfigLogFeeder", "logLevelFilterHandler"})
-  public InputConfigUploader inputConfigUploader() {
-    return new InputConfigUploader();
+  @Conditional(NonCloudStorageCondition.class)
+  @DependsOn({"inputConfigHandler"})
+  public InputConfigUploader inputConfigUploader() throws Exception {
+    return new InputConfigUploader("Input Config Loader", logSearchConfigLogFeeder(),
+      inputConfigManager(), logLevelFilterHandler());
+  }
+
+  @Bean
+  @DependsOn({"containerRegistry", "checkpointHandler"})
+  @Conditional(NonCloudStorageCondition.class)
+  public InputManager inputManager() {
+    return new InputManagerImpl("InputIsNotReadyMonitor");
+  }
+
+  @Bean
+  @Conditional(NonCloudStorageCondition.class)
+  public OutputManager outputManager() throws Exception {
+    return new OutputManagerImpl();
   }
 
   @Bean
-  @DependsOn("inputConfigUploader")
-  public StatsLogger statsLogger() {
-    return new StatsLogger();
+  @Conditional(NonCloudStorageCondition.class)
+  public InputConfigManager inputConfigManager() throws Exception {
+    return new InputConfigManager(logSearchConfigLogFeeder(), inputManager(), outputManager(),
+      inputConfigHandler(), logFeederProps, true);
   }
 
+  // Cloud configurations
+
+  @Bean(name = "cloudLogSearchLogFeederConfig")
+  @Conditional(CloudStorageCondition.class)
+  public LogSearchConfigLogFeeder cloudLogSearchLogFeederConfig() throws Exception {
+    return LogSearchConfigFactory.createLogSearchConfigLogFeeder(
+      Maps.fromProperties(logFeederProps.getProperties()),
+      logFeederProps.getClusterName(),
+      LogSearchConfigLogFeederLocal.class, false);
+  }
+
+  @Bean
+  @Conditional(CloudStorageCondition.class)
+  @DependsOn({"cloudInputConfigHandler"})
+  public InputConfigUploader cloudInputConfigUploader() throws Exception {
+    return new InputConfigUploader("Cloud Input Config Loader", cloudLogSearchLogFeederConfig(),
+      cloudInputConfigManager(),null);
+  }
 
   @Bean
   @DependsOn({"containerRegistry", "checkpointHandler"})
-  public InputManager inputManager() {
-    return new InputManagerImpl();
+  @Conditional(CloudStorageCondition.class)
+  public InputManager cloudInputManager() {
+    return new InputManagerImpl("CloudInputIsNotReady");
   }
 
   @Bean
-  public OutputManager outputManager() {
-    return new OutputManagerImpl();
+  @Conditional(CloudStorageCondition.class)
+  public OutputManager cloudOutputManager() throws Exception {
+    return new CloudStorageOutputManager();
   }
 
   @Bean
-  public CheckpointManager checkpointHandler() {
-    return new FileCheckpointManager();
+  @Conditional(CloudStorageCondition.class)
+  public InputConfigHandler cloudInputConfigHandler() {
+    return new CloudStorageInputConfigHandler();
   }
 
   @Bean
-  public DockerContainerRegistry containerRegistry() {
-    if (logFeederProps.isDockerContainerRegistryEnabled()) {
-      return DockerContainerRegistry.getInstance(logFeederProps.getProperties());
-    } else {
-      return null;
-    }
+  @Conditional(CloudStorageCondition.class)
+  public InputConfigManager cloudInputConfigManager() throws Exception {
+    return new InputConfigManager(cloudLogSearchLogFeederConfig(), cloudInputManager(), cloudOutputManager(),
+      cloudInputConfigHandler(), logFeederProps, false);
+  }
+
+  @Bean
+  @Conditional(CloudStorageCondition.class)
+  public StatsLogger cloudStatsLogger() throws Exception {
+    return new StatsLogger("cloudStatsLogger", cloudInputConfigManager());
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java
new file mode 100644
index 0000000000..329f06613d
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederMode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf;
+
+/**
+ * Global Log Feeder modes:
+ * <pre>
+ * - default: process logs based on input / filter / output JSON configurations
+ * - cloud: process logs based on input JSON configurations and send them directly into cloud storage (without filters)
+ * - hybrid: use both 2 above (together)
+ * </pre>
+ */
+public enum LogFeederMode {
+  DEFAULT("default"), CLOUD("cloud"), HYBRID("hybrid");
+
+  private String text;
+
+  LogFeederMode(String text) {
+    this.text = text;
+  }
+
+  public String getText() {
+    return this.text;
+  }
+
+  public static LogFeederMode fromString(String text) {
+    for (LogFeederMode mode : LogFeederMode.values()) {
+      if (mode.text.equalsIgnoreCase(text)) {
+        return mode;
+      }
+    }
+    throw new IllegalArgumentException(String.format("String '%s' cannot be converted to LogFeederMode enum", text));
+  }
+
+  public static boolean isCloudStorage(LogFeederMode mode) {
+    return LogFeederMode.HYBRID.equals(mode) || LogFeederMode.CLOUD.equals(mode);
+  }
+
+  public static boolean isNonCloudStorage(LogFeederMode mode) {
+    return LogFeederMode.HYBRID.equals(mode) || LogFeederMode.DEFAULT.equals(mode);
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 859de8f0be..dc1bfd2780 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -199,6 +199,16 @@
   @Value("${" + LogFeederConstants.SOLR_URLS + ":}")
   private String solrUrlsStr;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_MODE,
+    description = "Option to support sending logs to cloud storage. You can choose between supporting only cloud storage, non-cloud storage or both",
+    examples = {"default", "cloud", "hybrid"},
+    defaultValue = "default",
+    sources = {LogFeederConstants.CLOUD_STORAGE_MODE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_MODE + ":default}")
+  public LogFeederMode cloudStorageMode;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -352,6 +362,14 @@ public void setZkFilterStorage(boolean zkFilterStorage) {
     this.zkFilterStorage = zkFilterStorage;
   }
 
+  public LogFeederMode getCloudStorageMode() {
+    return cloudStorageMode;
+  }
+
+  public void setCloudStorageMode(LogFeederMode cloudStorageMode) {
+    this.cloudStorageMode = cloudStorageMode;
+  }
+
   public String[] getSolrUrls() {
     if (StringUtils.isNotBlank(this.solrUrlsStr)) {
       return this.solrUrlsStr.split(",");
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
index aca1109c8a..e047f60aec 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
@@ -19,17 +19,18 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.credential.CompositeSecretStore;
+import org.apache.ambari.logfeeder.credential.FileSecretStore;
+import org.apache.ambari.logfeeder.credential.HadoopCredentialSecretStore;
+import org.apache.ambari.logfeeder.credential.SecretStore;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.beans.factory.annotation.Value;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
-import java.nio.charset.Charset;
 
 public class LogFeederSecurityConfig {
 
@@ -142,48 +143,12 @@ private void ensureStorePassword(String locationArg, String pwdArg, String prope
   }
 
   private String getPassword(String propertyName, String fileName) {
-    String credentialStorePassword = getPasswordFromCredentialStore(propertyName);
-    if (credentialStorePassword != null) {
-      return credentialStorePassword;
-    }
-
-    String filePassword = getPasswordFromFile(fileName);
-    if (filePassword != null) {
-      return filePassword;
-    }
-
-    return LOGFEEDER_STORE_DEFAULT_PASSWORD;
-  }
+    SecretStore hadoopSecretStore = new HadoopCredentialSecretStore(propertyName, credentialStoreProviderPath);
+    SecretStore fileSecretStore = new FileSecretStore(String.join(File.separator, LOGFEEDER_CERT_DEFAULT_FOLDER, fileName), LOGFEEDER_STORE_DEFAULT_PASSWORD);
+    SecretStore compositeSecretStore = new CompositeSecretStore(hadoopSecretStore, fileSecretStore);
 
-  private String getPasswordFromCredentialStore(String propertyName) {
-    try {
-      if (StringUtils.isEmpty(credentialStoreProviderPath)) {
-        return null;
-      }
-
-      org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
-      config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath);
-      char[] passwordChars = config.getPassword(propertyName);
-      return (ArrayUtils.isNotEmpty(passwordChars)) ? new String(passwordChars) : null;
-    } catch (Exception e) {
-      logger.warn(String.format("Could not load password %s from credential store, using default password", propertyName));
-      return null;
-    }
-  }
-
-  private String getPasswordFromFile(String fileName) {
-    try {
-      File pwdFile = new File(LOGFEEDER_CERT_DEFAULT_FOLDER, fileName);
-      if (!pwdFile.exists()) {
-        FileUtils.writeStringToFile(pwdFile, LOGFEEDER_STORE_DEFAULT_PASSWORD, Charset.defaultCharset());
-        return LOGFEEDER_STORE_DEFAULT_PASSWORD;
-      } else {
-        return FileUtils.readFileToString(pwdFile, Charset.defaultCharset());
-      }
-    } catch (Exception e) {
-      logger.warn("Exception occurred during read/write password file for keystore/truststore.", e);
-      return null;
-    }
+    char[] password = compositeSecretStore.getSecret();
+    return password == null ? LOGFEEDER_STORE_DEFAULT_PASSWORD: new String(password);
   }
 
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java
new file mode 100644
index 0000000000..3860699199
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/CloudStorageCondition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.condition;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+/**
+ * Global condition that checks is the application started in cloud or hybrid mode
+ */
+public class CloudStorageCondition implements Condition {
+
+  @Override
+  public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+    return LogFeederMode.isCloudStorage(LogFeederMode.fromString(
+      context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE)));
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java
new file mode 100644
index 0000000000..fee0efaf09
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/condition/NonCloudStorageCondition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.condition;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+/**
+ * Global condition that checks is the application started in default or hybrid mode
+ */
+public class NonCloudStorageCondition implements Condition {
+
+  @Override
+  public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+    return LogFeederMode.isNonCloudStorage(LogFeederMode.fromString(
+      context.getEnvironment().getProperty(LogFeederConstants.CLOUD_STORAGE_MODE)));
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java
new file mode 100644
index 0000000000..7edaf266e2
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/CompositeSecretStore.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class CompositeSecretStore implements SecretStore {
+
+  private SecretStore[] secretStores;
+
+  public CompositeSecretStore(SecretStore... secretStores) {
+    this.secretStores = secretStores;
+  }
+
+  @Override
+  public char[] getSecret() {
+    for (SecretStore secretStore : secretStores) {
+      char[] secret = secretStore.getSecret();
+      if (secret != null) {
+        return secret;
+      }
+    }
+    return null;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java
new file mode 100644
index 0000000000..5d82ee1d00
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/EnvSecretStore.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class EnvSecretStore implements SecretStore {
+
+  private final String property;
+
+  public EnvSecretStore(String property) {
+    this.property = property;
+  }
+
+  @Override
+  public char[] getSecret() {
+    String envValue = System.getenv(property);
+    if (envValue != null) {
+      return envValue.toCharArray();
+    }
+    return null;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java
new file mode 100644
index 0000000000..b9687e02ef
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/FileSecretStore.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.nio.charset.Charset;
+
+public class FileSecretStore implements SecretStore {
+
+  private static final Logger logger = LogManager.getLogger(FileSecretStore.class);
+
+  private final String fileLocation;
+  private final String defaultSecret;
+
+  public FileSecretStore(String fileLocation, String defaultSecret) {
+    this.fileLocation = fileLocation;
+    this.defaultSecret = defaultSecret;
+  }
+
+  public FileSecretStore(String fileLocation) {
+    this.fileLocation = fileLocation;
+    this.defaultSecret = null;
+  }
+
+  @Override
+  public char[] getSecret() {
+    try {
+      File pwdFile = new File(fileLocation);
+      if (!pwdFile.exists() && defaultSecret != null) {
+        FileUtils.writeStringToFile(pwdFile, defaultSecret, Charset.defaultCharset());
+        return defaultSecret.toCharArray();
+      } else {
+        return FileUtils.readFileToString(pwdFile, Charset.defaultCharset()).toCharArray();
+      }
+    } catch (Exception e) {
+      logger.warn("Exception occurred during read/write password file.", e);
+      return null;
+    }
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java
new file mode 100644
index 0000000000..7e1237e135
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/HadoopCredentialSecretStore.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopCredentialSecretStore implements SecretStore {
+
+  private static final Logger logger = LogManager.getLogger(HadoopCredentialSecretStore.class);
+  private static final String CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY = "hadoop.security.credential.provider.path";
+
+  private final String credentialStoreProviderPath;
+  private final String property;
+
+  public HadoopCredentialSecretStore(String property, String credentialStoreProviderPath) {
+    this.property = property;
+    this.credentialStoreProviderPath = credentialStoreProviderPath;
+  }
+
+  @Override
+  public char[] getSecret() {
+    try {
+      if (StringUtils.isBlank(credentialStoreProviderPath)) {
+        return null;
+      }
+      org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+      config.set(CREDENTIAL_STORE_PROVIDER_PATH_PROPERTY, credentialStoreProviderPath);
+      return config.getPassword(property);
+    } catch (Exception e) {
+      logger.warn("Could not load password {} from credential store.", property);
+      return null;
+    }
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java
new file mode 100644
index 0000000000..046fe7c4f5
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PropertySecretStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+public class PropertySecretStore implements SecretStore {
+  private final String property;
+
+  public PropertySecretStore(String property) {
+    this.property = property;
+  }
+
+  @Override
+  public char[] getSecret() {
+    String propValue = System.getProperty(property);
+    if (propValue != null) {
+      return propValue.toCharArray();
+    }
+    return null;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java
new file mode 100644
index 0000000000..c257ff48ab
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/SecretStore.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.credential;
+
+/**
+ * Store secrets in character array
+ */
+public interface SecretStore {
+  /**
+   * Gather a secret - implement the way
+   * @return secret character array
+   */
+  char[] getSecret();
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java
new file mode 100644
index 0000000000..e1946f8689
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterDummy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.filter;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
+
+/**
+ * Simple dummy filter, not supported by config api, create it manually
+ */
+public class FilterDummy extends Filter<LogFeederProps> {
+
+  private boolean dockerEnabled = false;
+
+  @Override
+  public void init(LogFeederProps logFeederProps) throws Exception {
+    if (logFeederProps.isDockerContainerRegistryEnabled()) {
+      Input input = getInput();
+      if (input instanceof InputFile) {
+        dockerEnabled = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor) input.getInputDescriptor()).getDockerEnabled(), false);
+      }
+    }
+  }
+
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) throws Exception {
+    if (dockerEnabled) {
+      inputStr = DockerLogFilter.getLogFromDockerJson(inputStr);
+    }
+    super.apply(inputStr, inputMarker);
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "filter:filter=dummy";
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index 57f5b3d647..283273a571 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -20,8 +20,8 @@
 
 import com.google.common.io.Files;
 import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -49,20 +49,18 @@
   private final Set<String> filesHandled = new HashSet<>();
   private final Pattern serviceNamePattern = Pattern.compile("input.config-(.+).json");
 
-  @Inject
-  private LogSearchConfigLogFeeder config;
-
   @Inject
   private LogFeederProps logFeederProps;
 
-  @Inject
-  private LogLevelFilterHandler logLevelFilterHandler;
-
-  @Inject
-  private ConfigHandler configHandler;
+  private final InputConfigManager inputConfigManager;
+  private final LogSearchConfigLogFeeder config;
+  private final LogLevelFilterHandler logLevelFilterHandler;
 
-  public InputConfigUploader() {
-    super("Input Config Loader");
+  public InputConfigUploader(String name, LogSearchConfigLogFeeder config, InputConfigManager inputConfigManager, LogLevelFilterHandler logLevelFilterHandler) {
+    super(name);
+    this.config = config;
+    this.inputConfigManager = inputConfigManager;
+    this.logLevelFilterHandler = logLevelFilterHandler;
     setDaemon(true);
   }
 
@@ -70,7 +68,9 @@ public InputConfigUploader() {
   public void init() throws Exception {
     this.configDir = new File(logFeederProps.getConfDir());
     this.start();
-    config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, logFeederProps.getClusterName());
+    if (config != null) {
+      config.monitorInputConfigChanges(inputConfigManager, logLevelFilterHandler, logFeederProps.getClusterName());
+    }
   }
 
   @Override
@@ -85,7 +85,7 @@ public void run() {
               m.find();
               String serviceName = m.group(1);
               String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
-              if (!config.inputConfigExists(serviceName)) {
+              if (config != null && !config.inputConfigExists(serviceName)) {
                 config.createInputConfig(logFeederProps.getClusterName(), serviceName, inputConfig);
               }
               filesHandled.add(inputConfigFile.getAbsolutePath());
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index b8eb5e99a7..64428f60b5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.input;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.container.docker.DockerContainerRegistry;
@@ -106,7 +107,7 @@ public boolean isReady() {
         if (dockerContainerRegistry != null) {
           Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
           String logType = getLogType();
-          if (metadataMap.containsKey(logType)) {
+          if (metadataMap.containsKey(StringUtils.removeStart(logType, LogFeederConstants.CLOUD_PREFIX))) {
             isReady = true;
           }
         } else {
@@ -140,12 +141,12 @@ public void setReady(boolean isReady) {
   public String getNameForThread() {
     if (filePath != null) {
       try {
-        return (getType() + "=" + (new File(filePath)).getName());
+        return (getType() + "=" + (new File(filePath)).getName() + ";" + getCloudModeSuffix());
       } catch (Throwable ex) {
         logger.warn("Couldn't get basename for filePath=" + filePath, ex);
       }
     }
-    return super.getNameForThread() + ":" + getType();
+    return super.getNameForThread() + ":" + getType() + ";" + getCloudModeSuffix();
   }
 
   @Override
@@ -177,8 +178,9 @@ public boolean monitor() {
         Map<String, Map<String, DockerMetadata>> metadataMap = dockerContainerRegistry.getContainerMetadataMap();
         String logType = getLogType();
         threadGroup = new ThreadGroup("docker-parent-" + logType);
-        if (metadataMap.containsKey(logType)) {
-          Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(logType);
+        String replacedLogType = StringUtils.removeStart(logType, LogFeederConstants.CLOUD_PREFIX);
+        if (metadataMap.containsKey(replacedLogType)) {
+          Map<String, DockerMetadata> dockerMetadataMap = metadataMap.get(replacedLogType);
           for (Map.Entry<String, DockerMetadata> dockerMetadataEntry : dockerMetadataMap.entrySet()) {
             try {
               startNewChildDockerInputFileThread(dockerMetadataEntry.getValue());
@@ -198,9 +200,9 @@ else if (multiFolder) {
             for (Map.Entry<String, List<File>> folderFileEntry : getFolderMap().entrySet()) {
               startNewChildInputFileThread(folderFileEntry);
             }
-            logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), "logfile_path_updater=" + filePath);
+            logFilePathUpdaterThread = new Thread(new LogFilePathUpdateMonitor((InputFile) this, pathUpdateIntervalMin, detachTimeMin), String.format("logfile_path_updater=%s;%s", filePath, getCloudModeSuffix()));
             logFilePathUpdaterThread.setDaemon(true);
-            logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), "logfile_detacher=" + filePath);
+            logFileDetacherThread = new Thread(new LogFileDetachMonitor((InputFile) this, detachIntervalMin, detachTimeMin), String.format("logfile_detacher=%s;%s", filePath, getCloudModeSuffix()));
             logFileDetacherThread.setDaemon(true);
 
             logFilePathUpdaterThread.start();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index 70e54d6731..bd3e0457cf 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -62,6 +62,16 @@
     return inputs.get(serviceName);
   }
 
+  private final String notReadyThreadName;
+
+  public InputManagerImpl() {
+    this.notReadyThreadName = "InputIsNotReadyMonitor";
+  }
+
+  public InputManagerImpl(String notReadyThreadName) {
+    this.notReadyThreadName = notReadyThreadName;
+  }
+
   @Override
   public void add(String serviceName, Input input) {
     List<Input> inputList = inputs.computeIfAbsent(serviceName, k -> new ArrayList<>());
@@ -130,7 +140,7 @@ private void startDockerMetadataThread() {
   }
 
   private void startMonitorThread() {
-    Thread inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+    Thread inputIsReadyMonitor = new Thread(notReadyThreadName) {
       @Override
       public void run() {
         logger.info("Going to monitor for these missing files: " + notReadyList.toString());
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
index 965aa84a6b..1a15395f78 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
@@ -123,7 +123,7 @@ public void setDrain(boolean drain) {
 
   @Override
   public String getNameForThread() {
-    return String.format("socket=%s-%s-%s", getLogType(), this.protocol, this.port);
+    return String.format("socket=%s-%s-%s;%s", getLogType(), this.protocol, this.port, getCloudModeSuffix());
   }
 
   @Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java
new file mode 100644
index 0000000000..c5a91728c6
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/BlockMerger.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper for merge global and input configurations
+ */
+public class BlockMerger {
+  private BlockMerger() {
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
+    for (String key : fromMap.keySet()) {
+      Object objValue = fromMap.get(key);
+      if (objValue == null) {
+        continue;
+      }
+      if (objValue instanceof Map) {
+        Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
+
+        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
+        if (localFields == null) {
+          localFields = new HashMap<>();
+          toMap.put(key, localFields);
+        }
+
+        if (globalFields != null) {
+          for (String fieldKey : globalFields.keySet()) {
+            if (!localFields.containsKey(fieldKey)) {
+              localFields.put(fieldKey, globalFields.get(fieldKey));
+            }
+          }
+        }
+      }
+    }
+
+    // Let's add the rest of the top level fields if missing
+    for (String key : fromMap.keySet()) {
+      if (!toMap.containsKey(key)) {
+        toMap.put(key, fromMap.get(key));
+      }
+    }
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java
new file mode 100644
index 0000000000..35ad1bd345
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigHolder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holds common configuration/manager objects for input config managers/handlers in order to provide 1 object as input (instead of many)
+ */
+public class InputConfigHolder {
+
+  private final LogFeederProps logFeederProps;
+  private final LogSearchConfigLogFeeder config;
+  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
+  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
+  private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
+
+  private final InputManager inputManager;
+  private final OutputManager outputManager;
+
+  public InputConfigHolder(LogSearchConfigLogFeeder config, InputManager inputManager, OutputManager outputManager, LogFeederProps logFeederProps) {
+    this.logFeederProps = logFeederProps;
+    this.config = config;
+    this.inputManager = inputManager;
+    this.outputManager = outputManager;
+  }
+
+  public LogFeederProps getLogFeederProps() {
+    return logFeederProps;
+  }
+
+  public List<InputDescriptor> getInputConfigList() {
+    return inputConfigList;
+  }
+
+  public List<FilterDescriptor> getFilterConfigList() {
+    return filterConfigList;
+  }
+
+  public List<Map<String, Object>> getOutputConfigList() {
+    return outputConfigList;
+  }
+
+  public InputManager getInputManager() {
+    return inputManager;
+  }
+
+  public OutputManager getOutputManager() {
+    return outputManager;
+  }
+
+  public LogSearchConfigLogFeeder getConfig() {
+    return config;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
similarity index 53%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
index 61f726c331..925bb65057 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/InputConfigManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.common;
+package org.apache.ambari.logfeeder.manager;
 
 import com.google.common.collect.Maps;
 import com.google.gson.reflect.TypeToken;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
 import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
-import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.manager.InputManager;
 import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
@@ -40,7 +40,6 @@
 import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -48,7 +47,6 @@
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import javax.inject.Inject;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -56,50 +54,115 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
- * Initialize / close input and output managers and monitors input configuration changes.
+ * Facade class for input config operations (add / load / remove configs and start or close input monitoring)
  */
-public class ConfigHandler implements InputConfigMonitor {
-  private static final Logger logger = LogManager.getLogger(ConfigHandler.class);
+public class InputConfigManager implements InputConfigMonitor {
 
-  private final LogSearchConfigLogFeeder logSearchConfig;
+  private Logger logger = LogManager.getLogger(InputConfigManager.class);
 
-  @Inject
-  private InputManager inputManager;
-  @Inject
-  private OutputManager outputManager;
-  @Inject
-  private LogFeederProps logFeederProps;
+  private final InputConfigHandler inputConfigHandler;
+  private final LogSearchConfigLogFeeder logSearchConfig;
+  private final LogFeederProps logFeederProps;
+  private final InputConfigHolder inputConfigHolder;
+  private final boolean loadOutput;
 
   private final Map<String, Object> globalConfigs = new HashMap<>();
   private final List<String> globalConfigJsons = new ArrayList<>();
 
-  private final List<InputDescriptor> inputConfigList = new ArrayList<>();
-  private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
-  private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
-
   private boolean simulateMode = false;
 
-  public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
+  public InputConfigManager(LogSearchConfigLogFeeder logSearchConfig, InputManager inputManager,
+                            OutputManager outputManager, InputConfigHandler inputConfigHandler,
+                            LogFeederProps logFeederProps, boolean loadOutput) {
     this.logSearchConfig = logSearchConfig;
+    this.inputConfigHandler = inputConfigHandler;
+    this.logFeederProps = logFeederProps;
+    this.loadOutput = loadOutput;
+    this.inputConfigHolder = new InputConfigHolder(logSearchConfig, inputManager, outputManager, logFeederProps);
   }
 
   @PostConstruct
   public void init() throws Exception {
     loadConfigFiles();
     logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName());
-    loadOutputs();
+    inputConfigHandler.init(inputConfigHolder);
     simulateIfNeeded();
+    if (loadOutput) {
+      loadOutputs();
+    }
+    inputConfigHolder.getInputManager().init();
+    inputConfigHolder.getOutputManager().init();
+  }
+
+  @Override
+  public List<String> getGlobalConfigJsons() {
+    return this.globalConfigJsons;
+  }
+
+  @Override
+  public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
+    inputConfigHolder.getInputConfigList().clear();
+    inputConfigHolder.getFilterConfigList().clear();
+    inputConfigHolder.getInputConfigList().addAll(inputConfig.getInput());
+    inputConfigHolder.getFilterConfigList().addAll(inputConfig.getFilter());
+    if (simulateMode) {
+      InputSimulate.loadTypeToFilePath(inputConfigHolder.getInputConfigList());
+    } else {
+      inputConfigHandler.loadInputs(serviceName, inputConfigHolder, inputConfig);
+      inputConfigHandler.assignInputsToOutputs(serviceName, inputConfigHolder, inputConfig);
+    }
+    inputConfigHolder.getInputManager().startInputs(serviceName);
+  }
 
-    inputManager.init();
-    outputManager.init();
+  @Override
+  public void removeInputs(String serviceName) {
+    inputConfigHolder.getInputManager().removeInputsForService(serviceName);
+  }
+
+  public void cleanCheckPointFiles() {
+    inputConfigHolder.getInputManager().getCheckpointHandler().cleanupCheckpoints();
+  }
+
+  public void logStats() {
+    inputConfigHolder.getInputManager().logStats();
+    inputConfigHolder.getOutputManager().logStats();
+  }
+
+  public void addMetrics(List<MetricData> metricsList) {
+    inputConfigHolder.getInputManager().addMetricsContainers(metricsList);
+    inputConfigHolder.getOutputManager().addMetricsContainers(metricsList);
+  }
+
+  @PreDestroy
+  public void close() {
+    inputConfigHolder.getInputManager().close();
+    inputConfigHolder.getOutputManager().close();
+    inputConfigHolder.getInputManager().checkInAll();
+  }
+
+  public Input getTestInput(InputConfig inputConfig, String logId) {
+    for (InputDescriptor inputDescriptor : inputConfig.getInput()) {
+      if (inputDescriptor.getType().equals(logId)) {
+        inputConfigHolder.getInputConfigList().add(inputDescriptor);
+        break;
+      }
+    }
+    if (inputConfigHolder.getInputConfigList().isEmpty()) {
+      throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton");
+    }
+
+    for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
+      inputConfigHolder.getFilterConfigList().add(filterDescriptor);
+    }
+    inputConfigHandler.loadInputs("test", inputConfigHolder, inputConfig);
+    List<Input> inputList = inputConfigHolder.getInputManager().getInputList("test");
+
+    return inputList != null && inputList.size() == 1 ? inputList.get(0) : null;
   }
 
   private void loadConfigFiles() throws Exception {
@@ -121,13 +184,11 @@ private void loadConfigFiles() throws Exception {
 
   private List<String> getConfigFiles() {
     List<String> configFiles = new ArrayList<>();
-
     String logFeederConfigFilesProperty = logFeederProps.getConfigFiles();
     logger.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
     if (logFeederConfigFilesProperty != null) {
       configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
     }
-
     return configFiles;
   }
 
@@ -152,57 +213,8 @@ private void loadConfigsUsingClassLoader(String configFileName) throws Exception
     }
   }
 
-  @Override
-  public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
-    inputConfigList.clear();
-    filterConfigList.clear();
-
-    inputConfigList.addAll(inputConfig.getInput());
-    filterConfigList.addAll(inputConfig.getFilter());
-
-    if (simulateMode) {
-      InputSimulate.loadTypeToFilePath(inputConfigList);
-    } else {
-      loadInputs(serviceName);
-      loadFilters(serviceName);
-      assignOutputsToInputs(serviceName);
-
-      inputManager.startInputs(serviceName);
-    }
-  }
-
-  @Override
-  public void removeInputs(String serviceName) {
-    inputManager.removeInputsForService(serviceName);
-  }
-
-  public Input getTestInput(InputConfig inputConfig, String logId) {
-    for (InputDescriptor inputDescriptor : inputConfig.getInput()) {
-      if (inputDescriptor.getType().equals(logId)) {
-        inputConfigList.add(inputDescriptor);
-        break;
-      }
-    }
-    if (inputConfigList.isEmpty()) {
-      throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton");
-    }
-
-    for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
-//      if ("grok".equals(filterDescriptor.getFilter())) {
-//        // Thus ensure that the log entry passed will be parsed immediately
-//        ((FilterGrokDescriptor)filterDescriptor).setMultilinePattern(null);
-//      }
-      filterConfigList.add(filterDescriptor);
-    }
-    loadInputs("test");
-    loadFilters("test");
-    List<Input> inputList = inputManager.getInputList("test");
-
-    return inputList != null && inputList.size() == 1 ? inputList.get(0) : null;
-  }
-
   @SuppressWarnings("unchecked")
-  public void loadConfigs(String configData) throws Exception {
+  private void loadConfigs(String configData) throws Exception {
     Type type = new TypeToken<Map<String, Object>>() {}.getType();
     Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
 
@@ -215,7 +227,7 @@ public void loadConfigs(String configData) throws Exception {
           break;
         case "output" :
           List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
-          outputConfigList.addAll(outputConfig);
+          inputConfigHolder.getOutputConfigList().addAll(outputConfig);
           break;
         default :
           logger.warn("Unknown config key: " + key);
@@ -223,39 +235,13 @@ public void loadConfigs(String configData) throws Exception {
     }
   }
 
-  @Override
-  public List<String> getGlobalConfigJsons() {
-    return globalConfigJsons;
-  }
-
-  private void simulateIfNeeded() throws Exception {
-    int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber();
-    if (simulatedInputNumber == 0)
-      return;
-
-    InputConfigImpl simulateInputConfig = new InputConfigImpl();
-    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
-    simulateInputConfig.setInput(inputConfigDescriptors);
-    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
-    for (int i = 0; i < simulatedInputNumber; i++) {
-      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
-      inputDescriptor.setSource("simulate");
-      inputDescriptor.setRowtype("service");
-      inputDescriptor.setAddFields(new HashMap<String, String>());
-      inputConfigDescriptors.add(inputDescriptor);
-    }
-
-    loadInputConfigs("Simulation", simulateInputConfig);
-
-    simulateMode = true;
-  }
-
   private void loadOutputs() {
-    for (Map<String, Object> map : outputConfigList) {
+    for (Map<String, Object> map : inputConfigHolder.getOutputConfigList()) {
       if (map == null) {
+        logger.warn("Output map is empty. Skipping...");
         continue;
       }
-      mergeBlocks(globalConfigs, map);
+      BlockMerger.mergeBlocks(globalConfigs, map);
 
       String value = (String) map.get("destination");
       if (StringUtils.isEmpty(value)) {
@@ -274,182 +260,32 @@ private void loadOutputs() {
       // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
       if (output.isEnabled()) {
         output.logConfigs();
-        outputManager.add(output);
+        inputConfigHolder.getOutputManager().add(output);
       } else {
         logger.info("Output is disabled. So ignoring it. " + output.getShortDescription());
       }
     }
   }
 
-  private void loadInputs(String serviceName) {
-    for (InputDescriptor inputDescriptor : inputConfigList) {
-      if (inputDescriptor == null) {
-        continue;
-      }
-
-      String source = (String) inputDescriptor.getSource();
-      if (StringUtils.isEmpty(source)) {
-        logger.error("Input block doesn't have source element");
-        continue;
-      }
-      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
-      if (input == null) {
-        logger.error("Input object could not be found");
-        continue;
-      }
-      input.setType(source);
-      input.setLogType(inputDescriptor.getType());
-      input.loadConfig(inputDescriptor);
-
-      if (input.isEnabled()) {
-        input.setOutputManager(outputManager);
-        input.setInputManager(inputManager);
-        inputManager.add(serviceName, input);
-        input.logConfigs();
-      } else {
-        logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
-      }
-    }
-  }
-
-  private void loadFilters(String serviceName) {
-    sortFilters();
-
-    List<Input> toRemoveInputList = new ArrayList<Input>();
-    for (Input input : inputManager.getInputList(serviceName)) {
-      for (FilterDescriptor filterDescriptor : filterConfigList) {
-        if (filterDescriptor == null) {
-          continue;
-        }
-        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
-          continue;
-        }
-        if (!input.isFilterRequired(filterDescriptor)) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
-          continue;
-        }
-
-        String value = filterDescriptor.getFilter();
-        if (StringUtils.isEmpty(value)) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
-        if (filter == null) {
-          logger.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(filterDescriptor);
-        filter.setInput(input);
-
-        filter.setOutputManager(outputManager);
-        input.addFilter(filter);
-        filter.logConfigs();
-      }
-
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
-      inputManager.removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters() {
-    Collections.sort(filterConfigList, (o1, o2) -> {
-      Integer o1Sort = o1.getSortOrder();
-      Integer o2Sort = o2.getSortOrder();
-      if (o1Sort == null || o2Sort == null) {
-        return 0;
-      }
-
-      return o1Sort - o2Sort;
-    });
-  }
-
-  private void assignOutputsToInputs(String serviceName) {
-    Set<Output> usedOutputSet = new HashSet<Output>();
-    for (Input input : inputManager.getInputList(serviceName)) {
-      for (Output output : outputManager.getOutputs()) {
-        if (input.isOutputRequired(output)) {
-          usedOutputSet.add(output);
-          input.addOutput(output);
-        }
-      }
-    }
-
-    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
-    for (Output output : InputSimulate.getSimulateOutputs()) {
-      output.setLogSearchConfig(logSearchConfig);
-      outputManager.add(output);
-      usedOutputSet.add(output);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
-    for (String key : fromMap.keySet()) {
-      Object objValue = fromMap.get(key);
-      if (objValue == null) {
-        continue;
-      }
-      if (objValue instanceof Map) {
-        Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
-
-        Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
-        if (localFields == null) {
-          localFields = new HashMap<String, Object>();
-          toMap.put(key, localFields);
-        }
-
-        if (globalFields != null) {
-          for (String fieldKey : globalFields.keySet()) {
-            if (!localFields.containsKey(fieldKey)) {
-              localFields.put(fieldKey, globalFields.get(fieldKey));
-            }
-          }
-        }
-      }
-    }
+  private void simulateIfNeeded() throws Exception {
+    int simulatedInputNumber = inputConfigHolder.getLogFeederProps().getInputSimulateConfig().getSimulateInputNumber();
+    if (simulatedInputNumber == 0)
+      return;
 
-    // Let's add the rest of the top level fields if missing
-    for (String key : fromMap.keySet()) {
-      if (!toMap.containsKey(key)) {
-        toMap.put(key, fromMap.get(key));
-      }
+    InputConfigImpl simulateInputConfig = new InputConfigImpl();
+    List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
+    simulateInputConfig.setInput(inputConfigDescriptors);
+    simulateInputConfig.setFilter(new ArrayList<FilterDescriptorImpl>());
+    for (int i = 0; i < simulatedInputNumber; i++) {
+      InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
+      inputDescriptor.setSource("simulate");
+      inputDescriptor.setRowtype("service");
+      inputDescriptor.setAddFields(new HashMap<String, String>());
+      inputConfigDescriptors.add(inputDescriptor);
     }
-  }
-
-  public void cleanCheckPointFiles() {
-    inputManager.getCheckpointHandler().cleanupCheckpoints();
-  }
-
-  public void logStats() {
-    inputManager.logStats();
-    outputManager.logStats();
-  }
 
-  public void addMetrics(List<MetricData> metricsList) {
-    inputManager.addMetricsContainers(metricsList);
-    outputManager.addMetricsContainers(metricsList);
-  }
-
-  @PreDestroy
-  public void close() {
-    inputManager.close();
-    outputManager.close();
-    inputManager.checkInAll();
-  }
-
-  public void setInputManager(InputManager inputManager) {
-    this.inputManager = inputManager;
-  }
+    loadInputConfigs("Simulation", simulateInputConfig);
 
-  public void setOutputManager(OutputManager outputManager) {
-    this.outputManager = outputManager;
+    simulateMode = true;
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java
new file mode 100644
index 0000000000..2e80d0d12d
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/InputConfigHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations;
+
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+
+/**
+ * Holds operations regarding input config handling. (init configs, load input configs and assign inputs to outputs)
+ */
+public interface InputConfigHandler {
+
+  /**
+   * Initialization step before loading inputs/filter/outputs
+   * @param inputConfigHolder object that holds input/filter/output configuration details
+   * @throws Exception error during initialization
+   */
+  void init(InputConfigHolder inputConfigHolder) throws Exception;
+
+  /**
+   * Step during input/filter configurations initialization
+   * @param serviceName group of input configs
+   * @param inputConfigHolder object that holds input/filter/output configuration details
+   * @param config input/filter config object
+   */
+  void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config);
+
+  /**
+   * Assign inputs to outputs - after inputs/filters/outputs are loaded
+   * @param serviceName group of input configs
+   * @param inputConfigHolder object that holds input/filter/output configuration details
+   * @param config input/filter config object
+   * @throws Exception error during input/output assignment
+   */
+  void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) throws Exception;
+
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
new file mode 100644
index 0000000000..deb3a91663
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations.impl;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederMode;
+import org.apache.ambari.logfeeder.filter.FilterDummy;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Holds input/filter/output operations in cloud Log Feeder mode.
+ */
+public class CloudStorageInputConfigHandler implements InputConfigHandler {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class);
+
+  @Override
+  public void init(InputConfigHolder inputConfigHolder) {
+    logger.info("Call init of cloud input config handler...");
+  }
+
+  @Override
+  public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
+    for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
+      if (inputDescriptor == null) {
+        logger.warn("Input descriptor is smpty. Skipping...");
+        continue;
+      }
+      LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
+      if (inputDescriptor instanceof InputSocketDescriptor && LogFeederMode.HYBRID.equals(mode)) {
+        logger.info("Socket input is skipped (won't be sent to cloud storage) in hybrid mode");
+        continue;
+      }
+      String source = inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
+        logger.error("Input block doesn't have source element");
+        continue;
+      }
+      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
+      if (input == null) {
+        logger.error("Input object could not be found");
+        continue;
+      }
+      input.setType(source);
+      input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType());
+      input.loadConfig(inputDescriptor);
+      FilterDummy filter = new FilterDummy();
+      filter.setOutputManager(inputConfigHolder.getOutputManager());
+      input.setFirstFilter(filter);
+      input.setCloudInput(true);
+
+      if (input.isEnabled()) {
+        input.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setInputManager(inputConfigHolder.getInputManager());
+        inputConfigHolder.getInputManager().add(serviceName, input);
+        logger.info("New cloud input object registered for service '{}': '{}'", serviceName, input.getLogType());
+        input.logConfigs();
+      } else {
+        logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
+      }
+    }
+  }
+
+  @Override
+  public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) {
+    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      List<Output> outputs = inputConfigHolder.getOutputManager().getOutputs();
+      for (Output output : outputs) {
+        input.addOutput(output);
+      }
+    }
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
new file mode 100644
index 0000000000..44da6319e4
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.manager.operations.impl;
+
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
+import org.apache.ambari.logfeeder.input.InputSimulate;
+import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds input/filter/output operations in default Log Feeder mode.
+ */
+public class DefaultInputConfigHandler implements InputConfigHandler {
+
+  private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
+
+  @Override
+  public void init(InputConfigHolder inputConfigHolder) throws Exception {
+  }
+
+  @Override
+  public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
+    loadInputs(serviceName, inputConfigHolder);
+    loadFilters(serviceName, inputConfigHolder);
+  }
+
+  @Override
+  public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) {
+    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      for (Output output : inputConfigHolder.getOutputManager().getOutputs()) {
+        if (input.isOutputRequired(output)) {
+          input.addOutput(output);
+        }
+      }
+    }
+
+    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
+    for (Output output : InputSimulate.getSimulateOutputs()) {
+      output.setLogSearchConfig(inputConfigHolder.getConfig());
+      inputConfigHolder.getOutputManager().add(output);
+    }
+  }
+
+  private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
+    for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
+      if (inputDescriptor == null) {
+        logger.warn("Input descriptor is smpty. Skipping...");
+        continue;
+      }
+
+      String source = inputDescriptor.getSource();
+      if (StringUtils.isEmpty(source)) {
+        logger.error("Input block doesn't have source element");
+        continue;
+      }
+      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
+      if (input == null) {
+        logger.error("Input object could not be found");
+        continue;
+      }
+      input.setType(source);
+      input.setLogType(inputDescriptor.getType());
+      input.loadConfig(inputDescriptor);
+
+      if (input.isEnabled()) {
+        input.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setInputManager(inputConfigHolder.getInputManager());
+        inputConfigHolder.getInputManager().add(serviceName, input);
+        logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType());
+        input.logConfigs();
+      } else {
+        logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
+      }
+    }
+  }
+
+  private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
+    sortFilters(inputConfigHolder);
+
+    List<Input> toRemoveInputList = new ArrayList<>();
+    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
+      for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
+        if (filterDescriptor == null) {
+          logger.warn("Filter descriptor is smpty. Skipping...");
+          continue;
+        }
+        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
+          continue;
+        }
+        if (!input.isFilterRequired(filterDescriptor)) {
+          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
+          continue;
+        }
+
+        String value = filterDescriptor.getFilter();
+        if (StringUtils.isEmpty(value)) {
+          logger.error("Filter block doesn't have filter element");
+          continue;
+        }
+        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
+        if (filter == null) {
+          logger.error("Filter object could not be found");
+          continue;
+        }
+        filter.loadConfig(filterDescriptor);
+        filter.setInput(input);
+
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.addFilter(filter);
+        filter.logConfigs();
+      }
+
+      if (input.getFirstFilter() == null) {
+        toRemoveInputList.add(input);
+      }
+    }
+
+    for (Input toRemoveInput : toRemoveInputList) {
+      logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
+      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
+    }
+  }
+
+  private void sortFilters(InputConfigHolder inputConfigHolder) {
+    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
+      Integer o1Sort = o1.getSortOrder();
+      Integer o2Sort = o2.getSortOrder();
+      if (o1Sort == null || o2Sort == null) {
+        return 0;
+      }
+
+      return o1Sort - o2Sort;
+    });
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
index e72fd4313f..bc1510af3d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ambari.logfeeder.metrics;
 
-import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.manager.InputConfigManager;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -36,15 +36,15 @@
 
   private long lastCheckPointCleanedMS = 0;
 
-  @Inject
-  private ConfigHandler configHandler;
+  private final InputConfigManager inputConfigManager;
 
   @Inject
   private MetricsManager metricsManager;
 
-  public StatsLogger() {
-    super("statLogger");
+  public StatsLogger(String name, InputConfigManager inputConfigManager) {
+    super(name);
     setDaemon(true);
+    this.inputConfigManager = inputConfigManager;
   }
 
   @PostConstruct
@@ -68,16 +68,16 @@ public void run() {
 
       if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
         lastCheckPointCleanedMS = System.currentTimeMillis();
-        configHandler.cleanCheckPointFiles();
+        inputConfigManager.cleanCheckPointFiles();
       }
     }
   }
 
   private void logStats() {
-    configHandler.logStats();
+    inputConfigManager.logStats();
     if (metricsManager.isMetricsEnabled()) {
       List<MetricData> metricsList = new ArrayList<MetricData>();
-      configHandler.addMetrics(metricsList);
+      inputConfigManager.addMetrics(metricsList);
       metricsManager.useMetrics(metricsList);
     }
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index 68db96ab58..afe1c0af95 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -72,6 +72,7 @@ public void add(Output output) {
   @SuppressWarnings("unchecked")
   @Override
   public void init() throws Exception {
+    logger.info("Called init with default output manager.");
     for (Output output : outputs) {
       output.init(logFeederProps);
     }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
new file mode 100644
index 0000000000..871ae93d2b
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+
+/**
+ * Class for creating the right cloud storage outputs based on global Log Feeder configurations
+ * TODO !!!
+ */
+public class CloudStorageFactory {
+
+  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
+    return new HDFSOutput();
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
new file mode 100644
index 0000000000..561b1413a9
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+
+/**
+ * Class to handle common operations for cloud storage outputs
+ * TODO !!!
+ */
+public abstract class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
new file mode 100644
index 0000000000..4994eb716e
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handle output operations for sending cloud inputs to a cloud storage destination
+ * TODO !!!
+ */
+public class CloudStorageOutputManager extends OutputManager {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageOutputManager.class);
+
+  @Inject
+  private LogFeederProps logFeederProps;
+
+  private CloudStorageOutput storageOutput = null;
+
+  private List<Output> outputList = new ArrayList<>();
+
+  @Override
+  public void write(Map<String, Object> jsonObj, InputMarker marker) {
+    // TODO: make sense to implement this if we will support filters before calling cloud outputs
+  }
+
+  @Override
+  public void write(String line, InputMarker marker) {
+    logger.info("Output: {}", line);
+    try {
+      storageOutput.write(line, marker);
+    } catch (Exception e) {
+
+    }
+  }
+
+  @Override
+  public void copyFile(File file, InputMarker marker) {
+
+  }
+
+  @Override
+  public void add(Output output) {
+    this.outputList.add(output);
+  }
+
+  @Override
+  public List<Output> getOutputs() {
+    return this.outputList;
+  }
+
+  @Override
+  public void init() throws Exception {
+    logger.info("Called init with cloud storage output manager.");
+    storageOutput = CloudStorageFactory.createCloudStorageOutput(logFeederProps);
+    storageOutput.init(logFeederProps);
+    add(storageOutput);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public void logStats() {
+
+  }
+
+  @Override
+  public void addMetricsContainers(List<MetricData> metricsList) {
+
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
new file mode 100644
index 0000000000..24edb4109f
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+
+/**
+ * HDFS cloud storage output (on-prem)
+ * TODO !!!
+ */
+public class HDFSOutput extends CloudStorageOutput {
+
+  private Logger logger = LogManager.getLogger(HDFSOutput.class);
+
+  @Override
+  public String getOutputType() {
+    return null;
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
+  }
+
+  @Override
+  public void write(String line, InputMarker inputMarker) throws Exception {
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return null;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return null;
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize on-prem HDFS output");
+  }
+
+  @Override
+  public String getShortDescription() {
+    return null;
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 0fb1058505..06c95f3bc0 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -38,3 +38,6 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp
 
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
+#logfeeder.docker.registry.enabled=true
+logfeeder.cloud.storage.mode=default
+#logfeeder.cloud.storage.mode=cloud
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 63799e316d..6674be11be 100644
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -21,7 +21,6 @@
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -64,7 +63,6 @@ public void setupConfiguration() {
     }
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
 
@@ -83,7 +81,6 @@ public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
     assertTrue(outputS3File.shouldRollover(logSpoolerContext));
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception {
     String thresholdSize = Long.toString(15 * 1024 * 1024L);
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
index facc77f687..e070545ef0 100644
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -37,7 +37,6 @@
   public static final String ACCESS_KEY_VALUE = "accessKeyValue";
   public static final String SECRET_KEY_VALUE = "secretKeyValue";
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldUploadToS3ToRightBucket() {
     File fileToUpload = mock(File.class);
@@ -66,7 +65,6 @@ protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3
     assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCleanupLocalFilesOnSuccessfulUpload() {
     File fileToUpload = mock(File.class);
@@ -96,7 +94,6 @@ protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3
     verify(compressedFile);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotCleanupUncompressedFileIfNotRequired() {
     File fileToUpload = mock(File.class);
@@ -124,7 +121,6 @@ protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3
     verify(compressedFile);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldExpandVariablesInPath() {
     File fileToUpload = mock(File.class);
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
index 4a7b9b0a7d..2cfe9ff674 100644
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -22,7 +22,6 @@
 import org.easymock.LogicalOperator;
 import org.easymock.Mock;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -56,7 +55,6 @@ public void setup() {
     spoolDirectory = testFolder.getRoot().getAbsolutePath();
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldSpoolEventToFile() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -93,7 +91,6 @@ private File setupInputFileExpectations() {
     return mockFile;
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldIncrementSpooledEventsCount() {
 
@@ -126,7 +123,6 @@ protected File initializeSpoolFile() {
     verify(rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCloseCurrentSpoolFileOnRollOver() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -161,7 +157,6 @@ protected File initializeSpoolFile() {
     verify(spoolWriter);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldReinitializeFileOnRollover() {
     final PrintWriter spoolWriter1 = mock(PrintWriter.class);
@@ -217,7 +212,6 @@ protected File initializeSpoolFile() {
     verify(spoolWriter1, spoolWriter2, rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldCallRolloverHandlerOnRollover() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
@@ -255,7 +249,6 @@ protected File initializeSpoolFile() {
   // Rollover twice - the second rollover should work if the "rolloverInProgress"
   // flag is being reset correctly. Third file expectations being setup due
   // to auto-initialization.
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldResetRolloverInProgressFlag() {
     final PrintWriter spoolWriter1 = mock(PrintWriter.class);
@@ -329,7 +322,6 @@ protected File initializeSpoolFile() {
     verify(spoolWriter1, spoolWriter2, rolloverCondition);
   }
 
-  @Ignore("Until EasyMock 3.7 upgrade - waiting for release")
   @Test
   public void shouldNotRolloverZeroLengthFiles() {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
diff --git a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
index 92c2b3257f..54ebf457ba 100644
--- a/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
+++ b/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/LogSearch.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.logsearch;
 
 import org.springframework.boot.Banner;
+import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.data.rest.RepositoryRestMvcAutoConfiguration;
 import org.springframework.boot.autoconfigure.data.solr.SolrRepositoriesAutoConfiguration;
@@ -47,7 +48,7 @@ public static void main(String[] args) {
     new SpringApplicationBuilder(LogSearch.class)
       .bannerMode(Banner.Mode.OFF)
       .listeners(new ApplicationPidFileWriter(pidFile))
-      .web(true)
+      .web(WebApplicationType.SERVLET)
       .run(args);
   }
 
diff --git a/docker/test-config/logfeeder/logfeeder.properties b/docker/test-config/logfeeder/logfeeder.properties
index 8371170977..ffdb0610fe 100644
--- a/docker/test-config/logfeeder/logfeeder.properties
+++ b/docker/test-config/logfeeder/logfeeder.properties
@@ -34,4 +34,5 @@ logfeeder.solr.core.config.name=history
 #logfeeder.solr.urls=http://solr:8983/solr
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
-#logfeeder.configs.filter.zk.enabled=true
\ No newline at end of file
+#logfeeder.configs.filter.zk.enabled=true
+#logfeeder.cloud.storage.mode=hybrid
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services