Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/12 13:52:44 UTC

[ambari-logsearch] branch master updated: AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs (#19)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bd96e1  AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs (#19)
2bd96e1 is described below

commit 2bd96e130795d1a1b7961ad5cc695487df05af07
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 12 14:52:40 2018 +0100

    AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs (#19)
    
    * AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs
    
    * AMBARI-24833. Use aws/hadoop versions as properties
    
    * Fix typo
    
    * AMBARI-24833. Change logfeeder docker config back to default
    
    * AMBARI-24833. Review changes.
---
 .../ambari/logfeeder/plugin/input/Input.java       |   3 +
 .../logfeeder/plugin/manager/OutputManager.java    |   7 +
 .../ambari/logfeeder/plugin/output/Output.java     |   2 +-
 ambari-logsearch-logfeeder/pom.xml                 |  40 ++-
 .../logfeeder/common/LogFeederConstants.java       |  35 ++
 .../logfeeder/conf/CloudStorageDestination.java}   |  31 +-
 .../ambari/logfeeder/conf/LogFeederProps.java      | 108 +++++-
 .../ambari/logfeeder/conf/output/BucketConfig.java |  46 +++
 .../output/CloudStorageOutputConfig.java}          |  35 +-
 .../logfeeder/conf/output/HdfsOutputConfig.java    | 138 ++++++++
 .../output/OutputConfig.java}                      |  18 +-
 .../logfeeder/conf/output/RolloverConfig.java      | 136 ++++++++
 .../logfeeder/conf/output/S3OutputConfig.java      | 327 ++++++++++++++++++
 .../PlainTextSecretStore.java}                     |  20 +-
 .../apache/ambari/logfeeder/input/InputS3File.java | 118 -------
 .../ambari/logfeeder/output/OutputHDFSFile.java    | 269 ---------------
 .../ambari/logfeeder/output/OutputS3File.java      | 279 ---------------
 .../ambari/logfeeder/output/S3LogPathResolver.java |  54 ---
 .../logfeeder/output/S3OutputConfiguration.java    | 121 -------
 .../apache/ambari/logfeeder/output/S3Uploader.java | 166 ---------
 .../output/cloud/CloudStorageLoggerFactory.java    | 107 ++++++
 .../logfeeder/output/cloud/CloudStorageOutput.java | 133 +++++++-
 .../output/cloud/CloudStorageOutputManager.java    |  21 +-
 .../output/cloud/CloudStorageUploader.java         | 100 ++++++
 .../cloud/CustomTimeBasedTriggeringPolicy.java     |  85 +++++
 .../ambari/logfeeder/output/cloud/HDFSOutput.java  |  74 ----
 .../output/cloud/upload/AbstractCloudClient.java   | 135 ++++++++
 .../output/cloud/upload/HDFSS3UploadClient.java    |  78 +++++
 .../output/cloud/upload/HDFSUploadClient.java      |  77 +++++
 .../output/cloud/upload/S3UploadClient.java        |  91 +++++
 .../SecretKeyPair.java}                            |  23 +-
 .../output/cloud/upload/UploadClient.java          |  52 +++
 .../output/cloud/upload/UploadClientFactory.java   |  53 +++
 .../ambari/logfeeder/output/spool/LogSpooler.java  | 210 ------------
 .../logfeeder/output/spool/LogSpoolerContext.java  |  85 -----
 .../output/spool/LogSpoolerException.java          |  29 --
 .../logfeeder/output/spool/RolloverCondition.java  |  36 --
 .../logfeeder/output/spool/RolloverHandler.java    |  40 ---
 .../ambari/logfeeder/util/LogFeederHDFSUtil.java   |  42 ++-
 .../org/apache/ambari/logfeeder/util/S3Util.java   | 140 --------
 .../src/main/resources/alias_config.json           |   9 -
 .../src/main/resources/logfeeder.properties        |  23 +-
 .../ambari/logfeeder/output/OutputS3FileTest.java  | 100 ------
 .../logfeeder/output/S3LogPathResolverTest.java    |  52 ---
 .../ambari/logfeeder/output/S3UploaderTest.java    | 164 ---------
 .../logfeeder/output/spool/LogSpoolerTest.java     | 374 ---------------------
 ambari-logsearch-server/pom.xml                    |   2 +-
 docker/cloud-docker-compose.yml                    |   3 +
 docker/test-config/logfeeder/logfeeder.properties  |  13 +-
 pom.xml                                            |   9 +-
 50 files changed, 1892 insertions(+), 2421 deletions(-)

diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 9ee4533..be1e793 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -232,6 +232,9 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
    */
   public void close() {
     logger.info("Close called. " + getShortDescription());
+    if (getOutputManager() != null) {
+      getOutputManager().release(this);
+    }
     try {
       if (firstFilter != null) {
         firstFilter.close();
diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
index 7ab8825..12f00b9 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ambari.logfeeder.plugin.manager;
 
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.output.Output;
 
@@ -63,4 +64,10 @@ public abstract class OutputManager implements BlockManager {
    */
   public abstract List<Output> getOutputs();
 
+  /**
+   * Release an input (can be used for cleanup). By default it does nothing; override if needed.
+   * @param input the input object, used to gather unique details
+   */
+  public void release(Input input) {
+  }
 }
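
A note on the new hook: release() is a no-op by default, so existing OutputManager
implementations are unaffected. As a rough illustration (hypothetical fragment, not
taken from this commit; a real subclass would also implement the abstract members),
a manager that keeps per-input resources could override it like this:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical fragment: close per-input resources when an input is closed.
    private final Map<Input, Closeable> perInputResources = new ConcurrentHashMap<>();

    @Override
    public void release(Input input) {
      Closeable resource = perInputResources.remove(input);
      if (resource != null) {
        try {
          resource.close();
        } catch (IOException e) {
          // Log and continue; releasing must not break the input's close() path.
        }
      }
    }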
diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
index 73caf68..cc2e5e5 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
@@ -166,7 +166,7 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
   }
 
   public void close() {
-    LOG.info("Calling base close()." + getShortDescription());
+    LOG.info("Calling base close() = " + getShortDescription());
     isClosed = true;
   }
 }
diff --git a/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch-logfeeder/pom.xml
index 71cf853..e71b3cc 100644
--- a/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch-logfeeder/pom.xml
@@ -35,6 +35,8 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <spring.version>5.1.2.RELEASE</spring.version>
     <spring-boot.version>2.1.0.RELEASE</spring-boot.version>
+    <aws-sdk.version>1.11.445</aws-sdk.version>
+    <gcs-connector.version>hadoop3-1.9.10</gcs-connector.version>
   </properties>
 
   <dependencies>
@@ -122,7 +124,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>25.0-jre</version>
+      <version>27.0-jre</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -145,9 +147,34 @@
       <version>${ambari-metrics.version}</version>
     </dependency>
     <dependency>
-      <groupId>io.minio</groupId>
-      <artifactId>minio</artifactId>
-      <version>5.0.1</version>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+      <version>${aws-sdk.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>${aws-sdk.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-azure</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-azure-datalake</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>gcs-connector</artifactId>
+      <version>${gcs-connector.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -253,13 +280,12 @@
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
     </dependency>
-    <!-- Exclude jars globally-->
     <dependency>
       <groupId>commons-beanutils</groupId>
       <artifactId>commons-beanutils</artifactId>
-      <version>1.7.0</version>
-      <scope>provided</scope>
+      <version>1.9.3</version>
     </dependency>
+    <!-- Exclude jars globally-->
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a9790b2..f528c45 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -110,5 +110,40 @@ public class LogFeederConstants {
   public static final String SOLR_URLS = "logfeeder.solr.urls";
 
   public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
+  public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
+  public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
+  public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
+  public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
+  public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+
+  public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_MIN = "logfeeder.cloud.rollover.threshold.min";
+  public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE = "logfeeder.cloud.rollover.threshold.size";
+  public static final String CLOUD_ROLLOVER_USE_GZIP = "logfeeder.cloud.rollover.use.gzip";
+  public static final String CLOUD_ROLLOVER_IMMEDIATE_FLUSH = "logfeeder.cloud.rollover.immediate.flush";
+  public static final String CLOUD_ROLLOVER_ON_SHUTDOWN = "logfeeder.cloud.rollover.on.shutdown";
+  public static final String CLOUD_ROLLOVER_ON_STARTUP = "logfeeder.cloud.rollover.on.startup";
+
+  public static final String HDFS_HOST = "logfeeder.hdfs.host";
+  public static final String HDFS_PORT = "logfeeder.hdfs.port";
+  public static final String HDFS_USER = "logfeeder.hdfs.user";
+  public static final String HDFS_OUTPUT_BASE_DIR = "logfeeder.hdfs.output.base.dir";
+  public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
+  public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+
+  public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
+  public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
+  public static final String S3_REGION = "logfeeder.s3.region";
+  public static final String S3_BUCKET = "logfeeder.s3.bucket";
+  public static final String S3_OBJECT_ACL = "logfeeder.s3.object.acl";
+  public static final String S3_PATH_STYLE_ACCESS = "logfeeder.s3.path.style.access";
+  public static final String S3_MULTIOBJECT_DELETE_ENABLE = "logfeeder.s3.multiobject.delete.enable";
+  public static final String S3_SECRET_KEY = "logfeeder.s3.secret.key";
+  public static final String S3_ACCESS_KEY = "logfeeder.s3.access.key";
+  public static final String S3_SECRET_KEY_FILE = "logfeeder.s3.secret.key.file";
+  public static final String S3_ACCESS_KEY_FILE = "logfeeder.s3.access.key.file";
+  public static final String S3_USE_FILE = "logfeeder.s3.credentials.file.enabled";
+  public static final String S3_USE_HADOOP_CREDENTIAL_PROVIDER = "logfeeder.s3.credentials.hadoop.enabled";
+  public static final String S3_HADOOP_CREDENTIAL_SECRET_REF = "logfeeder.s3.credentials.hadoop.secret.ref";
+  public static final String S3_HADOOP_CREDENTIAL_ACCESS_REF = "logfeeder.s3.credentials.hadoop.access.ref";
 
 }
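
Taken together, the new constants suggest a logfeeder.properties setup along these
lines (the property names are the ones defined above; the values are illustrative
only, not taken from this commit):

    logfeeder.cloud.storage.destination=s3
    logfeeder.cloud.storage.uploader.interval.seconds=60
    logfeeder.cloud.storage.upload.on.shutdown=true
    logfeeder.cloud.rollover.threshold.min=60
    logfeeder.cloud.rollover.threshold.size=80MB
    logfeeder.s3.endpoint=https://s3.amazonaws.com
    logfeeder.s3.bucket=logfeeder
    logfeeder.s3.credentials.file.enabled=true
    logfeeder.s3.access.key.file=/etc/logfeeder/conf/access_key
    logfeeder.s3.secret.key.file=/etc/logfeeder/conf/secret_key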
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
similarity index 52%
rename from ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
index 51c34a2..1a7eafa 100644
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,29 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.util;
+package org.apache.ambari.logfeeder.conf;
 
-import static org.junit.Assert.assertEquals;
+public enum CloudStorageDestination {
+  HDFS("hdfs"), S3("s3"), GCS("gcs"), ADLS("adls"), NONE("none");
 
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.junit.Test;
+  private String text;
 
-public class S3UtilTest {
-
-  @Test
-  public void testS3Util_pathToBucketName() throws Exception {
-    String s3Path = "s3://bucket_name/path/file.txt";
-    String expectedBucketName = "bucket_name";
-    String actualBucketName = S3Util.getBucketName(s3Path);
-    assertEquals(expectedBucketName, actualBucketName);
+  CloudStorageDestination(String text) {
+    this.text = text;
   }
 
-  @Test
-  public void testS3Util_pathToS3Key() throws Exception {
-    String s3Path = "s3://bucket_name/path/file.txt";
-    String expectedS3key = "path/file.txt";
-    String actualS3key = S3Util.getS3Key(s3Path);
-    assertEquals(expectedS3key, actualS3key);
+  public String getText() {
+    return this.text;
   }
-
 }
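
The destination property value maps to the enum's text field. A lookup helper along
these lines would do the conversion explicitly (hypothetical code, not part of this
commit; the actual binding is done by Spring):

    // Hypothetical: resolve a property value such as "s3" to the enum,
    // falling back to NONE when the value is unknown.
    public static CloudStorageDestination fromText(String text) {
      for (CloudStorageDestination destination : values()) {
        if (destination.getText().equalsIgnoreCase(text)) {
          return destination;
        }
      }
      return NONE;
    }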
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index dc1bfd2..e89f7f4 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,6 +19,9 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
 import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
 import org.apache.commons.lang.StringUtils;
@@ -43,6 +46,15 @@ public class LogFeederProps implements LogFeederProperties {
   @Inject
   private Environment env;
 
+  @Inject
+  private HdfsOutputConfig hdfsOutputConfig;
+
+  @Inject
+  private S3OutputConfig s3OutputConfig;
+
+  @Inject
+  private RolloverConfig rolloverConfig;
+
   private Properties properties;
 
   @LogSearchPropertyDescription(
@@ -209,6 +221,45 @@ public class LogFeederProps implements LogFeederProperties {
   @Value("${" + LogFeederConstants.CLOUD_STORAGE_MODE + ":default}")
   public LogFeederMode cloudStorageMode;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_DESTINATION,
+    description = "Type of storage that is the destination for cloud output logs.",
+    examples = {"hdfs", "s3", "gcs", "adls", "none"},
+    sources = {LogFeederConstants.CLOUD_STORAGE_DESTINATION}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_DESTINATION + ":none}")
+  private CloudStorageDestination cloudStorageDestination;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN,
+    description = "Try to upload archived files on shutdown",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN + ":false}")
+  public boolean cloudStorageUploadOnShutdown;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS,
+    description = "Second interval, that is used to check against there are any files to upload to cloud storage or not.",
+    examples = {"10"},
+    defaultValue = "60",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS + ":60}")
+  public Integer cloudStorageUploaderIntervalSeconds;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
+    description = "Use hdfs client with cloud connectors instead of the core clients for shipping data to cloud storage",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+  public boolean useCloudHdfsClient;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -370,6 +421,62 @@ public class LogFeederProps implements LogFeederProperties {
     this.cloudStorageMode = cloudStorageMode;
   }
 
+  public HdfsOutputConfig getHdfsOutputConfig() {
+    return hdfsOutputConfig;
+  }
+
+  public S3OutputConfig getS3OutputConfig() {
+    return s3OutputConfig;
+  }
+
+  public void setS3OutputConfig(S3OutputConfig s3OutputConfig) {
+    this.s3OutputConfig = s3OutputConfig;
+  }
+
+  public RolloverConfig getRolloverConfig() {
+    return rolloverConfig;
+  }
+
+  public void setRolloverConfig(RolloverConfig rolloverConfig) {
+    this.rolloverConfig = rolloverConfig;
+  }
+
+  public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+  }
+
+  public CloudStorageDestination getCloudStorageDestination() {
+    return cloudStorageDestination;
+  }
+
+  public void setCloudStorageDestination(CloudStorageDestination cloudStorageDestination) {
+    this.cloudStorageDestination = cloudStorageDestination;
+  }
+
+  public boolean isCloudStorageUploadOnShutdown() {
+    return cloudStorageUploadOnShutdown;
+  }
+
+  public void setCloudStorageUploadOnShutdown(boolean cloudStorageUploadOnShutdown) {
+    this.cloudStorageUploadOnShutdown = cloudStorageUploadOnShutdown;
+  }
+
+  public Integer getCloudStorageUploaderIntervalSeconds() {
+    return cloudStorageUploaderIntervalSeconds;
+  }
+
+  public void setCloudStorageUploaderIntervalSeconds(Integer cloudStorageUploaderIntervalSeconds) {
+    this.cloudStorageUploaderIntervalSeconds = cloudStorageUploaderIntervalSeconds;
+  }
+
+  public boolean isUseCloudHdfsClient() {
+    return useCloudHdfsClient;
+  }
+
+  public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
+    this.useCloudHdfsClient = useCloudHdfsClient;
+  }
+
   public String[] getSolrUrls() {
     if (StringUtils.isNotBlank(this.solrUrlsStr)) {
       return this.solrUrlsStr.split(",");
@@ -392,5 +499,4 @@ public class LogFeederProps implements LogFeederProperties {
       throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath");
     }
   }
-
 }
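
The uploader interval above presumably drives a periodic check for rolled-over
files. A minimal sketch of such a loop, assuming a hypothetical
uploadPendingFiles() entry point (the committed CloudStorageUploader is not shown
in this excerpt):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class UploaderScheduler {
      // Sketch only: schedule the upload check at the configured interval.
      static void start(Runnable uploadCheck, int intervalSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(uploadCheck, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
      }
    }

    // Usage, e.g.: UploaderScheduler.start(uploader::uploadPendingFiles,
    //   logFeederProps.getCloudStorageUploaderIntervalSeconds());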
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
new file mode 100644
index 0000000..7432cdd
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class BucketConfig {
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP,
+    description = "Create bucket on startup.",
+    examples = {"false"},
+    defaultValue = "true",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP + ":false}")
+  private boolean createBucketOnStartup;
+
+  public boolean isCreateBucketOnStartup() {
+    return createBucketOnStartup;
+  }
+
+  public void setCreateBucketOnStartup(boolean createBucketOnStartup) {
+    this.createBucketOnStartup = createBucketOnStartup;
+  }
+}
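
A rough sketch of how the bootstrap flag might be used against S3 with the AWS SDK
this commit pulls in (the committed AbstractCloudClient may implement this
differently):

    import com.amazonaws.services.s3.AmazonS3;

    // Sketch: create the bucket on startup only when the flag is enabled
    // and the bucket does not exist yet.
    static void bootstrapBucket(AmazonS3 s3, BucketConfig bucketConfig, String bucketName) {
      if (bucketConfig.isCreateBucketOnStartup() && !s3.doesBucketExistV2(bucketName)) {
        s3.createBucket(bucketName);
      }
    }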
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
similarity index 58%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
index 871ae93..6603a5d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
@@ -16,17 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.output.cloud;
+package org.apache.ambari.logfeeder.conf.output;
 
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+public interface CloudStorageOutputConfig extends OutputConfig {
+  String getSecretKey();
 
-/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
- */
-public class CloudStorageFactory {
+  String getAccessKey();
+
+  String getSecretKeyFileLocation();
+
+  String getAccessKeyFileLocation();
+
+  boolean isUseFileSecrets();
+
+  boolean isUseHadoopCredentialStorage();
+
+  String getSecretKeyHadoopCredentialReference();
+
+  String getAccessKeyHadoopCredentialReference();
+
+  String getAccessKeyProperty();
+
+  String getSecretKeyProperty();
+
+  String getAccessKeyEnvVariable();
+
+  String getSecretKeyEnvVariable();
 
-  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
-    return new HDFSOutput();
-  }
+  String getDescription();
 }
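
The interface exposes four possible credential sources: an inline property, a key
file, a Hadoop credential store reference, and an environment variable. One
plausible resolution order, as a sketch only (the committed client code may
resolve credentials differently):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Sketch: pick the access key from whichever source is enabled/populated.
    static String resolveAccessKey(CloudStorageOutputConfig config) throws IOException {
      if (config.isUseHadoopCredentialStorage()) {
        // Look up config.getAccessKeyHadoopCredentialReference() via
        // org.apache.hadoop.conf.Configuration#getPassword(String).
      }
      if (config.isUseFileSecrets() && config.getAccessKeyFileLocation() != null) {
        return new String(Files.readAllBytes(Paths.get(config.getAccessKeyFileLocation())),
            StandardCharsets.UTF_8).trim();
      }
      if (config.getAccessKey() != null && !config.getAccessKey().isEmpty()) {
        return config.getAccessKey();
      }
      return System.getenv(config.getAccessKeyEnvVariable());
    }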
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
new file mode 100644
index 0000000..e81ce7a
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class HdfsOutputConfig implements OutputConfig {
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_HOST,
+    description = "HDFS Name Node host.",
+    examples = {"mynamenodehost"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_HOST + ":}")
+  private String hdfsHost;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_PORT,
+    description = "HDFS Name Node port",
+    examples = {"9000"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_PORT + ":}")
+  private Integer hdfsPort;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
+    description = "Default permissions for created files on HDFS",
+    examples = {"600"},
+    defaultValue = "640",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
+  private String hdfsFilePermissions;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_USER,
+    description = "Overrides HADOOP_USER_NAME variable at runtime",
+    examples = {"hdfs"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+  private String hdfsUser;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_OUTPUT_BASE_DIR,
+    description = "HDFS base directory for uploading files",
+    examples = {"/my/path/on/hdfs"},
+    defaultValue = "/user/hdfs",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_OUTPUT_BASE_DIR + ":/user/hdfs}")
+  private String hdfsOutputDir;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS,
+    description = "Enable kerberos support for HDFS",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
+  private boolean secure;
+
+  public String getHdfsHost() {
+    return hdfsHost;
+  }
+
+  public void setHdfsHost(String hdfsHost) {
+    this.hdfsHost = hdfsHost;
+  }
+
+  public Integer getHdfsPort() {
+    return hdfsPort;
+  }
+
+  public void setHdfsPort(Integer hdfsPort) {
+    this.hdfsPort = hdfsPort;
+  }
+
+  public String getHdfsFilePermissions() {
+    return hdfsFilePermissions;
+  }
+
+  public void setHdfsFilePermissions(String hdfsFilePermissions) {
+    this.hdfsFilePermissions = hdfsFilePermissions;
+  }
+
+  public String getHdfsUser() {
+    return hdfsUser;
+  }
+
+  public void setHdfsUser(String hdfsUser) {
+    this.hdfsUser = hdfsUser;
+  }
+
+  public String getHdfsOutputDir() {
+    return hdfsOutputDir;
+  }
+
+  public void setHdfsOutputDir(String hdfsOutputDir) {
+    this.hdfsOutputDir = hdfsOutputDir;
+  }
+
+  public boolean isSecure() {
+    return secure;
+  }
+
+  public void setSecure(boolean secure) {
+    this.secure = secure;
+  }
+
+  @Override
+  public String getOutputBasePath() {
+    return this.hdfsOutputDir;
+  }
+}
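
The host and port fields presumably end up in an fs.defaultFS URI. A minimal
sketch with the Hadoop FileSystem API (an assumption; the updated
LogFeederHDFSUtil is only partially visible in this excerpt):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Sketch: build a FileSystem handle from the new HdfsOutputConfig fields.
    static FileSystem buildFileSystem(HdfsOutputConfig config) throws IOException {
      Configuration conf = new Configuration();
      conf.set("fs.defaultFS", "hdfs://" + config.getHdfsHost() + ":" + config.getHdfsPort());
      return FileSystem.get(conf);
    }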
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
similarity index 68%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
index 871ae93..0b99e03 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+package org.apache.ambari.logfeeder.conf.output;
 
 /**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
+ * Output config marker interface
  */
-public class CloudStorageFactory {
+public interface OutputConfig {
+
+  /**
+   * Holds the output destination for a storage; it can act as a base output directory or a bucket.
+   * @return output directory or output bucket
+   */
+  String getOutputBasePath();
 
-  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
-    return new HDFSOutput();
-  }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
new file mode 100644
index 0000000..d447838
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RolloverConfig {
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_MIN,
+    description = "Rollover cloud log files after an interval (minutes)",
+    examples = {"1"},
+    defaultValue = "60",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_MIN + ":60}")
+  private int rolloverThresholdTimeMins;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE,
+    description = "Rollover cloud log files after the log file size reach this limit",
+    examples = {"1024KB"},
+    defaultValue = "80MB",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE + ":80MB}")
+  private String rolloverSize;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_USE_GZIP,
+    description = "Use GZip on archived logs.",
+    examples = {"false"},
+    defaultValue = "true",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_USE_GZIP + ":true}")
+  private boolean useGzip;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_IMMEDIATE_FLUSH,
+    description = "Immediately flush cloud logs (to active location).",
+    examples = {"false"},
+    defaultValue = "true",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_IMMEDIATE_FLUSH + ":false}")
+  private boolean immediateFlush;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_ON_SHUTDOWN,
+    description = "Rollover log files on shutdown",
+    examples = {"false"},
+    defaultValue = "true",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_ON_SHUTDOWN + ":false}")
+  private boolean rolloverOnShutdown;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_ROLLOVER_ON_STARTUP,
+    description = "Rollover log files on startup",
+    examples = {"false"},
+    defaultValue = "true",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_ON_STARTUP + ":false}")
+  private boolean rolloverOnStartup;
+
+  public int getRolloverThresholdTimeMins() {
+    return rolloverThresholdTimeMins;
+  }
+
+  public void setRolloverThresholdTimeMins(int rolloverThresholdTimeMins) {
+    this.rolloverThresholdTimeMins = rolloverThresholdTimeMins;
+  }
+
+  public String getRolloverSize() {
+    return rolloverSize;
+  }
+
+  public void setRolloverSize(String rolloverSize) {
+    this.rolloverSize = rolloverSize;
+  }
+
+  public boolean isUseGzip() {
+    return useGzip;
+  }
+
+  public void setUseGzip(boolean useGzip) {
+    this.useGzip = useGzip;
+  }
+
+  public boolean isImmediateFlush() {
+    return immediateFlush;
+  }
+
+  public void setImmediateFlush(boolean immediateFlush) {
+    this.immediateFlush = immediateFlush;
+  }
+
+  public boolean isRolloverOnShutdown() {
+    return rolloverOnShutdown;
+  }
+
+  public void setRolloverOnShutdown(boolean rolloverOnShutdown) {
+    this.rolloverOnShutdown = rolloverOnShutdown;
+  }
+
+  public boolean isRolloverOnStartup() {
+    return rolloverOnStartup;
+  }
+
+  public void setRolloverOnStartup(boolean rolloverOnStartup) {
+    this.rolloverOnStartup = rolloverOnStartup;
+  }
+}
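
The rollover size accepts human-readable values such as "1024KB" or "80MB". The
commit most likely delegates size handling to Log4j 2's rolling appender; a
standalone parser for illustration only:

    // Illustrative parser for "KB"/"MB"/"GB" suffixed sizes; not the
    // mechanism used by the committed code.
    static long parseSize(String size) {
      String s = size.trim().toUpperCase();
      long multiplier = 1L;
      if (s.endsWith("GB")) { multiplier = 1024L * 1024L * 1024L; s = s.substring(0, s.length() - 2); }
      else if (s.endsWith("MB")) { multiplier = 1024L * 1024L; s = s.substring(0, s.length() - 2); }
      else if (s.endsWith("KB")) { multiplier = 1024L; s = s.substring(0, s.length() - 2); }
      return Long.parseLong(s.trim()) * multiplier;
    }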
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
new file mode 100644
index 0000000..2da2698
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Inject;
+
+@Configuration
+public class S3OutputConfig implements CloudStorageOutputConfig {
+
+  private final BucketConfig bucketConfig;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_ENDPOINT,
+    description = "Amazon S3 endpoint.",
+    examples = {"https://s3.amazonaws.com"},
+    defaultValue = LogFeederConstants.S3_ENDPOINT_DEFAULT,
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_ENDPOINT + ":" + LogFeederConstants.S3_ENDPOINT_DEFAULT +"}")
+  private String endpoint;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_SECRET_KEY,
+    description = "Amazon S3 secret key.",
+    examples = {"MySecretKey"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_SECRET_KEY + ":}")
+  private String secretKey;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_ACCESS_KEY,
+    description = "Amazon S3 secret access key.",
+    examples = {"MySecretAccessKey"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_ACCESS_KEY + ":}")
+  private String accessKey;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_SECRET_KEY_FILE,
+    description = "Amazon S3 secret key file (that contains only the key).",
+    examples = {"/my/path/secret_key"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_ACCESS_KEY + ":}")
+  private String secretKeyFileLocation;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_ACCESS_KEY_FILE,
+    description = "Amazon S3 secret access key file (that contains only the key).",
+    examples = {"/my/path/access_key"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_ACCESS_KEY_FILE + ":}")
+  private String accessKeyFileLocation;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_USE_FILE,
+    description = "Enable to get Amazon S3 secret/access keys from files.",
+    examples = {"true"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_USE_FILE + ":false}")
+  private boolean useFileSecrets;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_USE_HADOOP_CREDENTIAL_PROVIDER,
+    description = "Enable to get Amazon S3 secret/access keys from Hadoop credential store API.",
+    examples = {"true"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_USE_HADOOP_CREDENTIAL_PROVIDER + ":false}")
+  private boolean useHadoopCredentialStorage;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_HADOOP_CREDENTIAL_SECRET_REF,
+    description = "Amazon S3 secret access key reference in Hadoop credential store..",
+    examples = {"logfeeder.s3.secret.key"},
+    defaultValue = "logfeeder.s3.secret.key",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_HADOOP_CREDENTIAL_SECRET_REF + ":logfeeder.s3.secret.key}")
+  private String secretKeyHadoopCredentialReference;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_HADOOP_CREDENTIAL_ACCESS_REF,
+    description = "Amazon S3 access key reference in Hadoop credential store..",
+    examples = {"logfeeder.s3.access.key"},
+    defaultValue = "logfeeder.s3.access.key",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_HADOOP_CREDENTIAL_ACCESS_REF + ":logfeeder.s3.access.key}")
+  private String accessKeyHadoopCredentialReference;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_REGION,
+    description = "Amazon S3 region.",
+    examples = {"us-east-2"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_REGION + ":us-east-2}")
+  private String region;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_BUCKET,
+    description = "Amazon S3 bucket.",
+    examples = {"logs"},
+    defaultValue = "logfeeder",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_BUCKET + ":logfeeder}")
+  private String bucket;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_OBJECT_ACL,
+    description = "Amazon S3 ACLs for new objects.",
+    examples = {"logs"},
+    defaultValue = "private",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_OBJECT_ACL + ":private}")
+  private String objectAcl;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_PATH_STYLE_ACCESS,
+    description = "Enable S3 path style access will disable the default virtual hosting behaviour (DNS).",
+    defaultValue = "false",
+    examples = {"true"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_PATH_STYLE_ACCESS + ":false}")
+  private boolean pathStyleAccess;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.S3_MULTIOBJECT_DELETE_ENABLE,
+    description = "When enabled, multiple single-object delete requests are replaced by\n" +
+      "    a single 'delete multiple objects'-request, reducing the number of requests.",
+    defaultValue = "true",
+    examples = {"false"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.S3_MULTIOBJECT_DELETE_ENABLE + ":true}")
+  private boolean multiobjectDeleteEnable;
+
+  @Inject
+  public S3OutputConfig(BucketConfig bucketConfig) {
+    this.bucketConfig = bucketConfig;
+  }
+
+  @Override
+  public String getOutputBasePath() {
+    return this.bucket;
+  }
+
+  public String getBucket() {
+    return this.bucket;
+  }
+
+  public void setBucket(String bucketName) {
+    this.bucket = bucketName;
+  }
+
+  public String getEndpoint() {
+    return endpoint;
+  }
+
+  public void setEndpoint(String endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  public String getRegion() {
+    return region;
+  }
+
+  public void setRegion(String region) {
+    this.region = region;
+  }
+
+  public String getSecretKey() {
+    return secretKey;
+  }
+
+  public void setSecretKey(String secretKey) {
+    this.secretKey = secretKey;
+  }
+
+  public String getAccessKey() {
+    return accessKey;
+  }
+
+  public void setAccessKey(String accessKey) {
+    this.accessKey = accessKey;
+  }
+
+  public String getSecretKeyFileLocation() {
+    return secretKeyFileLocation;
+  }
+
+  public void setSecretKeyFileLocation(String secretKeyFileLocation) {
+    this.secretKeyFileLocation = secretKeyFileLocation;
+  }
+
+  public String getAccessKeyFileLocation() {
+    return accessKeyFileLocation;
+  }
+
+  public void setAccessKeyFileLocation(String accessKeyFileLocation) {
+    this.accessKeyFileLocation = accessKeyFileLocation;
+  }
+
+  public boolean isUseFileSecrets() {
+    return useFileSecrets;
+  }
+
+  public void setUseFileSecrets(boolean useFileSecrets) {
+    this.useFileSecrets = useFileSecrets;
+  }
+
+  public boolean isUseHadoopCredentialStorage() {
+    return useHadoopCredentialStorage;
+  }
+
+  public void setUseHadoopCredentialStorage(boolean useHadoopCredentialStorage) {
+    this.useHadoopCredentialStorage = useHadoopCredentialStorage;
+  }
+
+  public String getSecretKeyHadoopCredentialReference() {
+    return secretKeyHadoopCredentialReference;
+  }
+
+  public void setSecretKeyHadoopCredentialReference(String secretKeyHadoopCredentialReference) {
+    this.secretKeyHadoopCredentialReference = secretKeyHadoopCredentialReference;
+  }
+
+  public String getAccessKeyHadoopCredentialReference() {
+    return accessKeyHadoopCredentialReference;
+  }
+
+  @Override
+  public String getAccessKeyProperty() {
+    return LogFeederConstants.S3_ACCESS_KEY;
+  }
+
+  @Override
+  public String getSecretKeyProperty() {
+    return LogFeederConstants.S3_SECRET_KEY;
+  }
+
+  @Override
+  public String getAccessKeyEnvVariable() {
+    return "AWS_ACCESS_KEY_ID";
+  }
+
+  @Override
+  public String getSecretKeyEnvVariable() {
+    return "AWS_SECRET_ACCESS_KEY";
+  }
+
+  @Override
+  public String getDescription() {
+    return "AWS S3";
+  }
+
+  public void setAccessKeyHadoopCredentialReference(String accessKeyHadoopCredentialReference) {
+    this.accessKeyHadoopCredentialReference = accessKeyHadoopCredentialReference;
+  }
+
+  public String getObjectAcl() {
+    return objectAcl;
+  }
+
+  public void setObjectAcl(String objectAcl) {
+    this.objectAcl = objectAcl;
+  }
+
+  public boolean isPathStyleAccess() {
+    return pathStyleAccess;
+  }
+
+  public void setPathStyleAccess(boolean pathStyleAccess) {
+    this.pathStyleAccess = pathStyleAccess;
+  }
+
+  public boolean isMultiobjectDeleteEnable() {
+    return multiobjectDeleteEnable;
+  }
+
+  public void setMultiobjectDeleteEnable(boolean multiobjectDeleteEnable) {
+    this.multiobjectDeleteEnable = multiobjectDeleteEnable;
+  }
+
+  public BucketConfig getBucketConfig() {
+    return bucketConfig;
+  }
+
+  public CannedAccessControlList calculateAcls(String aclStr) {
+    for (CannedAccessControlList val : CannedAccessControlList.values()) {
+      if (val.toString().equals(aclStr)) {
+        return val;
+      }
+    }
+    throw new IllegalArgumentException(String.format("'%s' is not a valid ACL setting", aclStr));
+  }
+}
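
For reference, an S3 client can be built from this config with the AWS SDK version
added to the pom in this commit; a minimal sketch (the committed S3UploadClient may
construct its client differently):

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    // Sketch: endpoint, region, path-style access and static credentials
    // all come straight from S3OutputConfig.
    static AmazonS3 buildClient(S3OutputConfig config) {
      return AmazonS3ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(
              new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey())))
          .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
              config.getEndpoint(), config.getRegion()))
          .withPathStyleAccessEnabled(config.isPathStyleAccess())
          .build();
    }

An upload honoring the configured canned ACL could then go through
new PutObjectRequest(bucket, key, file).withCannedAcl(config.calculateAcls(config.getObjectAcl())).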
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
similarity index 69%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
index 871ae93..abc462e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
@@ -16,17 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+package org.apache.ambari.logfeeder.credential;
 
 /**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
+ * Plain text secret store is not recommended for production
  */
-public class CloudStorageFactory {
+public class PlainTextSecretStore implements SecretStore {
+
+  private final String secret;
+
+  public PlainTextSecretStore(String secret) {
+    this.secret = secret;
+  }
 
-  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
-    return new HDFSOutput();
+  @Override
+  public char[] getSecret() {
+    return secret.toCharArray();
   }
 }
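
Since the new S3 config also supports key files, a file-backed companion store
would fit the same SecretStore interface; a hypothetical sketch, not part of this
commit as shown:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Hypothetical: read the secret from a file that contains only the key.
    public class FileSecretStore implements SecretStore {

      private final String path;

      public FileSecretStore(String path) {
        this.path = path;
      }

      @Override
      public char[] getSecret() {
        try {
          return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)
              .trim().toCharArray();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }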
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
deleted file mode 100644
index 5191045..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.input;
-
-import org.apache.ambari.logfeeder.output.S3OutputConfiguration;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-import java.io.BufferedReader;
-import java.io.File;
-
-/**
- * Download file from S3 then start processing it.
- */
-public class InputS3File extends InputFile {
-
-  private static final Logger logger = LogManager.getLogger(InputS3File.class);
-
-  private boolean ready = false;
-
-  @Override
-  public boolean isReady() {
-    if (!ready) {
-      // Let's try to check whether the file is available
-      setLogFiles(getActualFiles(getLogPath()));
-      if (!ArrayUtils.isEmpty(getLogFiles())) {
-        if (isTail() && getLogFiles().length > 1) {
-          logger.warn("Found multiple files (" + getLogFiles().length + ") for the file filter " + getFilePath() +
-              ". Will use only the first one. Using " + getLogFiles()[0].getAbsolutePath());
-        }
-        logger.info("File filter " + getFilePath() + " expanded to " + getLogFiles()[0].getAbsolutePath());
-        setReady(true);
-      } else {
-        logger.debug(getLogPath() + " file doesn't exist. Ignoring for now");
-      }
-    }
-    return ready;
-  }
-
-  @Override
-  public void setReady(boolean ready) {
-    this.ready = ready;
-  }
-
-  private File[] getActualFiles(String searchPath) {
-    // TODO search file on s3
-    return new File[] { new File(searchPath) };
-  }
-
-  @Override
-  public void start() throws Exception {
-    if (ArrayUtils.isEmpty(getLogFiles())) {
-      return;
-    }
-    for (int i = getLogFiles().length - 1; i >= 0; i--) {
-      File file = getLogFiles()[i];
-      if (i == 0 || !isTail()) {
-        try {
-          processFile(file, i == 0);
-          if (isClosed() || isDrain()) {
-            logger.info("isClosed or isDrain. Now breaking loop.");
-            break;
-          }
-        } catch (Throwable t) {
-          logger.error("Error processing file=" + file.getAbsolutePath(), t);
-        }
-      }
-    }
-    close();
-  }
-
-  @Override
-  public BufferedReader openLogFile(File logPathFile) throws Exception {
-    String s3AccessKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey();
-    String s3SecretKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey();
-    String s3Endpoint = ((InputS3FileDescriptor)getInputDescriptor()).getS3Endpoint();
-    if (s3Endpoint == null) {
-      s3Endpoint = S3OutputConfiguration.DEFAULT_S3_ENDPOINT;
-    }
-    BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3Endpoint, s3AccessKey, s3SecretKey);
-    Object fileKey = getFileKey(logPathFile);
-    setFileKey(fileKey);
-    String base64FileKey = Base64.byteArrayToBase64(getFileKey().toString().getBytes());
-    setBase64FileKey(base64FileKey);
-    logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
-    return br;
-  }
-
-  private Object getFileKey(File logFile) {
-    return logFile.getPath();
-  }
-  
-  @Override
-  public void close() {
-    super.close();
-    setClosed(true);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
deleted file mode 100644
index 93a2643..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputFileMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
-import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.ambari.logfeeder.plugin.output.Output;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * An {@link Output} that records logs to HDFS.
- *
- * The events are spooled on the local file system and uploaded in batches asynchronously.
- */
-public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
-  private static final Logger logger = LogManager.getLogger(OutputHDFSFile.class);
-  
-  private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
-
-  private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
-
-  private final Object readyMonitor = new Object();
-
-  private Thread hdfsCopyThread = null;
-
-  private String filenamePrefix = "service-logs-";
-  private long rolloverThresholdTimeMillis;
-
-  private String hdfsOutDir = null;
-  private String hdfsHost = null;
-  private String hdfsPort = null;
-  private FileSystem fileSystem = null;
-
-  private LogSpooler logSpooler;
-
-  private LogFeederProps logFeederProps;
-
-  @Override
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    this.logFeederProps = logFeederProps;
-    hdfsOutDir = getStringValue("hdfs_out_dir");
-    hdfsHost = getStringValue("hdfs_host");
-    hdfsPort = getStringValue("hdfs_port");
-    long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
-    rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
-    filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
-    if (StringUtils.isEmpty(hdfsOutDir)) {
-      logger.error("HDFS config property <hdfs_out_dir> is not set in config file.");
-      return;
-    }
-    if (StringUtils.isEmpty(hdfsHost)) {
-      logger.error("HDFS config property <hdfs_host> is not set in config file.");
-      return;
-    }
-    if (StringUtils.isEmpty(hdfsPort)) {
-      logger.error("HDFS config property <hdfs_port> is not set in config file.");
-      return;
-    }
-    HashMap<String, String> contextParam = buildContextParam();
-    hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
-    logger.info("hdfs Output dir=" + hdfsOutDir);
-    String localFileDir = logFeederProps.getTmpDir() + "hdfs/service/";
-    logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
-    this.startHDFSCopyThread();
-  }
-
-  @Override
-  public void close() {
-    logger.info("Closing file." + getShortDescription());
-    logSpooler.rollover();
-    this.stopHDFSCopyThread();
-    shouldCloseOutput();
-  }
-
-  @Override
-  public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
-    if (block != null) {
-      logSpooler.add(block);
-      statMetric.value++;
-    }
-  }
-
-  @Override
-  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
-    String block = LogFeederUtil.getGson().toJson(jsonObj);
-    write(block, inputMarker);
-  }
-
-  
-  @Override
-  public String getShortDescription() {
-    return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
-  }
-
-  private void startHDFSCopyThread() {
-
-    hdfsCopyThread = new Thread("hdfsCopyThread") {
-      @Override
-      public void run() {
-        try {
-          while (true) {
-            Iterator<File> localFileIterator = localReadyFiles.iterator();
-            while (localFileIterator.hasNext()) {
-              File localFile = localFileIterator.next();
-              fileSystem = LogFeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
-              if (fileSystem != null && localFile.exists()) {
-                String destFilePath = hdfsOutDir + "/" + localFile.getName();
-                String localPath = localFile.getAbsolutePath();
-                boolean overWrite = true;
-                boolean delSrc = true;
-                boolean isCopied = LogFeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
-                    overWrite, delSrc);
-                if (isCopied) {
-                  logger.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
-                } else {
-                  // TODO Need to write retry logic, in next release we can handle it
-                  logger.error("Hdfs file copy  failed for hdfspath :" + destFilePath + " and localpath :" + localPath);
-                }
-              }
-              localFileIterator.remove();
-            }
-            try {
-              // wait till a new file arrives in the ready list
-              synchronized (readyMonitor) {
-                if (localReadyFiles.isEmpty()) {
-                  readyMonitor.wait();
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error(e.getLocalizedMessage(),e);
-            }
-          }
-        } catch (Exception e) {
-          logger.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
-        }
-      }
-    };
-    hdfsCopyThread.setDaemon(true);
-    hdfsCopyThread.start();
-  }
-
-  private void stopHDFSCopyThread() {
-    if (hdfsCopyThread != null) {
-      logger.info("waiting till copy all local files to hdfs.......");
-      while (!localReadyFiles.isEmpty()) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          logger.error(e.getLocalizedMessage(), e);
-        }
-        logger.debug("still waiting to copy all local files to hdfs.......");
-      }
-      logger.info("calling interrupt method for hdfsCopyThread to stop it.");
-      try {
-        hdfsCopyThread.interrupt();
-      } catch (SecurityException exception) {
-        logger.error(" Current thread : '" + Thread.currentThread().getName() +
-            "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
-      }
-      LogFeederHDFSUtil.closeFileSystem(fileSystem);
-    }
-  }
-
-  private HashMap<String, String> buildContextParam() {
-    HashMap<String, String> contextParam = new HashMap<String, String>();
-    contextParam.put("host", LogFeederUtil.hostName);
-    return contextParam;
-  }
-
-  private void addFileInReadyList(File localFile) {
-    localReadyFiles.add(localFile);
-    try {
-      synchronized (readyMonitor) {
-        readyMonitor.notifyAll();
-      }
-    } catch (Exception e) {
-      logger.error(e.getLocalizedMessage(),e);
-    }
-  }
-
-  @Override
-  public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
-  }
-
-  /**
-   * Add the rollover file to a daemon thread for uploading to HDFS
-   * @param rolloverFile the file to be uploaded to HDFS
-   */
-  @Override
-  public void handleRollover(File rolloverFile) {
-    addFileInReadyList(rolloverFile);
-  }
-
-  /**
-   * Determines whether it is time to roll over the current spool file.
-   *
-   * The file will be rolled over if the time since its creation exceeds
-   * the timeout specified in the rollover_sec configuration.
-   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state of active Spool file
-   * @return true if time since creation is greater than value specified in rollover_sec,
-   *          false otherwise.
-   */
-  @Override
-  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
-    long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
-    boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
-    if (shouldRollover) {
-      logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
-          " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
-    }
-    return shouldRollover;
-  }
-
-  @Override
-  public String getOutputType() {
-    throw new IllegalStateException("This method should be overridden if the Output wants to monitor the configuration");
-  }
-
-  @Override
-  public Long getPendingCount() {
-    return 0L;
-  }
-
-  @Override
-  public String getWriteBytesMetricName() {
-    return "output.hdfs.write_bytes";
-  }
-
-  @Override
-  public String getStatMetricName() {
-    return "output.hdfs.write_logs";
-  }
-}
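
The deleted copy thread above is a hand-rolled producer/consumer handoff: rollover handlers enqueue files and notify a monitor, while a daemon thread drains the queue and parks when it is empty. A minimal, self-contained sketch of that pattern (hypothetical names, not the Ambari API):

    import java.io.File;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch of the ready-queue handoff used by the HDFS copy thread.
    public class ReadyQueueSketch {
      private final ConcurrentLinkedQueue<File> readyFiles = new ConcurrentLinkedQueue<>();
      private final Object readyMonitor = new Object();

      // Producer side: called when a spool file rolls over.
      void addFileInReadyList(File localFile) {
        readyFiles.add(localFile);
        synchronized (readyMonitor) {
          readyMonitor.notifyAll(); // wake the copy thread
        }
      }

      // Consumer side: body of the daemon copy thread.
      void copyLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          File next;
          while ((next = readyFiles.poll()) != null) {
            upload(next); // copy to HDFS, then delete the local file
          }
          synchronized (readyMonitor) {
            if (readyFiles.isEmpty()) {
              readyMonitor.wait(); // parked until addFileInReadyList notifies
            }
          }
        }
      }

      private void upload(File f) { /* hypothetical: copyFromLocal + delete */ }
    }
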
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
deleted file mode 100644
index a2f6b08..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.input.InputFileMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
-import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.plugin.filter.Filter;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputS3FileDescriptorImpl;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Write log file into s3 bucket.
- *
- * This class supports two modes of upload:
- * <ul>
- * <li>A one time upload of files matching a pattern</li>
- * <li>A batch mode, asynchronous, periodic upload of files</li>
- * </ul>
- */
-public class OutputS3File extends OutputFile implements RolloverCondition, RolloverHandler {
-  private static final Logger logger = LogManager.getLogger(OutputS3File.class);
-
-  public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
-
-  private LogSpooler logSpooler;
-  private S3OutputConfiguration s3OutputConfiguration;
-  private S3Uploader s3Uploader;
-  private LogFeederProps logFeederProps;
-
-  @Override
-  public void init(LogFeederProps logFeederProps) throws Exception {
-    this.logFeederProps = logFeederProps;
-    s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
-  }
-
-  private static boolean uploadedGlobalConfig = false;
-
-  /**
-   * Copy local log files and corresponding config to S3 bucket one time.
-   * @param inputFile The file to be copied
-   * @param inputMarker Contains information about the configuration to be uploaded.
-   */
-  @Override
-  public void copyFile(File inputFile, InputMarker inputMarker) {
-    String type = inputMarker.getInput().getInputDescriptor().getType();
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
-    String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.getInput().getInputDescriptor().getType());
-
-    uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
-  }
-
-  private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
-      String resolvedPath) {
-
-    ArrayList<FilterDescriptor> filters = new ArrayList<>();
-    addFilters(filters, inputMarker.getInput().getFirstFilter());
-    InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.getInput().getInputDescriptor();
-    InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
-        InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
-    String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
-        LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
-    inputS3FileDescriptor.setPath(s3CompletePath);
-
-    ArrayList<InputDescriptorImpl> inputConfigList = new ArrayList<>();
-    inputConfigList.add(inputS3FileDescriptor);
-    // set source s3_file
-    // remove global config from input config
-    removeS3GlobalConfig(inputS3FileDescriptor);
-    // write config into s3 file
-    InputConfigImpl inputConfig = new InputConfigImpl();
-    inputConfig.setInput(inputConfigList);
-    
-    writeConfigToS3(inputConfig, getComponentConfigFileName(type), s3OutputConfiguration);
-    // write global config
-    writeGlobalConfig(s3OutputConfiguration);
-  }
-
-  private void addFilters(ArrayList<FilterDescriptor> filters, Filter filter) {
-    if (filter != null) {
-      FilterDescriptor filterDescriptorOriginal = filter.getFilterDescriptor();
-      FilterDescriptor filterDescriptor = InputConfigGson.gson.fromJson(
-          InputConfigGson.gson.toJson(filterDescriptorOriginal), filterDescriptorOriginal.getClass());
-      filters.add(filterDescriptor);
-      if (filter.getNextFilter() != null) {
-        addFilters(filters, filter.getNextFilter());
-      }
-    }
-  }
-
-  private void writeConfigToS3(Object config, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
-    String configJson = InputConfigGson.gson.toJson(config);
-
-    String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
-        s3OutputConfiguration.getCluster());
-
-    S3Util.writeDataIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
-      s3OutputConfiguration.getS3Endpoint(), s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
-  }
-
-  private String getComponentConfigFileName(String componentName) {
-    return "input.config-" + componentName + ".json";
-  }
-
-  private void removeS3GlobalConfig(InputS3FileDescriptorImpl inputS3FileDescriptor) {
-    inputS3FileDescriptor.setSource(null);
-    inputS3FileDescriptor.setCopyFile(null);
-    inputS3FileDescriptor.setProcessFile(null);
-    inputS3FileDescriptor.setTail(null);
-    inputS3FileDescriptor.getAddFields().remove("ip");
-    inputS3FileDescriptor.getAddFields().remove("host");
-    inputS3FileDescriptor.getAddFields().remove("bundle_id");
-  }
-
-  /**
-   * Writes the global config into an S3 file. Invoked only once.
-   */
-  @SuppressWarnings("unchecked")
-  private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
-    if (!uploadedGlobalConfig) {
-      Map<String, Object> globalConfig = new HashMap<>();
-      // update the global config before writing it to S3
-      globalConfig.put("source", "s3_file");
-      globalConfig.put("copy_file", false);
-      globalConfig.put("process_file", true);
-      globalConfig.put("tail", false);
-      Map<String, Object> addFields = (Map<String, Object>) globalConfig.get("add_fields");
-      if (addFields == null) {
-        addFields = new HashMap<>();
-      }
-      addFields.put("ip", LogFeederUtil.ipAddress);
-      addFields.put("host", LogFeederUtil.hostName);
-      // set bundle_id to the cluster name if it's not already there
-      String bundle_id = (String) addFields.get("bundle_id");
-      if (bundle_id == null || bundle_id.isEmpty()) {
-        String cluster = (String) addFields.get("cluster");
-        if (cluster != null && !cluster.isEmpty()) {
-          addFields.put("bundle_id", bundle_id);
-        }
-      }
-      globalConfig.put("add_fields", addFields);
-      Map<String, Object> config = new HashMap<String, Object>();
-      config.put("global", globalConfig);
-      writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
-      uploadedGlobalConfig = true;
-    }
-  }
-
-  /**
-   * Writes a log line to a local file, to be uploaded to an S3 bucket asynchronously.
-   *
-   * This method uses a {@link LogSpooler} to spool the log lines to a local file.
-   *
-   * @param block The log event to upload
-   * @param inputMarker Contains information about the log file feeding the lines.
-   */
-  @Override
-  public void write(String block, InputFileMarker inputMarker) {
-    createLogSpoolerIfRequired(inputMarker);
-    logSpooler.add(block);
-  }
-
-  @Override
-  public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
-    String block = LogFeederUtil.getGson().toJson(jsonObj);
-    write(block, inputMarker);
-  }
-
-  private void createLogSpoolerIfRequired(InputFileMarker inputMarker) {
-    if (logSpooler == null) {
-      if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
-        InputFile input = (InputFile) inputMarker.getInput();
-        logSpooler = createSpooler(input.getFilePath());
-        s3Uploader = createUploader(input.getInputDescriptor().getType());
-      } else {
-        logger.error("Cannot write from non local file...");
-      }
-    }
-  }
-
-  @VisibleForTesting
-  protected S3Uploader createUploader(String logType) {
-    S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType);
-    uploader.startUploaderThread();
-    return uploader;
-  }
-
-  @VisibleForTesting
-  protected LogSpooler createSpooler(String filePath) {
-    String spoolDirectory = logFeederProps.getTmpDir() + "/s3/service";
-    logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
-    return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
-        s3OutputConfiguration.getRolloverTimeThresholdSecs());
-  }
-
-  /**
-   * Check whether the locally spooled file should be rolled over, based on file size.
-   *
-   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
-   *                                                       for rollover.
-   * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
-   *              false otherwise
-   */
-  @Override
-  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
-    File spoolFile = currentSpoolerContext.getActiveSpoolFile();
-    long currentSize = spoolFile.length();
-    boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
-    if (result) {
-      logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
-          s3OutputConfiguration.getRolloverSizeThresholdBytes()));
-    }
-    return result;
-  }
-
-  /**
-   * Stops dependent objects that consume resources.
-   */
-  @Override
-  public void close() {
-    if (s3Uploader != null) {
-      s3Uploader.stopUploaderThread();
-    }
-    if (logSpooler != null) {
-      logSpooler.close();
-    }
-  }
-
-  /**
-   * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
-   *
-   * @param rolloverFile The file that has been rolled over.
-   */
-  @Override
-  public void handleRollover(File rolloverFile) {
-    s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
-  }
-
-  @Override
-  public String getShortDescription() {
-    return "output:destination=s3,bucket=" + s3OutputConfiguration.getS3BucketName();
-  }
-}
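
One subtlety in the class above: writeGlobalConfig is an instance-synchronized method guarding a static flag, so two OutputS3File instances could in principle race on the one-time upload. A compact sketch of the upload-once idiom with the lock moved to the class level (hypothetical names):

    // Once-per-process action; a class-level lock keeps the check-then-act
    // atomic across all output instances.
    public class UploadOnceSketch {
      private static boolean uploadedGlobalConfig = false;

      static synchronized void uploadGlobalConfigOnce(Runnable upload) {
        if (!uploadedGlobalConfig) {
          upload.run();              // e.g. write global.config.json to S3
          uploadedGlobalConfig = true;
        }
      }
    }
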
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
deleted file mode 100644
index 8c544cf..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-
-import java.util.HashMap;
-
-/**
- * A utility class that resolves variables like hostname, IP address and cluster name in S3 paths.
- */
-public class S3LogPathResolver {
-
-  /**
-   * Construct a full S3 path by resolving variables in the path name including hostname, IP address
-   * and cluster name
-   * @param baseKeyPrefix The prefix which can contain the variables.
-   * @param keySuffix The suffix appended to the prefix after variable expansion
-   * @param cluster The name of the cluster
-   * @return full S3 path.
-   */
-  public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
-    HashMap<String, String> contextParam = buildContextParam(cluster);
-    String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
-    return resolvedKeyPrefix + LogFeederConstants.S3_PATH_SEPARATOR + keySuffix;
-  }
-
-  private HashMap<String, String> buildContextParam(String cluster) {
-    HashMap<String, String> contextParam = new HashMap<>();
-    contextParam.put("host", LogFeederUtil.hostName);
-    contextParam.put("ip", LogFeederUtil.ipAddress);
-    contextParam.put("cluster", cluster);
-    return contextParam;
-  }
-
-}
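
A hedged usage sketch of the resolver above, assuming it is on the classpath in the same package; the $cluster/$host placeholder syntax is an assumption about PlaceholderUtil, and all values are hypothetical:

    // Hypothetical usage; placeholder syntax is an assumption, not confirmed by this diff.
    public class ResolverUsageSketch {
      public static void main(String[] args) {
        S3LogPathResolver resolver = new S3LogPathResolver();
        String path = resolver.getResolvedPath(
            "apps/logs/$cluster/$host",  // baseKeyPrefix containing variables
            "hdfs-namenode.log.gz",      // keySuffix appended after expansion
            "prod_cluster");             // cluster name fed into the context
        // expected shape: "apps/logs/prod_cluster/<local-hostname>/hdfs-namenode.log.gz"
        System.out.println(path);
      }
    }
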
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
deleted file mode 100644
index 293f011..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Holds all configuration relevant for S3 upload.
- */
-public class S3OutputConfiguration {
-
-  public static final String SPOOL_DIR_KEY = "spool_dir";
-  public static final String ROLLOVER_SIZE_THRESHOLD_BYTES_KEY = "rollover_size_threshold_bytes";
-  public static final Long DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024L;
-  public static final String ROLLOVER_TIME_THRESHOLD_SECS_KEY = "rollover_time_threshold_secs";
-  public static final Long DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS = 3600L;
-  public static final String S3_BUCKET_NAME_KEY = "s3_bucket";
-  public static final String S3_LOG_DIR_KEY = "s3_log_dir";
-  public static final String S3_ACCESS_KEY = "s3_access_key";
-  public static final String S3_SECRET_KEY = "s3_secret_key";
-  public static final String S3_ENDPOINT = "s3_endpoint";
-  public static final String DEFAULT_S3_ENDPOINT = "https://s3.amazonaws.com";
-  public static final String COMPRESSION_ALGO_KEY = "compression_algo";
-  public static final String ADDITIONAL_FIELDS_KEY = "add_fields";
-  public static final String CLUSTER_KEY = "cluster";
-
-  private Map<String, Object> configs;
-
-  S3OutputConfiguration(Map<String, Object> configs) {
-    this.configs = configs;
-  }
-
-  public String getS3BucketName() {
-    return (String) configs.get(S3_BUCKET_NAME_KEY);
-  }
-
-  public String getS3Endpoint() {
-    return (String) configs.getOrDefault(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
-  }
-
-  public String getS3Path() {
-    return (String) configs.get(S3_LOG_DIR_KEY);
-  }
-
-  public String getS3AccessKey() {
-    return (String) configs.get(S3_ACCESS_KEY);
-  }
-
-  public String getS3SecretKey() {
-    return (String) configs.get(S3_SECRET_KEY);
-  }
-
-  public String getCompressionAlgo() {
-    return (String) configs.get(COMPRESSION_ALGO_KEY);
-  }
-
-  public Long getRolloverSizeThresholdBytes() {
-    return (Long) configs.get(ROLLOVER_SIZE_THRESHOLD_BYTES_KEY);
-  }
-
-  public Long getRolloverTimeThresholdSecs() {
-    return (Long) configs.get(ROLLOVER_TIME_THRESHOLD_SECS_KEY);
-  }
-
-  @SuppressWarnings("unchecked")
-  public String getCluster() {
-    return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
-  }
-
-  public static S3OutputConfiguration fromConfigBlock(ConfigItem configItem) {
-    Map<String, Object> configs = new HashMap<>();
-    String[] stringValuedKeysToCopy = new String[] {
-        SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
-        S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY, S3_ENDPOINT
-    };
-
-    for (String key : stringValuedKeysToCopy) {
-      String value = configItem.getStringValue(key);
-      if (value != null) {
-        configs.put(key, value);
-      }
-    }
-
-    String[] longValuedKeysToCopy = new String[] {
-        ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, ROLLOVER_TIME_THRESHOLD_SECS_KEY
-    };
-
-    Long[] defaultValuesForLongValuedKeys = new Long[] {
-        DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES, DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS
-    };
-
-    for (int i = 0; i < longValuedKeysToCopy.length; i++) {
-      configs.put(longValuedKeysToCopy[i], configItem.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
-    }
-
-    configs.put(ADDITIONAL_FIELDS_KEY, configItem.getNVList(ADDITIONAL_FIELDS_KEY));
-
-    configs.putIfAbsent(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
-
-    return new S3OutputConfiguration(configs);
-  }
-}
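
For illustration, a sketch of the keys an output block would carry into fromConfigBlock; the values are hypothetical, and the key names come from the constants above:

    import java.util.HashMap;
    import java.util.Map;

    // Keys left unset pick up defaults in fromConfigBlock:
    //   rollover_size_threshold_bytes -> 10 * 1024 * 1024 (10 MB)
    //   rollover_time_threshold_secs  -> 3600
    //   s3_endpoint                   -> "https://s3.amazonaws.com"
    public class S3ConfigSketch {
      static Map<String, Object> exampleConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("s3_bucket", "logsearch-logs");  // hypothetical bucket
        configs.put("s3_log_dir", "apps/logs");      // hypothetical key prefix
        configs.put("compression_algo", "gz");
        return configs;
      }
    }
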
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
deleted file mode 100644
index 4273cc7..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.CompressionUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.Date;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A class that handles the uploading of files to S3.
- *
- * This class can be used to upload a file one time, or start a daemon thread that can
- * be used to upload files added to a queue one after the other. When used to upload
- * files via a queue, one instance of this class is created for each file handled in
- * {@link org.apache.ambari.logfeeder.input.InputFile}.
- */
-public class S3Uploader implements Runnable {
-  private static final Logger logger = LogManager.getLogger(S3Uploader.class);
-  
-  public static final String POISON_PILL = "POISON-PILL";
-
-  private final S3OutputConfiguration s3OutputConfiguration;
-  private final boolean deleteOnEnd;
-  private final String logType;
-  private final BlockingQueue<String> fileContextsToUpload;
-  private final AtomicBoolean stopRunningThread = new AtomicBoolean(false);
-
-  public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean deleteOnEnd, String logType) {
-    this.s3OutputConfiguration = s3OutputConfiguration;
-    this.deleteOnEnd = deleteOnEnd;
-    this.logType = logType;
-    this.fileContextsToUpload = new LinkedBlockingQueue<>();
-  }
-
-  /**
-   * Starts a thread that can be used to upload files from a queue.
-   *
-   * Add files to be uploaded using the method {@link #addFileForUpload(String)}.
-   * If this thread is started, it must be stopped using the method {@link #stopUploaderThread()}.
-   */
-  void startUploaderThread() {
-    Thread s3UploaderThread = new Thread(this, "s3-uploader-thread-"+logType);
-    s3UploaderThread.setDaemon(true);
-    s3UploaderThread.start();
-  }
-
-  /**
-   * Stops the thread used to upload files from a queue.
-   *
-   * This method must be called to cleanly free up resources, typically on shutdown of the process.
-   * Note that this method does not drain any remaining files, and instead stops the thread
-   * as soon as any file being currently uploaded is complete.
-   */
-  void stopUploaderThread() {
-    stopRunningThread.set(true);
-    boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
-    if (!offerStatus) {
-      logger.warn("Could not add poison pill to interrupt uploader thread.");
-    }
-  }
-
-  /**
-   * Add a file to a queue to upload asynchronously.
-   * @param fileToUpload Full path to the local file which must be uploaded.
-   */
-  void addFileForUpload(String fileToUpload) {
-    boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
-    if (!offerStatus) {
-      logger.error("Could not add file " + fileToUpload + " for upload.");
-    }
-  }
-
-  @Override
-  public void run() {
-    while (!stopRunningThread.get()) {
-      try {
-        String fileNameToUpload = fileContextsToUpload.take();
-        if (POISON_PILL.equals(fileNameToUpload)) {
-          logger.warn("Found poison pill while waiting for files to upload, exiting");
-          return;
-        }
-        uploadFile(new File(fileNameToUpload), logType);
-      } catch (InterruptedException e) {
-        logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
-        return;
-      }
-    }
-  }
-
-  /**
-   * Upload the given file to S3.
-   *
-   * The file, which should be available locally, is first compressed using the compression
-   * method specified by {@link S3OutputConfiguration#getCompressionAlgo()}. The compressed
-   * file is then uploaded to S3.
-   * @param fileToUpload the file to upload
-   * @param logType the name of the log which is used in the S3 path constructed.
-   * @return the S3 path the compressed file was uploaded to
-   */
-  String uploadFile(File fileToUpload, String logType) {
-    String bucketName = s3OutputConfiguration.getS3BucketName();
-    String s3AccessKey = s3OutputConfiguration.getS3AccessKey();
-    String s3SecretKey = s3OutputConfiguration.getS3SecretKey();
-    String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
-    String s3Endpoint = s3OutputConfiguration.getS3Endpoint();
-
-    String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
-    String s3Path = new S3LogPathResolver().getResolvedPath(
-        s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + logType, keySuffix,
-        s3OutputConfiguration.getCluster());
-    logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
-    File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
-
-    logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
-    writeFileIntoS3File(sourceFile, bucketName, s3Path, s3Endpoint, s3AccessKey, s3SecretKey);
-
-    // delete local compressed file
-    sourceFile.delete();
-    if (deleteOnEnd) {
-      logger.info("Deleting input file as required");
-      if (!fileToUpload.delete()) {
-        logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
-      }
-    }
-    return s3Path;
-  }
-
-  @VisibleForTesting
-  protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path,
-                                     String s3Endpoint, String s3AccessKey, String s3SecretKey) {
-    S3Util.writeFileIntoS3File(sourceFile.getAbsolutePath(), bucketName, s3Path, s3Endpoint, s3AccessKey, s3SecretKey);
-  }
-
-  @VisibleForTesting
-  protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-    File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() +
-        "." + compressionAlgo);
-    outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, compressionAlgo);
-    return outputFile;
-  }
-}
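
The uploader's shutdown relies on a poison-pill sentinel rather than interruption alone, because take() blocks until something arrives. A minimal, self-contained sketch of that pattern (hypothetical names, not the Ambari API):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Poison-pill consumer sketch: the sentinel unblocks a parked consumer,
    // so shutdown needs no busy-waiting.
    public class PoisonPillSketch implements Runnable {
      static final String POISON_PILL = "POISON-PILL";
      private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
      private final AtomicBoolean stopped = new AtomicBoolean(false);

      void submit(String path) { queue.offer(path); }

      void shutdown() {
        stopped.set(true);
        queue.offer(POISON_PILL); // unblocks a consumer parked in take()
      }

      @Override
      public void run() {
        while (!stopped.get()) {
          try {
            String item = queue.take();
            if (POISON_PILL.equals(item)) {
              return; // clean exit; any items still queued are abandoned
            }
            process(item);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      private void process(String path) { /* hypothetical: upload the file at path */ }
    }
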
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
new file mode 100644
index 0000000..6cf0c7c
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.appender.rolling.CompositeTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.OnStartupTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.SizeBasedTriggeringPolicy;
+import org.apache.logging.log4j.core.config.AppenderRef;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+/**
+ * Creates a custom logger that is used to ship inputs into type-specific files
+ * and roll those files over to an archive folder that is monitored by a
+ * thread which uploads the archived files to cloud storage.
+ */
+public class CloudStorageLoggerFactory {
+
+  private static final String ACTIVE_FOLDER = "active";
+  private static final String ARCHIVED_FOLDER = "archived";
+  private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-hh-mm-ss-SSS}.log.gz";
+  private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-hh-mm-ss-SSS}.log";
+
+  public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
+    String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
+    String uniqueThreadName = input.getThread().getName();
+    Configuration config = loggerContext.getConfiguration();
+    String destination = logFeederProps.getCloudStorageDestination().getText();
+    String activeLogDir = Paths.get(logFeederProps.getTmpDir(), ACTIVE_FOLDER, type).toFile().getAbsolutePath();
+    String archiveLogDir = Paths.get(logFeederProps.getTmpDir(), destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
+
+    boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
+    String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
+    String fileName = String.join(File.separator, activeLogDir, type + ".log");
+    String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern);
+    PatternLayout layout = PatternLayout.newBuilder()
+      .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
+
+    SizeBasedTriggeringPolicy sizeBasedTriggeringPolicy = SizeBasedTriggeringPolicy.createPolicy(
+      logFeederProps.getRolloverConfig().getRolloverSize());
+    CustomTimeBasedTriggeringPolicy customTimeBasedTriggeringPolicy = CustomTimeBasedTriggeringPolicy
+      .createPolicy(String.valueOf(logFeederProps.getRolloverConfig().getRolloverThresholdTimeMins()));
+
+    final CompositeTriggeringPolicy compositeTriggeringPolicy;
+
+    if (logFeederProps.getRolloverConfig().isRolloverOnStartup()) {
+      OnStartupTriggeringPolicy onStartupTriggeringPolicy = OnStartupTriggeringPolicy.createPolicy(1);
+      compositeTriggeringPolicy = CompositeTriggeringPolicy
+        .createPolicy(sizeBasedTriggeringPolicy, customTimeBasedTriggeringPolicy, onStartupTriggeringPolicy);
+    } else {
+      compositeTriggeringPolicy = CompositeTriggeringPolicy
+        .createPolicy(sizeBasedTriggeringPolicy, customTimeBasedTriggeringPolicy);
+    }
+
+    boolean immediateFlush = logFeederProps.getRolloverConfig().isImmediateFlush();
+    RollingFileAppender appender = RollingFileAppender.newBuilder()
+      .withFileName(fileName)
+      .withFilePattern(filePattern)
+      .withLayout(layout)
+      .withName(type)
+      .withPolicy(compositeTriggeringPolicy)
+      .withImmediateFlush(immediateFlush)
+      .build();
+
+    appender.start();
+    config.addAppender(appender);
+
+    AppenderRef ref = AppenderRef.createAppenderRef(type, null, null);
+    AppenderRef[] refs = new AppenderRef[] {ref};
+
+    LoggerConfig loggerConfig = LoggerConfig
+      .createLogger(false, Level.ALL, input.getThread().getName(),
+        "true", refs, null, config, null);
+    loggerConfig.addAppender(appender, null, null);
+    config.addLogger(uniqueThreadName, loggerConfig);
+    loggerContext.updateLoggers();
+    return loggerContext.getLogger(uniqueThreadName);
+  }
+
+}
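
For a concrete picture of the directory layout the factory builds, a sketch with a hypothetical tmpDir, destination, and log type:

    // Hypothetical values; mirrors the activeLogDir/archiveLogDir construction above.
    public class LayoutSketch {
      public static void main(String[] args) {
        String tmpDir = "/tmp/logfeeder";
        String destination = "s3";         // cloud storage destination
        String type = "hdfs_namenode";     // log type; one logger per input thread
        String activeFile = tmpDir + "/active/" + type + "/" + type + ".log";
        String archivePattern = tmpDir + "/" + destination + "/archived/" + type + "/"
            + type + "-%d{yyyy-MM-dd-hh-mm-ss-SSS}.log.gz"; // gzip rollover pattern
        System.out.println(activeFile + " -> " + archivePattern);
      }
    }
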
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index 561b141..fbbffe6 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -19,12 +19,139 @@
 package org.apache.ambari.logfeeder.output.cloud;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Class to handle common operations for cloud storage outputs
- * TODO !!!
+ * Output class for cloud outputs.
+ * Holds the loggers that ship logs into type-specific folders; those files are rolled over to an archive folder,
+ * from which an upload client ships the log archives to cloud storage.
  */
-public abstract class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageOutput.class);
+
+  private final Map<String, Logger> cloudOutputLoggers = new ConcurrentHashMap<>();
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final LoggerContext loggerContext;
+  private final CloudStorageUploader uploader;
+  private final RolloverConfig rolloverConfig;
+
+  public CloudStorageOutput(LogFeederProps logFeederProps) {
+    this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps);
+    this.logFeederProps = logFeederProps;
+    this.rolloverConfig = logFeederProps.getRolloverConfig();
+    loggerContext = (LoggerContext) LogManager.getContext(false);
+    uploader = new CloudStorageUploader(String.format("%s-uploader", logFeederProps.getCloudStorageDestination().getText()), uploadClient, logFeederProps);
+    uploader.setDaemon(true);
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize cloud output.");
+    uploadClient.init(logFeederProperties);
+    uploader.start();
+  }
+
+  @Override
+  public String getOutputType() {
+    return "cloud";
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
+    throw new UnsupportedOperationException("Copy file is not supported yet");
+  }
+
+  @Override
+  public void write(String jsonStr, InputMarker inputMarker) throws Exception {
+    String uniqueThreadName = inputMarker.getInput().getThread().getName();
+    Logger cloudLogger = null;
+    if (cloudOutputLoggers.containsKey(uniqueThreadName)) {
+      cloudLogger = cloudOutputLoggers.get(uniqueThreadName);
+    } else {
+      logger.info("New cloud input source found. Register: {}", uniqueThreadName);
+      cloudLogger = CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, logFeederProps);
+      cloudOutputLoggers.put(uniqueThreadName, cloudLogger);
+    }
+    cloudLogger.info(jsonStr);
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "write:cloud";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "cloud";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
+  }
+
+  void removeWorker(Input input) {
+    String uniqueName = input.getThread().getName();
+    if (rolloverConfig.isRolloverOnShutdown() && cloudOutputLoggers.containsKey(uniqueName)) {
+      rollover(cloudOutputLoggers.get(uniqueName));
+    }
+    logger.info("Remove logger: {}", uniqueName);
+    Configuration config = loggerContext.getConfiguration();
+    config.removeLogger(uniqueName);
+    loggerContext.updateLoggers();
+    cloudOutputLoggers.remove(uniqueName);
+  }
+
+  void stopUploader() {
+    uploader.interrupt();
+    if (logFeederProps.isCloudStorageUploadOnShutdown()) {
+      logger.info("Do last upload before shutdown.");
+      uploader.doUpload();
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    try {
+      if (uploadClient != null) {
+        uploadClient.close();
+      }
+    } catch (Exception e) {
+      logger.error("Error during closing uploader client", e);
+    }
+  }
+
+  private void rollover(Logger logger) {
+    Map<String, Appender> appenders = ((org.apache.logging.log4j.core.Logger) logger).getAppenders();
+    for (Map.Entry<String, Appender> stringAppenderEntry : appenders.entrySet()) {
+      Appender appender = stringAppenderEntry.getValue();
+      if (appender instanceof RollingFileAppender) {
+        ((RollingFileAppender) appender).getManager().rollover();
+      }
+    }
+  }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 4994eb7..16b7e55 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -20,9 +20,11 @@ package org.apache.ambari.logfeeder.output.cloud;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
 import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
 import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -34,7 +36,6 @@ import java.util.Map;
 
 /**
  * Handle output operations for sending cloud inputs to a cloud storage destination
- * TODO !!!
  */
 public class CloudStorageOutputManager extends OutputManager {
 
@@ -49,22 +50,20 @@ public class CloudStorageOutputManager extends OutputManager {
 
   @Override
   public void write(Map<String, Object> jsonObj, InputMarker marker) {
-    // TODO: make sense to implement this if we will support filters before calling cloud outputs
+    write(LogFeederUtil.getGson().toJson(jsonObj), marker);
   }
 
   @Override
   public void write(String line, InputMarker marker) {
-    logger.info("Output: {}", line);
     try {
       storageOutput.write(line, marker);
     } catch (Exception e) {
-
+      logger.error("Error during cloud output write.", e);
     }
   }
 
   @Override
   public void copyFile(File file, InputMarker marker) {
-
   }
 
   @Override
@@ -80,23 +79,29 @@ public class CloudStorageOutputManager extends OutputManager {
   @Override
   public void init() throws Exception {
     logger.info("Called init with cloud storage output manager.");
-    storageOutput = CloudStorageFactory.createCloudStorageOutput(logFeederProps);
+    storageOutput = new CloudStorageOutput(logFeederProps);
     storageOutput.init(logFeederProps);
     add(storageOutput);
   }
 
   @Override
   public void close() {
-
+    logger.info("Close called for cloud outputs.");
+    storageOutput.stopUploader();
+    storageOutput.setDrain(true);
+    storageOutput.close();
   }
 
   @Override
   public void logStats() {
-
   }
 
   @Override
   public void addMetricsContainers(List<MetricData> metricsList) {
+  }
 
+  @Override
+  public void release(Input input) {
+    storageOutput.removeWorker(input);
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
new file mode 100644
index 0000000..ebb0cef
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Periodically checks a folder of archived logs; if it finds any .log or .gz files, it tries to upload them to cloud storage through a cloud-specific upload client.
+ */
+public class CloudStorageUploader extends Thread {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageUploader.class);
+
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final String clusterName;
+  private final String hostName;
+  private final String uploaderType;
+
+  public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
+    super(name);
+    this.uploadClient = uploadClient;
+    this.logFeederProps = logFeederProps;
+    this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
+    this.clusterName = logFeederProps.getClusterName();
+    this.hostName = LogFeederUtil.hostName;
+  }
+
+  @Override
+  public void run() {
+    logger.info("Start '{}' uploader", uploaderType);
+    boolean stop = false;
+    do {
+      try {
+        try {
+          doUpload();
+        } catch (Exception e) {
+          logger.error("An error occurred during Uploader operation - " + uploaderType, e);
+        }
+        Thread.sleep(1000 * logFeederProps.getCloudStorageUploaderIntervalSeconds());
+      } catch (InterruptedException ie) {
+        logger.info("Uploader ({}) thread interrupted", uploaderType);
+        stop = true;
+      }
+    } while (!stop && !Thread.currentThread().isInterrupted());
+  }
+
+  /**
+   * Finds .log and .gz files and uploads them to cloud storage through the uploader client.
+   */
+  void doUpload() {
+    try {
+      final String archiveLogDir = String.join(File.separator, logFeederProps.getTmpDir(), uploaderType, "archived");
+      if (new File(archiveLogDir).exists()) {
+        String[] extensions = {"log", "gz"};
+        Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
+        if (filesToUpload.isEmpty()) {
+          logger.debug("Not found any files to upload.");
+        } else {
+          for (File file : filesToUpload) {
+            String basePath = uploadClient.getOutputConfig().getOutputBasePath();
+            String outputPath = String.format("%s/%s/%s/%s", clusterName, hostName, file.getParentFile().getName(), file.getName())
+              .replaceAll("//", "/");
+            logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
+            uploadClient.upload(file.getAbsolutePath(), outputPath, basePath);
+          }
+        }
+      } else {
+        logger.debug("Directory {} does not exist.", archiveLogDir);
+      }
+    } catch (Exception e) {
+      logger.error("Exception during cloud upload", e);
+    }
+  }
+
+}
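
A worked example of the key construction in doUpload, with hypothetical cluster, host, and file names:

    // Hypothetical values; mirrors the String.format call above.
    public class OutputPathSketch {
      public static void main(String[] args) {
        String outputPath = String.format("%s/%s/%s/%s",
            "cl1",                        // clusterName
            "host1.example.com",          // hostName
            "hdfs_namenode",              // parent folder name == log type
            "hdfs_namenode-2018-11-12-01-02-03-000.log.gz") // rolled archive file
          .replaceAll("//", "/");
        // => "cl1/host1.example.com/hdfs_namenode/hdfs_namenode-2018-11-12-01-02-03-000.log.gz"
        System.out.println(outputPath);
      }
    }
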
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java
new file mode 100644
index 0000000..41bb5ba
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.logging.log4j.core.Core;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.rolling.AbstractTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.RollingFileManager;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rolls a file over based on time: it works with a fixed interval and does not use the file date pattern from the log4j2 configuration.
+ */
+@Plugin(name = "CustomTimeBasedTriggeringPolicy", category = Core.CATEGORY_NAME, printObject = true)
+public final class CustomTimeBasedTriggeringPolicy extends AbstractTriggeringPolicy {
+
+  private final long intervalMin;
+
+  private RollingFileManager manager;
+  private long nextRolloverMillis;
+
+  private CustomTimeBasedTriggeringPolicy(final long intervalMin) {
+    this.intervalMin = intervalMin;
+  }
+
+  public long getIntervalMin() {
+    return this.intervalMin;
+  }
+
+  @Override
+  public void initialize(RollingFileManager manager) {
+    this.manager = manager;
+    long fileTime = this.manager.getFileTime();
+    long actualDate = System.currentTimeMillis();
+    long diff = actualDate - fileTime;
+    long intervalMillis = TimeUnit.MINUTES.toMillis(this.intervalMin);
+    if (diff > intervalMillis) {
+      this.nextRolloverMillis = actualDate;
+    } else {
+      long remainingMillis = intervalMillis - diff;
+      this.nextRolloverMillis = actualDate + remainingMillis;
+    }
+  }
+
+  @Override
+  public boolean isTriggeringEvent(LogEvent event) {
+    if (this.manager.getFileSize() == 0L) {
+      return false;
+    } else {
+      long nowMillis = event.getTimeMillis();
+      if (nowMillis >= this.nextRolloverMillis) {
+        this.nextRolloverMillis = nowMillis + TimeUnit.MINUTES.toMillis(this.intervalMin);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @PluginFactory
+  public static CustomTimeBasedTriggeringPolicy createPolicy(@PluginAttribute("intervalMins") final String intervalMins) {
+    return new CustomTimeBasedTriggeringPolicy(Long.parseLong(intervalMins));
+  }
+
+}
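
As a worked example of the scheduling logic in initialize() above, assuming a 10 minute interval and an active file created 7 minutes ago:

    // sketch of the rollover arithmetic with illustrative numbers
    long intervalMillis = TimeUnit.MINUTES.toMillis(10);   // configured interval
    long fileAgeMillis = TimeUnit.MINUTES.toMillis(7);     // age of the active file
    long now = System.currentTimeMillis();
    long nextRolloverMillis = fileAgeMillis > intervalMillis
        ? now                                      // overdue: roll over immediately
        : now + (intervalMillis - fileAgeMillis);  // otherwise wait 3 more minutes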
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
deleted file mode 100644
index 24edb41..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-
-/**
- * HDFS cloud storage output (on-prem)
- * TODO !!!
- */
-public class HDFSOutput extends CloudStorageOutput {
-
-  private Logger logger = LogManager.getLogger(HDFSOutput.class);
-
-  @Override
-  public String getOutputType() {
-    return null;
-  }
-
-  @Override
-  public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
-  }
-
-  @Override
-  public void write(String line, InputMarker inputMarker) throws Exception {
-    inputMarker.getInput().checkIn(inputMarker);
-  }
-
-  @Override
-  public Long getPendingCount() {
-    return null;
-  }
-
-  @Override
-  public String getWriteBytesMetricName() {
-    return null;
-  }
-
-  @Override
-  public void init(LogFeederProps logFeederProperties) throws Exception {
-    logger.info("Initialize on-prem HDFS output");
-  }
-
-  @Override
-  public String getShortDescription() {
-    return null;
-  }
-
-  @Override
-  public String getStatMetricName() {
-    return null;
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
new file mode 100644
index 0000000..d4a45de
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.BucketConfig;
+import org.apache.ambari.logfeeder.conf.output.CloudStorageOutputConfig;
+import org.apache.ambari.logfeeder.credential.CompositeSecretStore;
+import org.apache.ambari.logfeeder.credential.EnvSecretStore;
+import org.apache.ambari.logfeeder.credential.FileSecretStore;
+import org.apache.ambari.logfeeder.credential.HadoopCredentialSecretStore;
+import org.apache.ambari.logfeeder.credential.PlainTextSecretStore;
+import org.apache.ambari.logfeeder.credential.PropertySecretStore;
+import org.apache.ambari.logfeeder.credential.SecretStore;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds common cloud based client operations
+ */
+abstract class AbstractCloudClient {
+
+  private static final Logger logger = LogManager.getLogger(AbstractCloudClient.class);
+
+  /**
+   * Create a cloud specific bucket if it does not exist
+   * @param bucket name of the bucket
+   */
+  abstract void createBucketIfNeeded(String bucket);
+
+  /**
+   * Try to bootstrap the default bucket, retrying until it succeeds.
+   * @param bucket name of the bucket
+   * @param bucketConfig bucket config holder
+   */
+  void bootstrapBucket(String bucket, BucketConfig bucketConfig) {
+    if (bucketConfig.isCreateBucketOnStartup()) {
+      boolean ready = false;
+      while (!ready) {
+        try {
+          createBucketIfNeeded(bucket);
+          ready = true;
+        } catch (Exception e) {
+          logger.error("Error during bucket creation - bucket : " + bucket, e);
+        }
+        if (!ready) {
+          logger.warn("Bucket ('{}') creation failed. Retrying ...", bucket);
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            logger.error("Error during thread sleep (bucket bootstrap)", e);
+            Thread.currentThread().interrupt();
+            ready = true;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get the secret key pair (access key / secret key).
+   *
+   * @param props  global configuration holder
+   * @param config cloud based configuration
+   * @return secret key pair
+   */
+  SecretKeyPair getSecretKeyPair(LogFeederProps props, CloudStorageOutputConfig config) {
+    String secretFile = config.isUseFileSecrets() ? config.getSecretKeyFileLocation() : null;
+    String secretRef = config.isUseHadoopCredentialStorage() ? config.getSecretKeyHadoopCredentialReference() : null;
+    CompositeSecretStore secretKeyStore = createCompositeSecretStore(props, config.getSecretKey(), config.getSecretKeyProperty(),
+      config.getSecretKeyEnvVariable(), secretFile, secretRef);
+
+    String accessFile = config.isUseFileSecrets() ? config.getAccessKeyFileLocation() : null;
+    String accessRef = config.isUseHadoopCredentialStorage() ? config.getAccessKeyHadoopCredentialReference() : null;
+    CompositeSecretStore accessKeyStore = createCompositeSecretStore(props, config.getAccessKey(), config.getAccessKeyProperty(),
+      config.getAccessKeyEnvVariable(), accessFile, accessRef);
+    if (secretKeyStore.getSecret() == null) {
+      throw new RuntimeException(String.format("No %s credentials found for the secret access key.", config.getDescription()));
+    }
+    if (accessKeyStore.getSecret() == null) {
+      throw new RuntimeException(String.format("No %s credentials found for the access key id.", config.getDescription()));
+    }
+    return new SecretKeyPair(accessKeyStore.getSecret(), secretKeyStore.getSecret());
+  }
+
+  /**
+   * Common operation to create secret stores for cloud secrets
+   * @param props global property holder
+   * @param plainTextSecret plain text secret, used directly if it is set
+   * @param property java property to check for secret
+   * @param env env variable to check for secret
+   * @param file file to check for secret
+   * @param credentialRef credential provider reference to check for secret
+   * @return composite secret store that contains multiple ways to get a secret
+   */
+  CompositeSecretStore createCompositeSecretStore(LogFeederProps props, String plainTextSecret, String property, String env,
+                                                          String file, String credentialRef) {
+    List<SecretStore> secretStores = new ArrayList<>();
+    if (StringUtils.isNotBlank(plainTextSecret)) {
+      secretStores.add(new PlainTextSecretStore(plainTextSecret));
+    }
+    if (StringUtils.isNotBlank(credentialRef)) {
+      secretStores.add(new HadoopCredentialSecretStore(credentialRef, props.getLogFeederSecurityConfig().getCredentialStoreProviderPath()));
+    }
+    if (StringUtils.isNotBlank(file)) {
+      secretStores.add(new FileSecretStore(file));
+    }
+    if (StringUtils.isNotBlank(env)) {
+      secretStores.add(new EnvSecretStore(env));
+    }
+    secretStores.add(new PropertySecretStore(property));
+    return new CompositeSecretStore(secretStores.toArray(new SecretStore[0]));
+  }
+
+}
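
A minimal sketch of the lookup chain that createCompositeSecretStore builds for an S3 secret key; the variable and file names below are placeholders, and it is assumed that CompositeSecretStore returns the first non-null secret in registration order:

    CompositeSecretStore store = createCompositeSecretStore(
        props,
        null,                             // plain text secret (not set here)
        "logfeeder.s3.secret.key",        // java property, always registered as the last fallback
        "LOGFEEDER_S3_SECRET_KEY",        // environment variable (assumed name)
        "/etc/logfeeder/conf/s3_secret",  // secret file
        null);                            // Hadoop credential store reference (not set)
    char[] secret = store.getSecret();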
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
new file mode 100644
index 0000000..dd5a225
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
+import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
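+/**
+ * S3 uploader client that writes through the Hadoop s3a file system instead of the AWS SDK client.
+ */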
+public class HDFSS3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(HDFSS3UploadClient.class);
+
+  private final S3OutputConfig s3OutputConfig;
+
+  private FileSystem fs;
+
+  public HDFSS3UploadClient(S3OutputConfig s3OutputConfig) {
+    this.s3OutputConfig = s3OutputConfig;
+  }
+
+  @Override
+  void createBucketIfNeeded(String bucket) {
+    logger.warn("HDFS based S3 client won't bootstrap default bucket ('{}')", s3OutputConfig.getBucket());
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    SecretKeyPair keyPair = getSecretKeyPair(logFeederProps, s3OutputConfig);
+    // TODO: load configuration from file
+    Configuration conf = LogFeederHDFSUtil.buildHdfsConfiguration(s3OutputConfig.getBucket(), "s3a");
+    conf.set("fs.s3a.access.key", new String(keyPair.getAccessKey()));
+    conf.set("fs.s3a.secret.key", new String(keyPair.getSecretKey()));
+    conf.set("fs.s3a.aws.credentials.provider", SimpleAWSCredentialsProvider.NAME);
+    conf.set("fs.s3a.endpoint", s3OutputConfig.getEndpoint());
+    conf.set("fs.s3a.path.style.access", String.valueOf(s3OutputConfig.isPathStyleAccess()));
+    conf.set("fs.s3a.multiobjectdelete.enable", String.valueOf(s3OutputConfig.isMultiobjectDeleteEnable()));
+    this.fs = LogFeederHDFSUtil.buildFileSystem(conf);
+  }
+
+  @Override
+  public void upload(String source, String target, String basePath) throws Exception {
+    LogFeederHDFSUtil.copyFromLocal(source, target, this.fs, true, true, null);
+  }
+
+  @Override
+  public S3OutputConfig getOutputConfig() {
+    return this.s3OutputConfig;
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeQuietly(fs);
+  }
+}
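
A hypothetical call sequence for this client; the local path and key are placeholders, and since the s3a file system is rooted at the configured bucket, the basePath argument is not used by this implementation:

    HDFSS3UploadClient client = new HDFSS3UploadClient(logFeederProps.getS3OutputConfig());
    client.init(logFeederProps);
    client.upload("/var/lib/ambari-logsearch/archive/service.log",
        "cl1/host1/service/service.log", null);   // key is used as-is on s3a
    client.close();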
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
new file mode 100644
index 0000000..0c4dce5
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * HDFS (on-prem) specific uploader client.
+ */
+public class HDFSUploadClient implements UploadClient<HdfsOutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
+
+  private final HdfsOutputConfig hdfsOutputConfig;
+  private final FsPermission fsPermission;
+  private FileSystem fs;
+
+  public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    if (StringUtils.isNotBlank(hdfsOutputConfig.getHdfsUser())) {
+      System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getHdfsUser());
+    }
+    this.fs = LogFeederHDFSUtil.buildFileSystem(
+      hdfsOutputConfig.getHdfsHost(),
+      String.valueOf(hdfsOutputConfig.getHdfsPort()));
+    if (logFeederProps.getHdfsOutputConfig().isSecure()) {
+      Configuration conf = fs.getConf();
+      conf.set("hadoop.security.authentication", "kerberos");
+    }
+  }
+
+  @Override
+  public void upload(String source, String target, String basePath) throws Exception {
+    String outputPath = String.format("%s/%s", basePath, target).replaceAll("//", "/");
+    LogFeederHDFSUtil.copyFromLocal(source, outputPath, fs, true, true, fsPermission);
+  }
+
+  @Override
+  public HdfsOutputConfig getOutputConfig() {
+    return this.hdfsOutputConfig;
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(fs);
+  }
+}
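
A hypothetical call sequence for the on-prem HDFS client; with logfeeder.hdfs.output.base.dir=/user/hdfs/logfeeder the upload below would land at /user/hdfs/logfeeder/cl1/host1/service/service.log (paths are placeholders):

    HDFSUploadClient client = new HDFSUploadClient(logFeederProps.getHdfsOutputConfig());
    client.init(logFeederProps);
    client.upload("/var/lib/ambari-logsearch/archive/service.log",
        "cl1/host1/service/service.log", "/user/hdfs/logfeeder");
    client.close();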
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
new file mode 100644
index 0000000..819a001
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.tools.ant.util.FileUtils;
+
+import java.io.File;
+
+/**
+ * S3 specific upload client
+ */
+public class S3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(S3UploadClient.class);
+
+  private final S3OutputConfig s3OutputConfig;
+  private final CannedAccessControlList acl;
+  private AmazonS3 s3Client;
+
+  public S3UploadClient(S3OutputConfig s3OutputConfig) {
+    this.s3OutputConfig = s3OutputConfig;
+    this.acl = s3OutputConfig.calculateAcls(s3OutputConfig.getObjectAcl());
+  }
+
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    SecretKeyPair keyPair = getSecretKeyPair(logFeederProps, s3OutputConfig);
+    AWSCredentials awsCredentials = new BasicAWSCredentials(new String(keyPair.getAccessKey()), new String(keyPair.getSecretKey()));
+    AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
+    AwsClientBuilder.EndpointConfiguration endpointConf = new AwsClientBuilder.EndpointConfiguration(s3OutputConfig.getEndpoint(), s3OutputConfig.getRegion());
+    s3Client = AmazonS3ClientBuilder.standard()
+      .withCredentials(credentialsProvider)
+      .withEndpointConfiguration(endpointConf)
+      .withPathStyleAccessEnabled(s3OutputConfig.isPathStyleAccess())
+      .build();
+    bootstrapBucket(s3OutputConfig.getOutputBasePath(), s3OutputConfig.getBucketConfig());
+  }
+
+  @Override
+  public void upload(String source, String target, String bucket) throws Exception {
+    File fileToUpload = new File(source);
+    logger.info("Starting S3 upload {} -> bucket: {}, key: {}", source, bucket, target);
+    s3Client.putObject(bucket, target, fileToUpload);
+    s3Client.setObjectAcl(bucket, target, acl);
+    FileUtils.delete(fileToUpload);
+  }
+
+  @Override
+  public S3OutputConfig getOutputConfig() {
+    return this.s3OutputConfig;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  void createBucketIfNeeded(String bucket) {
+    if (!s3Client.doesBucketExistV2(bucket)) {
+      s3Client.createBucket(bucket);
+    }
+  }
+}
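
A hypothetical call sequence for the SDK based client, mirroring the sample logfeeder.properties further below; here the basePath argument is the bucket name:

    S3UploadClient client = new S3UploadClient(logFeederProps.getS3OutputConfig());
    client.init(logFeederProps);   // also bootstraps the bucket if configured
    client.upload("/var/lib/ambari-logsearch/archive/service.log",
        "cl1/host1/service/service.log", "logfeeder");
    client.close();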
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
similarity index 67%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
index 871ae93..5321b81 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
@@ -16,17 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ambari.logfeeder.output.cloud;
+package org.apache.ambari.logfeeder.output.cloud.upload;
 
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+final class SecretKeyPair {
+  private final char[] accessKey;
+  private final char[] secretKey;
 
-/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
- */
-public class CloudStorageFactory {
+  SecretKeyPair(char[] accessKey, char[] secretKey) {
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+  }
+
+  final char[] getAccessKey() {
+    return accessKey;
+  }
 
-  public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
-    return new HDFSOutput();
+  final char[] getSecretKey() {
+    return secretKey;
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
new file mode 100644
index 0000000..d8adf68
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.OutputConfig;
+
+import java.io.Closeable;
+
+/**
+ * Client that is responsible for uploading files to a cloud storage implementation.
+ * @param <CONF_TYPE> specific cloud configuration type
+ */
+public interface UploadClient<CONF_TYPE extends OutputConfig> extends Closeable {
+
+  /**
+   * Initialize the client
+   * @param logFeederProps global properties holder
+   */
+  void init(LogFeederProps logFeederProps);
+
+  /**
+   * Upload source file to cloud storage location
+   * @param source file that will be uploaded
+   * @param target file key/output on cloud storage
+   * @param basePath acts as a base directory, or can be a bucket as well
+   * @throws Exception error during upload
+   */
+  void upload(String source, String target, String basePath) throws Exception;
+
+  /**
+   * Obtain cloud specific output configuration
+   * @return output configuration holder
+   */
+  CONF_TYPE getOutputConfig();
+}
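
To illustrate the contract, a minimal sketch of an alternative implementation that "uploads" by moving files into a local directory; this class is purely hypothetical and not part of the change:

    // logfeeder imports (UploadClient, HdfsOutputConfig, LogFeederProps) omitted for brevity
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class LocalDirUploadClient implements UploadClient<HdfsOutputConfig> {

      private final HdfsOutputConfig config;

      public LocalDirUploadClient(HdfsOutputConfig config) {
        this.config = config;
      }

      @Override
      public void init(LogFeederProps logFeederProps) {
        // nothing to initialize for the local file system
      }

      @Override
      public void upload(String source, String target, String basePath) throws Exception {
        Path dest = Paths.get(basePath, target);
        Files.createDirectories(dest.getParent());   // mimic bucket/base dir bootstrap
        Files.move(Paths.get(source), dest, StandardCopyOption.REPLACE_EXISTING);
      }

      @Override
      public HdfsOutputConfig getOutputConfig() {
        return config;
      }

      @Override
      public void close() {
        // no resources to release
      }
    }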
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
new file mode 100644
index 0000000..b86ec6d
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.CloudStorageDestination;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Factory class to create a cloud specific data uploader client based on global Log Feeder settings.
+ */
+public class UploadClientFactory {
+
+  private static final Logger logger = LogManager.getLogger(UploadClientFactory.class);
+
+  /**
+   * Create a new cloud specific client that can upload data to cloud storage
+   * @param logFeederProps global settings
+   * @return created cloud specific uploader client
+   */
+  public static UploadClient createUploadClient(LogFeederProps logFeederProps) {
+    CloudStorageDestination destType = logFeederProps.getCloudStorageDestination();
+    logger.info("Creating upload client for storage: {}", destType);
+    if (CloudStorageDestination.HDFS.equals(destType)) {
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+    } else if (CloudStorageDestination.S3.equals(destType)) {
+      if (logFeederProps.isUseCloudHdfsClient()) {
+        return new HDFSS3UploadClient(logFeederProps.getS3OutputConfig());
+      } else {
+        return new S3UploadClient(logFeederProps.getS3OutputConfig());
+      }
+    } else {
+      throw new IllegalArgumentException(String.format("No cloud storage type is selected as destination: %s", destType));
+    }
+  }
+}
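
A hypothetical call site, e.g. from an uploader thread; UploadClient extends Closeable, so try-with-resources works, and the client type is driven by logfeeder.cloud.storage.destination:

    try (UploadClient client = UploadClientFactory.createUploadClient(logFeederProps)) {
      client.init(logFeederProps);
      client.upload("/var/lib/ambari-logsearch/archive/service.log",
          "cl1/host1/service/service.log", "/user/hdfs/logfeeder");
    }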
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
deleted file mode 100644
index 82a3f1b..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.util.DateUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A class that manages local storage of log events before they are uploaded to the output destinations.
- *
- * This class should be used by any {@link org.apache.ambari.logfeeder.plugin.output.Output}s that wish to upload log files to an
- * output destination on a periodic batched basis. Log events should be added to an instance
- * of this class to be stored locally. This class determines when to
- * rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface
- * {@link RolloverHandler} to trigger the handling of the rolled over file.
- */
-public class LogSpooler {
-  
-  private static final Logger logger = LogManager.getLogger(LogSpooler.class);
-
-  private static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
-  private static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
-
-  private String spoolDirectory;
-  private String sourceFileNamePrefix;
-  private RolloverCondition rolloverCondition;
-  private RolloverHandler rolloverHandler;
-  private PrintWriter currentSpoolBufferedWriter;
-  private File currentSpoolFile;
-  private LogSpoolerContext currentSpoolerContext;
-  private Timer rolloverTimer;
-  private AtomicBoolean rolloverInProgress = new AtomicBoolean(false);
-
-  /**
-   * Create an instance of the LogSpooler.
-   * @param spoolDirectory The directory under which spooler files are created.
-   *                       Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
-   * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
-   * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
-   *                          determine when to rollover.
-   * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
-   *                        there should be a rollover.
-   */
-  public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
-                    RolloverHandler rolloverHandler) {
-    this(spoolDirectory, sourceFileNamePrefix, rolloverCondition, rolloverHandler,
-        TIME_BASED_ROLLOVER_DISABLED_THRESHOLD);
-  }
-
-  /**
-   * Create an instance of the LogSpooler.
-   * @param spoolDirectory The directory under which spooler files are created.
-   *                       Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
-   * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
-   * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
-   *                          determine when to rollover.
-   * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
-   *                        there should be a rollover.
-   * @param rolloverTimeThresholdSecs  Setting a non-zero value enables time based rollover of
-   *                                   spool files. Sending a 0 value disables this functionality.
-   */
-  public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
-                    RolloverHandler rolloverHandler, long rolloverTimeThresholdSecs) {
-    this.spoolDirectory = spoolDirectory;
-    this.sourceFileNamePrefix = sourceFileNamePrefix;
-    this.rolloverCondition = rolloverCondition;
-    this.rolloverHandler = rolloverHandler;
-    if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) {
-      rolloverTimer = new Timer("log-spooler-timer-" + sourceFileNamePrefix, true);
-      rolloverTimer.scheduleAtFixedRate(new LogSpoolerRolloverTimerTask(),
-          rolloverTimeThresholdSecs*1000, rolloverTimeThresholdSecs*1000);
-    }
-    initializeSpoolState();
-  }
-
-  private void initializeSpoolDirectory() {
-    File spoolDir = new File(spoolDirectory);
-    if (!spoolDir.exists()) {
-      logger.info("Creating spool directory: " + spoolDir);
-      boolean result = spoolDir.mkdirs();
-      if (!result) {
-        throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
-      }
-    }
-  }
-
-  private void initializeSpoolState() {
-    initializeSpoolDirectory();
-    currentSpoolFile = initializeSpoolFile();
-    try {
-      currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
-    } catch (IOException e) {
-      throw new LogSpoolerException("Could not create buffered writer for spool file: " + currentSpoolFile
-          + ", error message: " + e.getLocalizedMessage(), e);
-    }
-    currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
-    logger.info("Initialized spool file at path: " + currentSpoolFile);
-  }
-
-  @VisibleForTesting
-  protected File initializeSpoolFile() {
-    return new File(spoolDirectory, getCurrentFileName());
-  }
-
-  @VisibleForTesting
-  protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-    return new PrintWriter(new BufferedWriter(new FileWriter(spoolFile)));
-  }
-
-  /**
-   * Add an event for spooling.
-   *
-   * This method adds the event to the current spool file's buffer. On completion, it
-   * calls the {@link RolloverCondition#shouldRollover(LogSpoolerContext)} method to determine if
-   * it is ready to rollover the file.
-   * @param logEvent The log event to spool.
-   */
-  public synchronized void add(String logEvent) {
-    currentSpoolBufferedWriter.println(logEvent);
-    currentSpoolerContext.logEventSpooled();
-    if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
-      logger.info("Trying to rollover based on rollover condition");
-      tryRollover();
-    }
-  }
-
-  /**
-   * Trigger a rollover of the current spool file.
-   *
-   * This method manages the rollover of the spool file, and then invokes the
-   * {@link RolloverHandler#handleRollover(File)} to handle what should be done with the
-   * rolled over file.
-   */
-  public void rollover() {
-    logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
-    currentSpoolBufferedWriter.flush();
-    if (currentSpoolFile.length()==0) {
-      logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
-    } else {
-      currentSpoolBufferedWriter.close();
-      rolloverHandler.handleRollover(currentSpoolFile);
-      logger.info("Invoked rollover handler with file: " + currentSpoolFile);
-      initializeSpoolState();
-    }
-    boolean status = rolloverInProgress.compareAndSet(true, false);
-    if (!status) {
-      logger.error("Should have reset rollover flag!!");
-    }
-  }
-
-  private synchronized void tryRollover() {
-    if (rolloverInProgress.compareAndSet(false, true)) {
-      rollover();
-    } else {
-      logger.warn("Ignoring rollover call as rollover already in progress for file " +
-          currentSpoolFile);
-    }
-  }
-
-  private String getCurrentFileName() {
-    Date currentDate = new Date();
-    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
-    return sourceFileNamePrefix + dateStr;
-  }
-
-  /**
-   * Cancel's any time based rollover task, if started.
-   */
-  public void close() {
-    if (rolloverTimer != null) {
-      rolloverTimer.cancel();
-    }
-  }
-
-  private class LogSpoolerRolloverTimerTask extends TimerTask {
-    @Override
-    public void run() {
-      logger.info("Trying rollover based on time");
-      tryRollover();
-    }
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
deleted file mode 100644
index 616300f..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import java.io.File;
-import java.util.Date;
-
-/**
- * A class that holds the state of an spool file.
- *
- * The state in this class can be used by a {@link RolloverCondition} to determine
- * if an active spool file should be rolled over.
- */
-public class LogSpoolerContext {
-
-  private File activeSpoolFile;
-  private long numEventsSpooled;
-  private Date activeLogCreationTime;
-
-  /**
-   * Create a new LogSpoolerContext
-   * @param activeSpoolFile the spool file for which to hold state
-   */
-  public LogSpoolerContext(File activeSpoolFile) {
-    this.activeSpoolFile = activeSpoolFile;
-    this.numEventsSpooled = 0;
-    this.activeLogCreationTime = new Date();
-  }
-
-  /**
-   * Increment number of spooled events by one.
-   */
-  public void logEventSpooled() {
-    numEventsSpooled++;
-  }
-
-  public File getActiveSpoolFile() {
-    return activeSpoolFile;
-  }
-
-  public long getNumEventsSpooled() {
-    return numEventsSpooled;
-  }
-
-  public Date getActiveLogCreationTime() {
-    return activeLogCreationTime;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    LogSpoolerContext that = (LogSpoolerContext) o;
-
-    if (numEventsSpooled != that.numEventsSpooled) return false;
-    if (!activeSpoolFile.equals(that.activeSpoolFile)) return false;
-    return activeLogCreationTime.equals(that.activeLogCreationTime);
-
-  }
-
-  @Override
-  public int hashCode() {
-    int result = activeSpoolFile.hashCode();
-    result = 31 * result + (int) (numEventsSpooled ^ (numEventsSpooled >>> 32));
-    result = 31 * result + activeLogCreationTime.hashCode();
-    return result;
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
deleted file mode 100644
index 14bb139..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-public class LogSpoolerException extends RuntimeException {
-  public LogSpoolerException(String message, Exception cause) {
-    super(message, cause);
-  }
-
-  public LogSpoolerException(String message) {
-    super(message);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
deleted file mode 100644
index 48ace11..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-/**
- * An interface that is used to determine whether a rollover of a locally spooled log file should be triggered.
- */
-public interface RolloverCondition {
-
-  /**
-   * Check if the active spool file should be rolled over.
-   *
-   * If this returns true, the {@link LogSpooler} will initiate activities related
-   * to rollover of the file
-   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
-   *                                                       for rollover.
-   * @return true if active spool file should be rolled over, false otherwise
-   */
-  boolean shouldRollover(LogSpoolerContext currentSpoolerContext);
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
deleted file mode 100644
index 2ec2708..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import java.io.File;
-
-/**
- * An interface that is used to trigger the handling of a rolled over file.
- *
- * Implementations of this interface will typically upload the rolled over file to
- * a target destination, like HDFS.
- */
-public interface RolloverHandler {
-  /**
-   * Handle a rolled over file.
-   *
-   * This method is called inline from the {@link LogSpooler#rollover()} method.
-   * Hence implementations should either complete the handling fast, or do so
-   * asynchronously. The cleanup of the file is left to implementors, but should
-   * typically be done once the upload the file to the target destination is complete.
-   * @param rolloverFile The file that has been rolled over.
-   */
-  void handleRollover(File rolloverFile);
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
index a225a12..61be819 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -33,36 +34,43 @@ public class LogFeederHDFSUtil {
     throw new UnsupportedOperationException();
   }
   
-  public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
-      boolean delSrc) {
+  public static void copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
+                                      boolean delSrc, FsPermission fsPermission) throws Exception {
     Path src = new Path(sourceFilepath);
     Path dst = new Path(destFilePath);
-    boolean isCopied = false;
-    try {
-      logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
-      fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
-      isCopied = true;
-    } catch (Exception e) {
-      logger.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e);
+    logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
+    fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
+    if (fsPermission != null) {
+      fileSystem.setPermission(dst, fsPermission);
     }
-    return isCopied;
   }
 
   public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+    return buildFileSystem(hdfsHost, hdfsPort, "hdfs");
+  }
+
+  public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort, String scheme) {
+    Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort, scheme);
+    return buildFileSystem(configuration);
+  }
+
+  public static FileSystem buildFileSystem(Configuration configuration) {
     try {
-      Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
-      FileSystem fs = FileSystem.get(configuration);
-      return fs;
+      return FileSystem.get(configuration);
     } catch (Exception e) {
-      logger.error("Exception is buildFileSystem :", e);
+      logger.error("Exception during buildFileSystem call:", e);
     }
     return null;
   }
 
-  private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
-    String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+  public static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort, String scheme) {
+    return buildHdfsConfiguration(String.format("%s:%s", hdfsHost, hdfsPort), scheme);
+  }
+
+  public static Configuration buildHdfsConfiguration(String address, String scheme) {
+    String url = String.format("%s://%s/", scheme, address);
     Configuration configuration = new Configuration();
-    configuration.set("fs.default.name", url);
+    configuration.set("fs.defaultFS", url);
     return configuration;
   }
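
A brief sketch of how the reworked helpers compose; the bucket name and paths are illustrative:

    // fs.defaultFS becomes "s3a://mybucket/"
    Configuration conf = LogFeederHDFSUtil.buildHdfsConfiguration("mybucket", "s3a");
    FileSystem fs = LogFeederHDFSUtil.buildFileSystem(conf);
    // overwrite the target, delete the source, no explicit permissions
    LogFeederHDFSUtil.copyFromLocal("/tmp/service.log", "/logs/service.log", fs, true, true, null);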
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
deleted file mode 100644
index 1135eea..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.util;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
-
-import io.minio.MinioClient;
-import io.minio.errors.InvalidEndpointException;
-import io.minio.errors.InvalidPortException;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Utility to connect to s3
- */
-public class S3Util {
-  private static final Logger logger = LogManager.getLogger(S3Util.class);
-
-  private S3Util() {
-    throw new UnsupportedOperationException();
-  }
-  
-  public static MinioClient getS3Client(String endpoint, String accessKey, String secretKey) throws InvalidPortException, InvalidEndpointException {
-    return new MinioClient(endpoint, accessKey, secretKey);
-  }
-
-  public static String getBucketName(String s3Path) {
-    String bucketName = null;
-    // s3path
-    if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
-      bucketName = s3PathParts[0];
-    }
-    return bucketName;
-  }
-
-  public static String getS3Key(String s3Path) {
-    StringBuilder s3Key = new StringBuilder();
-    if (s3Path != null) {
-      String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
-      ArrayList<String> s3PathList = new ArrayList<>(Arrays.asList(s3PathParts));
-      s3PathList.remove(0);// remove bucketName
-      for (int index = 0; index < s3PathList.size(); index++) {
-        if (index > 0) {
-          s3Key.append(LogFeederConstants.S3_PATH_SEPARATOR);
-        }
-        s3Key.append(s3PathList.get(index));
-      }
-    }
-    return s3Key.toString();
-  }
-
-  /**
-   * Get the buffer reader to read s3 file as a stream
-   * @param s3Path s3 specific path
-   * @param s3Endpoint url of an s3 server
-   * @param accessKey s3 access key - pass by an input shipper configuration
-   * @param secretKey s3 secret key - pass by an input shipper configuration
-   * @return buffered reader object which can be used to read s3 file object
-   * @throws Exception error that happens during reading s3 file
-   */
-  public static BufferedReader getReader(String s3Path, String s3Endpoint, String accessKey, String secretKey) throws Exception {
-    // TODO error handling
-    // Compression support
-    // read header and decide the compression(auto detection)
-    // For now hard-code GZIP compression
-    String s3Bucket = getBucketName(s3Path);
-    String s3Key = getS3Key(s3Path);
-    GZIPInputStream objectInputStream = null;
-    InputStreamReader inputStreamReader = null;
-    BufferedReader bufferedReader = null;
-    try {
-      MinioClient s3Client = getS3Client(s3Endpoint, accessKey, secretKey);
-      s3Client.statObject(s3Bucket, s3Key);
-      objectInputStream = new GZIPInputStream(s3Client.getObject(s3Bucket, s3Key));
-      inputStreamReader = new InputStreamReader(objectInputStream);
-      bufferedReader = new BufferedReader(inputStreamReader);
-      return bufferedReader;
-    } catch (Exception e) {
-      logger.error("Error in creating stream reader for s3 file :" + s3Path, e.getCause());
-      throw e;
-    } finally {
-      try {
-        if (inputStreamReader != null) {
-          inputStreamReader.close();
-        }
-        if (bufferedReader != null) {
-          bufferedReader.close();
-        }
-        if (objectInputStream != null) {
-          objectInputStream.close();
-        }
-      } catch (Exception e) {
-        // do nothing
-      }
-    }
-  }
-
-  public static void writeFileIntoS3File(String filename, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) {
-    try {
-      MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
-      s3Client.putObject(bucketName, s3Key, filename);
-    } catch (Exception e) {
-      logger.error("Could not write file to s3", e);
-    }
-  }
-
-  public static void writeDataIntoS3File(String data, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) {
-    try (ByteArrayInputStream bai = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
-      MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
-      s3Client.putObject(bucketName, s3Key, bai, bai.available(), "application/octet-stream");
-    } catch (Exception e) {
-      logger.error("Could not write data to s3", e);
-    }
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 229a9b6..163aad3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -3,9 +3,6 @@
     "file": {
       "klass": "org.apache.ambari.logfeeder.input.InputFile"
     },
-    "s3_file": {
-      "klass": "org.apache.ambari.logfeeder.input.InputS3File"
-    },
     "simulate": {
       "klass": "org.apache.ambari.logfeeder.input.InputSimulate"
     },
@@ -53,12 +50,6 @@
     },
     "dev_null": {
       "klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
-    },
-    "s3_file": {
-      "klass": "org.apache.ambari.logfeeder.output.OutputS3File"
-    },
-    "hdfs": {
-      "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
     }
   }
 }
\ No newline at end of file
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 06c95f3..127e1b3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -39,5 +39,26 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
 #logfeeder.docker.registry.enabled=true
-logfeeder.cloud.storage.mode=default
+
 #logfeeder.cloud.storage.mode=cloud
+logfeeder.cloud.storage.mode=default
+logfeeder.cloud.storage.destination=s3
+logfeeder.cloud.storage.uploader.interval.seconds=1
+logfeeder.cloud.storage.upload.on.shutdown=true
+logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.rollover.threshold.min=1000
+logfeeder.cloud.rollover.threshold.size=1K
+logfeeder.cloud.rollover.immediate.flush=true
+
+logfeeder.hdfs.host=c7401.ambari.apache.org
+logfeeder.hdfs.port=8020
+logfeeder.hdfs.user=hdfs
+logfeeder.hdfs.output.base.dir=/user/hdfs/logfeeder
+
+logfeeder.s3.endpoint=http://localhost:4569
+logfeeder.s3.secret.key=MySecretKey
+logfeeder.s3.access.key=MyAccessKey
+logfeeder.s3.bucket=logfeeder
+logfeeder.s3.object.acl=public-read
+logfeeder.s3.path.style.access=true
+logfeeder.s3.multiobject.delete.enable=false
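
Note that the logfeeder.s3.* block above targets an S3-compatible endpoint rather than AWS itself (localhost:4569 is a fakes3-style service), which is why an explicit endpoint is configured and path-style access is enabled. With the AWS SDK v1 these properties map onto a client roughly as follows; this is a sketch assuming the standard client builder, not necessarily how the commit's S3 upload client wires it:

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    public class S3ClientSketch {
      public static AmazonS3 build() {
        return AmazonS3ClientBuilder.standard()
            // logfeeder.s3.endpoint; the region is a placeholder, the explicit endpoint wins
            .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4569", "us-east-1"))
            // logfeeder.s3.path.style.access=true: bucket goes in the path, as fake-S3 services expect
            .withPathStyleAccessEnabled(true)
            // logfeeder.s3.access.key / logfeeder.s3.secret.key
            .withCredentials(new AWSStaticCredentialsProvider(
                new BasicAWSCredentials("MyAccessKey", "MySecretKey")))
            .build();
      }
    }

An upload then reduces to a putObject call against the logfeeder bucket, with the canned ACL from logfeeder.s3.object.acl applied per request.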
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
deleted file mode 100644
index 6674be1..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class OutputS3FileTest {
-
-  private Map<String, Object> configMap;
-
-  @Before
-  public void setupConfiguration() {
-    configMap = new HashMap<>();
-    String[] configKeys = new String[] {
-        S3OutputConfiguration.SPOOL_DIR_KEY,
-        S3OutputConfiguration.S3_BUCKET_NAME_KEY,
-        S3OutputConfiguration.S3_LOG_DIR_KEY,
-        S3OutputConfiguration.S3_ACCESS_KEY,
-        S3OutputConfiguration.S3_SECRET_KEY,
-        S3OutputConfiguration.COMPRESSION_ALGO_KEY,
-        S3OutputConfiguration.ADDITIONAL_FIELDS_KEY
-    };
-    Map<String, String> additionalKeys = new HashMap<>();
-    additionalKeys.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
-    Object[] configValues = new Object[] {
-        "/var/ambari-logsearch/logfeeder",
-        "s3_bucket_name",
-        "logs",
-        "ABCDEFGHIJ1234",
-        "amdfbldkfdlf",
-        "gz",
-        additionalKeys
-    };
-    for (int i = 0; i < configKeys.length; i++) {
-      configMap.put(configKeys[i], configValues[i]);
-    }
-  }
-
-  @Test
-  public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
-
-    String thresholdSize = Long.toString(15 * 1024 * 1024L);
-    LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
-    File activeSpoolFile = mock(File.class);
-    expect(activeSpoolFile.length()).andReturn(20*1024*1024L);
-    expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
-    replay(logSpoolerContext, activeSpoolFile);
-
-    OutputS3File outputS3File = new OutputS3File();
-    configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
-    outputS3File.loadConfig(configMap);
-    outputS3File.init(new LogFeederProps());
-
-    assertTrue(outputS3File.shouldRollover(logSpoolerContext));
-  }
-
-  @Test
-  public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception {
-    String thresholdSize = Long.toString(15 * 1024 * 1024L);
-    LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
-    File activeSpoolFile = mock(File.class);
-    expect(activeSpoolFile.length()).andReturn(10*1024*1024L);
-    expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
-    replay(logSpoolerContext, activeSpoolFile);
-
-    OutputS3File outputS3File = new OutputS3File();
-    configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
-    outputS3File.loadConfig(configMap);
-    outputS3File.init(new LogFeederProps());
-
-    assertFalse(outputS3File.shouldRollover(logSpoolerContext));
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
deleted file mode 100644
index d1376c4..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-
-public class S3LogPathResolverTest {
-
-  @Test
-  public void shouldResolveHostName() {
-    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$host", "filename.log", "cl1");
-    assertEquals("my_s3_path/" + LogFeederUtil.hostName + "/filename.log", resolvedPath);
-  }
-
-  @Test
-  public void shouldResolveIpAddress() {
-    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$ip", "filename.log", "cl1");
-    assertEquals("my_s3_path/" + LogFeederUtil.ipAddress + "/filename.log", resolvedPath);
-  }
-
-  @Test
-  public void shouldResolveCluster() {
-    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster", "filename.log", "cl1");
-    assertEquals("my_s3_path/cl1/filename.log", resolvedPath);
-  }
-
-  @Test
-  public void shouldResolveCombinations() {
-    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster/$host", "filename.log", "cl1");
-    assertEquals("my_s3_path/cl1/"+ LogFeederUtil.hostName + "/filename.log", resolvedPath);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
deleted file mode 100644
index e070545..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertEquals;
-
-public class S3UploaderTest {
-
-  public static final String TEST_BUCKET = "test_bucket";
-  public static final String TEST_PATH = "test_path";
-  public static final String GZ = "gz";
-  public static final String LOG_TYPE = "hdfs_namenode";
-  public static final String ACCESS_KEY_VALUE = "accessKeyValue";
-  public static final String SECRET_KEY_VALUE = "secretKeyValue";
-
-  @Test
-  public void shouldUploadToS3ToRightBucket() {
-    File fileToUpload = mock(File.class);
-    String fileName = "hdfs_namenode.log.123343493473948";
-    expect(fileToUpload.getName()).andReturn(fileName);
-    final File compressedFile = mock(File.class);
-    Map<String, Object> configs = setupS3Configs();
-
-    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    expect(compressedFile.getAbsolutePath()).andReturn(TEST_BUCKET + "/" + LOG_TYPE + "/" +fileName);
-    expect(compressedFile.delete()).andReturn(true);
-    expect(fileToUpload.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile);
-
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
-      @Override
-      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-        return compressedFile;
-      }
-      @Override
-      protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
-      }
-    };
-    String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
-    assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
-  }
-
-  @Test
-  public void shouldCleanupLocalFilesOnSuccessfulUpload() {
-    File fileToUpload = mock(File.class);
-    String fileName = "hdfs_namenode.log.123343493473948";
-    expect(fileToUpload.getName()).andReturn(fileName);
-    final File compressedFile = mock(File.class);
-    Map<String, Object> configs = setupS3Configs();
-
-    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    expect(compressedFile.delete()).andReturn(true);
-    expect(fileToUpload.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile);
-
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
-      @Override
-      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-        return compressedFile;
-      }
-
-      @Override
-      protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
-      }
-    };
-    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
-    verify(fileToUpload);
-    verify(compressedFile);
-  }
-
-  @Test
-  public void shouldNotCleanupUncompressedFileIfNotRequired() {
-    File fileToUpload = mock(File.class);
-    String fileName = "hdfs_namenode.log.123343493473948";
-    expect(fileToUpload.getName()).andReturn(fileName);
-    final File compressedFile = mock(File.class);
-    Map<String, Object> configs = setupS3Configs();
-
-    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    expect(compressedFile.delete()).andReturn(true);
-    replay(fileToUpload, compressedFile);
-
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, LOG_TYPE) {
-      @Override
-      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-        return compressedFile;
-      }
-      @Override
-      protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
-      }
-    };
-    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
-    verify(fileToUpload);
-    verify(compressedFile);
-  }
-
-  @Test
-  public void shouldExpandVariablesInPath() {
-    File fileToUpload = mock(File.class);
-    String fileName = "hdfs_namenode.log.123343493473948";
-    expect(fileToUpload.getName()).andReturn(fileName);
-    final File compressedFile = mock(File.class);
-    Map<String, Object> configs = setupS3Configs();
-    configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, "$cluster/"+TEST_PATH);
-
-
-    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
-    expect(compressedFile.delete()).andReturn(true);
-    expect(fileToUpload.delete()).andReturn(true);
-    expect(compressedFile.getAbsolutePath()).andReturn(TEST_BUCKET + "/" + LOG_TYPE + "/" +fileName);
-    replay(fileToUpload, compressedFile);
-
-    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
-      @Override
-      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
-        return compressedFile;
-      }
-      @Override
-      protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
-      }
-    };
-    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-  }
-
-  private Map<String, Object> setupS3Configs() {
-    Map<String, Object> configs = new HashMap<>();
-    configs.put(S3OutputConfiguration.S3_BUCKET_NAME_KEY, TEST_BUCKET);
-    configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, TEST_PATH);
-    configs.put(S3OutputConfiguration.S3_ACCESS_KEY, ACCESS_KEY_VALUE);
-    configs.put(S3OutputConfiguration.S3_SECRET_KEY, SECRET_KEY_VALUE);
-    configs.put(S3OutputConfiguration.COMPRESSION_ALGO_KEY, GZ);
-    Map<String, String> nameValueMap = new HashMap<>();
-    nameValueMap.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
-    configs.put(S3OutputConfiguration.ADDITIONAL_FIELDS_KEY, nameValueMap);
-    return configs;
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
deleted file mode 100644
index 2cfe9ff..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import org.easymock.EasyMockRule;
-import org.easymock.LogicalOperator;
-import org.easymock.Mock;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Comparator;
-
-import static org.easymock.EasyMock.*;
-
-public class LogSpoolerTest {
-
-  @Rule
-  public TemporaryFolder testFolder = new TemporaryFolder();
-
-  @Rule
-  public EasyMockRule mocks = new EasyMockRule(this);
-
-  private String spoolDirectory;
-  private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
-
-  @Mock
-  private RolloverCondition rolloverCondition;
-
-  @Mock
-  private RolloverHandler rolloverHandler;
-
-  @Before
-  public void setup() {
-    spoolDirectory = testFolder.getRoot().getAbsolutePath();
-  }
-
-  @Test
-  public void shouldSpoolEventToFile() {
-    final PrintWriter spoolWriter = mock(PrintWriter.class);
-    spoolWriter.println("log event");
-
-    final File mockFile = setupInputFileExpectations();
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
-        andReturn(false);
-
-    replay(spoolWriter, rolloverCondition, mockFile);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        return mockFile;
-      }
-    };
-    logSpooler.add("log event");
-
-    verify(spoolWriter);
-  }
-
-  private File setupInputFileExpectations() {
-    final File mockFile = mock(File.class);
-    expect(mockFile.length()).andReturn(10240L);
-    return mockFile;
-  }
-
-  @Test
-  public void shouldIncrementSpooledEventsCount() {
-
-    final PrintWriter spoolWriter = mock(PrintWriter.class);
-    spoolWriter.println("log event");
-
-    final File mockFile = setupInputFileExpectations();
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
-    logSpoolerContext.logEventSpooled();
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
-        andReturn(false);
-
-    replay(spoolWriter, rolloverCondition, mockFile);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        return mockFile;
-      }
-    };
-    logSpooler.add("log event");
-
-    verify(rolloverCondition);
-  }
-
-  @Test
-  public void shouldCloseCurrentSpoolFileOnRollOver() {
-    final PrintWriter spoolWriter = mock(PrintWriter.class);
-    spoolWriter.println("log event");
-    spoolWriter.flush();
-    spoolWriter.close();
-
-    final File mockFile = setupInputFileExpectations();
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
-        andReturn(true);
-    rolloverHandler.handleRollover(mockFile);
-
-    replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        return mockFile;
-      }
-    };
-    logSpooler.add("log event");
-
-    verify(spoolWriter);
-  }
-
-  @Test
-  public void shouldReinitializeFileOnRollover() {
-    final PrintWriter spoolWriter1 = mock(PrintWriter.class);
-    final PrintWriter spoolWriter2 = mock(PrintWriter.class);
-    spoolWriter1.println("log event1");
-    spoolWriter2.println("log event2");
-    spoolWriter1.flush();
-    spoolWriter1.close();
-
-    final File mockFile1 = setupInputFileExpectations();
-    final File mockFile2 = setupInputFileExpectations();
-
-    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
-    ).andReturn(true);
-
-    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
-    ).andReturn(false);
-
-    rolloverHandler.handleRollover(mockFile1);
-
-    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-      private boolean wasRolledOver;
-
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        if (!wasRolledOver) {
-          wasRolledOver = true;
-          return spoolWriter1;
-        } else {
-          return spoolWriter2;
-        }
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        if (!wasRolledOver) {
-          return mockFile1;
-        } else {
-          return mockFile2;
-        }
-      }
-    };
-    logSpooler.add("log event1");
-    logSpooler.add("log event2");
-
-    verify(spoolWriter1, spoolWriter2, rolloverCondition);
-  }
-
-  @Test
-  public void shouldCallRolloverHandlerOnRollover() {
-    final PrintWriter spoolWriter = mock(PrintWriter.class);
-    spoolWriter.println("log event");
-    spoolWriter.flush();
-    spoolWriter.close();
-
-    final File mockFile = setupInputFileExpectations();
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
-    ).andReturn(true);
-    rolloverHandler.handleRollover(mockFile);
-
-    replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        return mockFile;
-      }
-    };
-    logSpooler.add("log event");
-
-    verify(rolloverHandler);
-  }
-
-  // Rollover twice - the second rollover should work if the "rolloverInProgress"
-  // flag is being reset correctly. Third file expectations being setup due
-  // to auto-initialization.
-  @Test
-  public void shouldResetRolloverInProgressFlag() {
-    final PrintWriter spoolWriter1 = mock(PrintWriter.class);
-    final PrintWriter spoolWriter2 = mock(PrintWriter.class);
-    final PrintWriter spoolWriter3 = mock(PrintWriter.class);
-    spoolWriter1.println("log event1");
-    spoolWriter2.println("log event2");
-    spoolWriter1.flush();
-    spoolWriter1.close();
-    spoolWriter2.flush();
-    spoolWriter2.close();
-
-    final File mockFile1 = setupInputFileExpectations();
-    final File mockFile2 = setupInputFileExpectations();
-    final File mockFile3 = setupInputFileExpectations();
-
-    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
-    ).andReturn(true);
-
-    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
-    ).andReturn(true);
-
-    rolloverHandler.handleRollover(mockFile1);
-    rolloverHandler.handleRollover(mockFile2);
-
-    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2, mockFile3);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-      private int currentFileNum;
-
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        PrintWriter spoolWriter = null;
-        switch (currentFileNum) {
-          case 0:
-            spoolWriter = spoolWriter1;
-            break;
-          case 1:
-            spoolWriter = spoolWriter2;
-            break;
-          case 2:
-            spoolWriter = spoolWriter3;
-            break;
-        }
-        currentFileNum++;
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        switch (currentFileNum) {
-          case 0:
-            return mockFile1;
-          case 1:
-            return mockFile2;
-          case 2:
-            return mockFile3;
-          default:
-            return null;
-        }
-      }
-    };
-    logSpooler.add("log event1");
-    logSpooler.add("log event2");
-
-    verify(spoolWriter1, spoolWriter2, rolloverCondition);
-  }
-
-  @Test
-  public void shouldNotRolloverZeroLengthFiles() {
-    final PrintWriter spoolWriter = mock(PrintWriter.class);
-    spoolWriter.println("log event");
-    spoolWriter.flush();
-    spoolWriter.close();
-
-    final File mockFile = mock(File.class);
-    expect(mockFile.length()).andReturn(0L);
-
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
-    expect(rolloverCondition.shouldRollover(
-        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
-        andReturn(true);
-
-    replay(spoolWriter, rolloverCondition, mockFile);
-
-    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
-        rolloverCondition, rolloverHandler) {
-
-      @Override
-      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
-        return spoolWriter;
-      }
-
-      @Override
-      protected File initializeSpoolFile() {
-        return mockFile;
-      }
-    };
-    logSpooler.add("log event");
-
-    verify(mockFile);
-  }
-
-  class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
-    @Override
-    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
-      return o1.getActiveSpoolFile()==o2.getActiveSpoolFile() ? 0 : -1;
-    }
-  }
-
-  class LogSpoolerEventCountComparator implements Comparator<LogSpoolerContext> {
-    @Override
-    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
-      return (int)(o1.getNumEventsSpooled()-o2.getNumEventsSpooled());
-    }
-  }
-
-}
diff --git a/ambari-logsearch-server/pom.xml b/ambari-logsearch-server/pom.xml
index 3349482..3857f35 100755
--- a/ambari-logsearch-server/pom.xml
+++ b/ambari-logsearch-server/pom.xml
@@ -442,7 +442,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-auth</artifactId>
-      <version>3.0.0</version>
+      <version>${hadoop.version}</version>
     </dependency>
     <dependency>
       <groupId>commons-io</groupId>
diff --git a/docker/cloud-docker-compose.yml b/docker/cloud-docker-compose.yml
index 3a9ec05..ae0c8b3 100644
--- a/docker/cloud-docker-compose.yml
+++ b/docker/cloud-docker-compose.yml
@@ -120,6 +120,9 @@ services:
     image: flokkr/hadoop-hdfs-datanode:${HADOOP_VERSION:-3.0.0}
     links:
       - namenode
+    ports:
+      - 61890:61890
+      - 5007:5007
     env_file:
       - Profile
     networks:
diff --git a/docker/test-config/logfeeder/logfeeder.properties b/docker/test-config/logfeeder/logfeeder.properties
index ffdb061..aed8337 100644
--- a/docker/test-config/logfeeder/logfeeder.properties
+++ b/docker/test-config/logfeeder/logfeeder.properties
@@ -35,4 +35,15 @@ logfeeder.solr.core.config.name=history
 #logfeeder.configs.local.enabled=true
 #logfeeder.configs.filter.solr.enabled=true
 #logfeeder.configs.filter.zk.enabled=true
-#logfeeder.cloud.storage.mode=hybrid
\ No newline at end of file
+logfeeder.cloud.storage.mode=default
+logfeeder.cloud.storage.destination=s3
+logfeeder.cloud.rollover.threshold.min=1000
+logfeeder.cloud.rollover.threshold.size=1K
+
+logfeeder.s3.endpoint=http://fakes3:4569
+logfeeder.s3.secret.key=MySecretKey
+logfeeder.s3.access.key=MyAccessKey
+logfeeder.s3.bucket=logfeeder
+logfeeder.s3.object.acl=public-read
+logfeeder.s3.path.style.access=true
+logfeeder.s3.multiobject.delete.enable=false
\ No newline at end of file
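
The rollover thresholds above (logfeeder.cloud.rollover.threshold.min and .threshold.size) decide when the active local chunk is closed and handed to the uploader, and the commit appears to build this on Log4j2 rolling appenders (note the log4j2.version bump in the root pom below). A hedged sketch of how the two thresholds can compose as stock Log4j2 triggering policies, assuming threshold.min is a time interval and threshold.size accepts suffixed sizes such as 1K:

    import org.apache.logging.log4j.core.appender.rolling.CompositeTriggeringPolicy;
    import org.apache.logging.log4j.core.appender.rolling.SizeBasedTriggeringPolicy;
    import org.apache.logging.log4j.core.appender.rolling.TimeBasedTriggeringPolicy;

    public class RolloverPolicySketch {
      // Roll the active chunk when either condition fires: size limit reached or time interval elapsed.
      public static CompositeTriggeringPolicy create() {
        return CompositeTriggeringPolicy.createPolicy(
            SizeBasedTriggeringPolicy.createPolicy("1K"),   // logfeeder.cloud.rollover.threshold.size
            TimeBasedTriggeringPolicy.newBuilder()          // time component; the interval unit comes
                .withInterval(1)                            // from the appender's file pattern in Log4j2
                .build());
      }
    }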
diff --git a/pom.xml b/pom.xml
index a5e4ef2..91e5b09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
     <deb.architecture>amd64</deb.architecture>
     <deb.dependency.list>${deb.python.ver}</deb.dependency.list>
     <solr.version>7.5.0</solr.version>
-    <hadoop.version>3.0.0</hadoop.version>
+    <hadoop.version>3.1.1</hadoop.version>
     <common.io.version>2.5</common.io.version>
     <zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
     <forkCount>4</forkCount>
@@ -95,6 +95,7 @@
     <ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>
     <logsearch.docker.tag>latest</logsearch.docker.tag>
     <fasterxml-jackson.version>2.9.5</fasterxml-jackson.version>
+    <log4j2.version>2.11.1</log4j2.version>
   </properties>
 
   <licenses>
@@ -395,17 +396,17 @@
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-api</artifactId>
-        <version>2.10.0</version>
+        <version>${log4j2.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-core</artifactId>
-        <version>2.10.0</version>
+        <version>${log4j2.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-jcl</artifactId>
-        <version>2.10.0</version>
+        <version>${log4j2.version}</version>
       </dependency>
       <dependency>
         <groupId>commons-fileupload</groupId>