Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/10 19:35:06 UTC

[1/3] ambari git commit: AMBARI-18198. Doc updates about ldap sync related properties (oleewere)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 925448d0c -> 26e5fe0a9


AMBARI-18198. Doc updates about ldap sync related properties (oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d9aa654
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d9aa654
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d9aa654

Branch: refs/heads/branch-2.5
Commit: 1d9aa654d9778736f9c02b13936e54c983c7a9b2
Parents: 925448d
Author: oleewere <ol...@gmail.com>
Authored: Thu Aug 18 17:22:50 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Sat Sep 10 21:20:45 2016 +0200

----------------------------------------------------------------------
 ambari-server/docs/configuration/index.md       |  8 +++----
 .../server/configuration/Configuration.java     | 24 ++++++++++++--------
 2 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1d9aa654/ambari-server/docs/configuration/index.md
----------------------------------------------------------------------
diff --git a/ambari-server/docs/configuration/index.md b/ambari-server/docs/configuration/index.md
index 9cdfc4d..3c3f3d0 100644
--- a/ambari-server/docs/configuration/index.md
+++ b/ambari-server/docs/configuration/index.md
@@ -85,10 +85,10 @@ The following are the properties which can be used to configure Ambari.
 | authentication.ldap.primaryUrl | The LDAP URL used for connecting to an LDAP server when authenticating users. This should include both the host name and port. |`localhost:33389` | 
 | authentication.ldap.referral | Determines whether to follow LDAP referrals to other URLs when the LDAP controller doesn't have the requested object. |`follow` | 
 | authentication.ldap.secondaryUrl | A second LDAP URL to use as a backup when authenticating users. This should include both the host name and port. | | 
-| authentication.ldap.sync.groupMemberFilter | The default filter to use for syncing member from LDAP. | | 
-| authentication.ldap.sync.groupMemberReplacePattern | The default regex pattern to use when replacing the group member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID<br/><br/>The following are examples of valid values:<ul><li>``${member}``</ul> | | 
-| authentication.ldap.sync.userMemberFilter | The default filter to use for syncing users from LDAP. | | 
-| authentication.ldap.sync.userMemberReplacePattern | The default regex pattern to use when replacing the user member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID<br/><br/>The following are examples of valid values:<ul><li>``${member}``</ul> | | 
+| authentication.ldap.sync.groupMemberFilter | Filter to use for syncing group members of a group from LDAP (not used by default).<br/><br/>The following are examples of valid values:<ul><li>`(&(objectclass=posixgroup)(cn={member}))`</ul> | | 
+| authentication.ldap.sync.groupMemberReplacePattern | Regex pattern to use when replacing the group member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID (e.g.: `member: <SID=123>;<GID=123>;cn=myCn,dc=org,dc=apache`)<br/><br/>The following are examples of valid values:<ul><li>`(?<sid>.*);(?<guid>.*);(?<member>.*)`</ul> | | 
+| authentication.ldap.sync.userMemberFilter | Filter to use for syncing user members of a group from LDAP (not used by default).<br/><br/>The following are examples of valid values:<ul><li>`(&(objectclass=posixaccount)(uid={member}))`</ul> | | 
+| authentication.ldap.sync.userMemberReplacePattern | Regex pattern to use when replacing the user member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID (e.g.: `member: <SID=123>;<GID=123>;cn=myCn,dc=org,dc=apache`)<br/><br/>The following are examples of valid values:<ul><li>`(?<sid>.*);(?<guid>.*);(?<member>.*)`</ul> | | 
 | authentication.ldap.useSSL | Determines whether to use LDAP over SSL (LDAPS). |`false` | 
 | authentication.ldap.userBase | The filter used when searching for users in LDAP. |`ou=people,dc=ambari,dc=apache,dc=org` | 
 | authentication.ldap.userObjectClass | The class to which user objects in LDAP belong. |`person` | 

http://git-wip-us.apache.org/repos/asf/ambari/blob/1d9aa654/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 500809f..6b89681 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1055,41 +1055,45 @@ public class Configuration {
       "authentication.ldap.pagination.enabled", "true");
 
   /**
-   * The default regex pattern to use when replacing the user member attribute
+   * Regex pattern to use when replacing the user member attribute
    * ID value with a placeholder. This is used in cases where a UID of an LDAP
    * member is not a full CN or unique ID.
    */
   @Markdown(
-      description = "The default regex pattern to use when replacing the user member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID",
-      examples = { "`${member}`" })
+      description = "Regex pattern to use when replacing the user member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID (e.g.: `member: <SID=123>;<GID=123>;cn=myCn,dc=org,dc=apache`)",
+      examples = { "(?<sid>.*);(?<guid>.*);(?<member>.*)" })
   public static final ConfigurationProperty<String> LDAP_SYNC_USER_MEMBER_REPLACE_PATTERN = new ConfigurationProperty<>(
       "authentication.ldap.sync.userMemberReplacePattern",
       LDAP_SYNC_MEMBER_REPLACE_PATTERN_DEFAULT);
 
   /**
-   * The default regex pattern to use when replacing the group member attribute
+   * Regex pattern to use when replacing the group member attribute
    * ID value with a placeholder. This is used in cases where a UID of an LDAP
    * member is not a full CN or unique ID.
    */
   @Markdown(
-      description = "The default regex pattern to use when replacing the group member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID",
-      examples = { "`${member}`" })
+      description = "Regex pattern to use when replacing the group member attribute ID value with a placeholder. This is used in cases where a UID of an LDAP member is not a full CN or unique ID (e.g.: `member: <SID=123>;<GID=123>;cn=myCn,dc=org,dc=apache`)",
+      examples = { "(?<sid>.*);(?<guid>.*);(?<member>.*)" })
   public static final ConfigurationProperty<String> LDAP_SYCN_GROUP_MEMBER_REPLACE_PATTERN = new ConfigurationProperty<>(
       "authentication.ldap.sync.groupMemberReplacePattern",
       LDAP_SYNC_MEMBER_REPLACE_PATTERN_DEFAULT);
 
   /**
-   * The default filter to use for syncing users from LDAP.
+   * Filter to use for syncing user members of a group from LDAP (not used by default).
    */
-  @Markdown(description = "The default filter to use for syncing users from LDAP.")
+  @Markdown(
+    description = "Filter to use for syncing user members of a group from LDAP (by default it is not used).",
+    examples = {"(&(objectclass=posixaccount)(uid={member}))"})
   public static final ConfigurationProperty<String> LDAP_SYNC_USER_MEMBER_FILTER = new ConfigurationProperty<>(
       "authentication.ldap.sync.userMemberFilter",
       LDAP_SYNC_MEMBER_FILTER_DEFAULT);
 
   /**
-   * The default filter to use for syncing member from LDAP.
+   * Filter to use for syncing group members of a group from LDAP (not used by default).
    */
-  @Markdown(description = "The default filter to use for syncing member from LDAP.")
+  @Markdown(
+    description = "Filter to use for syncing group members of a group from LDAP. (by default it is not used)",
+    examples = {"(&(objectclass=posixgroup)(cn={member}))"})
   public static final ConfigurationProperty<String> LDAP_SYNC_GROUP_MEMBER_FILTER = new ConfigurationProperty<>(
       "authentication.ldap.sync.groupMemberFilter",
       LDAP_SYNC_MEMBER_FILTER_DEFAULT);
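
To see how the replace pattern and member filter cooperate, here is a minimal, self-contained sketch using plain java.util.regex (illustrative only, not Ambari's actual sync code; the attribute value and patterns are the examples from the descriptions above). The named group "member" extracts the usable id, which then replaces the {member} placeholder in the filter:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LdapMemberPatternDemo {
      public static void main(String[] args) {
        // Non-standard member value from the property description above
        String rawMember = "<SID=123>;<GID=123>;cn=myCn,dc=org,dc=apache";
        // authentication.ldap.sync.userMemberReplacePattern
        Pattern replacePattern = Pattern.compile("(?<sid>.*);(?<guid>.*);(?<member>.*)");
        Matcher m = replacePattern.matcher(rawMember);
        String member = m.matches() ? m.group("member") : rawMember;
        // member is now "cn=myCn,dc=org,dc=apache"; substitute it into the filter
        String userMemberFilter = "(&(objectclass=posixaccount)(uid={member}))";
        System.out.println(userMemberFilter.replace("{member}", member));
      }
    }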


[2/3] ambari git commit: AMBARI-17788. Refactor spooler code in OutputHDFSFile to be reusable for OutputS3File (Hemanth Yamijala via oleewere)

Posted by ol...@apache.org.
AMBARI-17788. Refactor spooler code in OutputHDFSFile to be reusable for OutputS3File (Hemanth Yamijala via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/23bd337f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/23bd337f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/23bd337f

Branch: refs/heads/branch-2.5
Commit: 23bd337fb154049d4d3b9da9c16a66add89cf6a1
Parents: 1d9aa65
Author: oleewere <ol...@gmail.com>
Authored: Fri Aug 5 14:15:48 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Sat Sep 10 21:27:25 2016 +0200

----------------------------------------------------------------------
 .../ambari/logfeeder/output/OutputHDFSFile.java | 154 ++++-------
 .../logfeeder/output/spool/LogSpooler.java      | 137 ++++++++++
 .../output/spool/LogSpoolerContext.java         |  85 ++++++
 .../output/spool/LogSpoolerException.java       |  29 +++
 .../output/spool/RolloverCondition.java         |  36 +++
 .../logfeeder/output/spool/RolloverHandler.java |  40 +++
 .../logfeeder/output/spool/LogSpoolerTest.java  | 258 +++++++++++++++++++
 7 files changed, 640 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index 87cc0eb..f711a5f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -19,28 +19,32 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
 import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
-public class OutputHDFSFile extends Output {
+import java.io.File;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * An {@link Output} that records logs to HDFS.
+ *
+ * The events are spooled on the local file system and uploaded in batches asynchronously.
+ */
+public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
   private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+  private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L; // 5 minutes by default
 
   private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
 
@@ -48,21 +52,15 @@ public class OutputHDFSFile extends Output {
 
   private Thread hdfsCopyThread = null;
 
-  private PrintWriter outWriter = null;
-  // local writer variables
-  private String localFilePath = null;
   private String filenamePrefix = "service-logs-";
-  private String localFileDir = null;
-  private File localcurrentFile = null;
-  private Date localFileCreateTime = null;
-  private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default
+  private long rolloverThresholdTimeMillis;
 
   private String hdfsOutDir = null;
   private String hdfsHost = null;
   private String hdfsPort = null;
   private FileSystem fileSystem = null;
 
-  private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+  private LogSpooler logSpooler;
 
   @Override
   public void init() throws Exception {
@@ -70,7 +68,8 @@ public class OutputHDFSFile extends Output {
     hdfsOutDir = getStringValue("hdfs_out_dir");
     hdfsHost = getStringValue("hdfs_host");
     hdfsPort = getStringValue("hdfs_port");
-    localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
+    long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
+    rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
     filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
     if (StringUtils.isEmpty(hdfsOutDir)) {
       logger
@@ -90,23 +89,15 @@ public class OutputHDFSFile extends Output {
     HashMap<String, String> contextParam = buildContextParam();
     hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
     logger.info("hdfs Output dir=" + hdfsOutDir);
-    localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
-    localFilePath = localFileDir;
+    String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+    logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
     this.startHDFSCopyThread();
   }
 
   @Override
   public void close() {
     logger.info("Closing file." + getShortDescription());
-    if (outWriter != null) {
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger.error(t.getLocalizedMessage(),t);
-      }
-    }
+    logSpooler.rollover();
     this.stopHDFSCopyThread();
     isClosed = true;
   }
@@ -115,12 +106,8 @@ public class OutputHDFSFile extends Output {
   synchronized public void write(String block, InputMarker inputMarker)
       throws Exception {
     if (block != null) {
-      buildOutWriter();
-      if (outWriter != null) {
-        statMetric.count++;
-        outWriter.println(block);
-        closeFileIfNeeded();
-      }
+      logSpooler.add(block);
+      statMetric.count++;
     }
   }
 
@@ -130,59 +117,6 @@ public class OutputHDFSFile extends Output {
     return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
   }
 
-  private synchronized void closeFileIfNeeded() throws FileNotFoundException,
-      IOException {
-    if (outWriter == null) {
-      return;
-    }
-    // TODO: Close the file on absolute time. Currently it is implemented as
-    // relative time
-    if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) {
-      logger.info("Closing file. Rolling over. name="
-          + localcurrentFile.getName() + ", filePath="
-          + localcurrentFile.getAbsolutePath());
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger
-            .error("Error on closing output writter. Exception will be ignored. name="
-                + localcurrentFile.getName()
-                + ", filePath="
-                + localcurrentFile.getAbsolutePath());
-      }
-
-      outWriter = null;
-      localcurrentFile = null;
-    }
-  }
-
-  private synchronized void buildOutWriter() {
-    if (outWriter == null) {
-      String currentFilePath = localFilePath + getCurrentFileName();
-      localcurrentFile = new File(currentFilePath);
-      if (localcurrentFile.getParentFile() != null) {
-        File parentDir = localcurrentFile.getParentFile();
-        if (!parentDir.isDirectory()) {
-          parentDir.mkdirs();
-        }
-      }
-      try {
-        outWriter = new PrintWriter(new BufferedWriter(new FileWriter(
-            localcurrentFile, true)));
-      } catch (IOException e) {
-        logger.error("= OutputHDFSFile.buidOutWriter failed for file :  "
-            + localcurrentFile.getAbsolutePath() + " Desc: "
-            + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(),
-            e);
-      }
-      localFileCreateTime = new Date();
-      logger.info("Create file is successful. localFilePath="
-          + localcurrentFile.getAbsolutePath());
-    }
-  }
-
   private void startHDFSCopyThread() {
 
     hdfsCopyThread = new Thread("hdfsCopyThread") {
@@ -261,13 +195,6 @@ public class OutputHDFSFile extends Output {
     }
   }
 
-  private String getCurrentFileName() {
-    Date currentDate = new Date();
-    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
-    String fileName = filenamePrefix + dateStr;
-    return fileName;
-  }
-
   private HashMap<String, String> buildContextParam() {
     HashMap<String, String> contextParam = new HashMap<String, String>();
     contextParam.put("host", LogFeederUtil.hostName);
@@ -291,4 +218,33 @@ public class OutputHDFSFile extends Output {
     throw new UnsupportedOperationException(
         "copyFile method is not yet supported for output=hdfs");     
   }
+
+  /**
+   * Add the rolled over file to the queue consumed by the daemon thread that uploads files to HDFS.
+   * @param rolloverFile the file to be uploaded to HDFS
+   */
+  @Override
+  public void handleRollover(File rolloverFile) {
+    addFileInReadyList(rolloverFile);
+  }
+
+  /**
+   * Determines whether it is time to roll over the current spool file.
+   *
+   * The file will be rolled over if the time since its creation is more than
+   * the timeout specified in the rollover_sec configuration.
+   * @param currentSpoolerContext {@link LogSpoolerContext} that holds the state of the active spool file
+   * @return true if the time since creation is greater than the value specified in rollover_sec,
+   *          false otherwise.
+   */
+  @Override
+  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
+    long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
+    boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
+    if (shouldRollover) {
+      logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
+                    " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
+    }
+    return shouldRollover;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
new file mode 100644
index 0000000..306326a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+import java.util.Date;
+
+/**
+ * A class that manages local storage of log events before they are uploaded to the output destinations.
+ *
+ * This class should be used by any {@link Output}s that wish to upload log files to an
+ * output destination on a periodic batched basis. Log events should be added to an instance
+ * of this class to be stored locally. This class determines when to
+ * rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface
+ * {@link RolloverHandler} to trigger the handling of the rolled over file.
+ */
+public class LogSpooler {
+  private static Logger logger = Logger.getLogger(LogSpooler.class);
+  static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+
+  private String spoolDirectory;
+  private String sourceFileNamePrefix;
+  private RolloverCondition rolloverCondition;
+  private RolloverHandler rolloverHandler;
+  private PrintWriter currentSpoolBufferedWriter;
+  private File currentSpoolFile;
+  private LogSpoolerContext currentSpoolerContext;
+
+  /**
+   * Create an instance of the LogSpooler.
+   * @param spoolDirectory The directory under which spooler files are created.
+   *                       Should be unique per instance of {@link Output}
+   * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
+   * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
+   *                          determine when to rollover.
+   * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
+   *                        there should be a rollover.
+   */
+  public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
+                    RolloverHandler rolloverHandler) {
+    this.spoolDirectory = spoolDirectory;
+    this.sourceFileNamePrefix = sourceFileNamePrefix;
+    this.rolloverCondition = rolloverCondition;
+    this.rolloverHandler = rolloverHandler;
+    initializeSpoolFile();
+  }
+
+  private void initializeSpoolDirectory() {
+    File spoolDir = new File(spoolDirectory);
+    if (!spoolDir.exists()) {
+      logger.info("Creating spool directory: " + spoolDir);
+      boolean result = spoolDir.mkdirs();
+      if (!result) {
+        throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
+      }
+    }
+  }
+
+  private void initializeSpoolFile() {
+    initializeSpoolDirectory();
+    currentSpoolFile = new File(spoolDirectory, getCurrentFileName());
+    try {
+      currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
+    } catch (IOException e) {
+      throw new LogSpoolerException("Could not create buffered writer for spool file: " + currentSpoolFile
+          + ", error message: " + e.getLocalizedMessage(), e);
+    }
+    currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
+    logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath());
+  }
+
+  @VisibleForTesting
+  protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+    return new PrintWriter(new BufferedWriter(new FileWriter(spoolFile)));
+  }
+
+  /**
+   * Add an event for spooling.
+   *
+   * This method adds the event to the current spool file's buffer. On completion, it
+   * calls the {@link RolloverCondition#shouldRollover(LogSpoolerContext)} method to determine if
+   * it is ready to rollover the file.
+   * @param logEvent The log event to spool.
+   */
+  public void add(String logEvent) {
+    currentSpoolBufferedWriter.println(logEvent);
+    currentSpoolerContext.logEventSpooled();
+    if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
+      rollover();
+    }
+  }
+
+  /**
+   * Trigger a rollover of the current spool file.
+   *
+   * This method manages the rollover of the spool file, and then invokes the
+   * {@link RolloverHandler#handleRollover(File)} to handle what should be done with the
+   * rolled over file.
+   */
+  public void rollover() {
+    logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
+    currentSpoolBufferedWriter.flush();
+    currentSpoolBufferedWriter.close();
+    rolloverHandler.handleRollover(currentSpoolFile);
+    logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+    initializeSpoolFile();
+  }
+
+  @VisibleForTesting
+  protected String getCurrentFileName() {
+    Date currentDate = new Date();
+    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
+    return sourceFileNamePrefix + dateStr;
+  }
+
+}
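
A minimal usage sketch of the new spooler (hypothetical wiring, Java 8 lambdas for brevity; in this patch the actual RolloverCondition and RolloverHandler are implemented by OutputHDFSFile itself, and the 1000-event threshold below is made up):

    import org.apache.ambari.logfeeder.output.spool.LogSpooler;

    public class LogSpoolerUsageSketch {
      public static void main(String[] args) {
        LogSpooler spooler = new LogSpooler(
            "/tmp/logfeeder/hdfs/service/",   // spoolDirectory (hypothetical path)
            "service-logs-",                  // sourceFileNamePrefix
            context -> context.getNumEventsSpooled() >= 1000,          // RolloverCondition
            rolledFile -> System.out.println("ready: " + rolledFile)); // RolloverHandler
        spooler.add("2016-09-10 21:20:45,123 INFO example log line");
      }
    }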

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
new file mode 100644
index 0000000..084d6a2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import java.io.File;
+import java.util.Date;
+
+/**
+ * A class that holds the state of a spool file.
+ *
+ * The state in this class can be used by a {@link RolloverCondition} to determine
+ * if an active spool file should be rolled over.
+ */
+public class LogSpoolerContext {
+
+  private File activeSpoolFile;
+  private long numEventsSpooled;
+  private Date activeLogCreationTime;
+
+  /**
+   * Create a new LogSpoolerContext
+   * @param activeSpoolFile the spool file for which to hold state
+   */
+  public LogSpoolerContext(File activeSpoolFile) {
+    this.activeSpoolFile = activeSpoolFile;
+    this.numEventsSpooled = 0;
+    this.activeLogCreationTime = new Date();
+  }
+
+  /**
+   * Increment number of spooled events by one.
+   */
+  public void logEventSpooled() {
+    numEventsSpooled++;
+  }
+
+  public File getActiveSpoolFile() {
+    return activeSpoolFile;
+  }
+
+  public long getNumEventsSpooled() {
+    return numEventsSpooled;
+  }
+
+  public Date getActiveLogCreationTime() {
+    return activeLogCreationTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    LogSpoolerContext that = (LogSpoolerContext) o;
+
+    if (numEventsSpooled != that.numEventsSpooled) return false;
+    if (!activeSpoolFile.equals(that.activeSpoolFile)) return false;
+    return activeLogCreationTime.equals(that.activeLogCreationTime);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = activeSpoolFile.hashCode();
+    result = 31 * result + (int) (numEventsSpooled ^ (numEventsSpooled >>> 32));
+    result = 31 * result + activeLogCreationTime.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
new file mode 100644
index 0000000..1e12fb7
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+public class LogSpoolerException extends RuntimeException {
+  public LogSpoolerException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  public LogSpoolerException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
new file mode 100644
index 0000000..8279645
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+/**
+ * An interface that is used to determine whether a rollover of a locally spooled log file should be triggered.
+ */
+public interface RolloverCondition {
+
+  /**
+   * Check if the active spool file should be rolled over.
+   *
+   * If this returns true, the {@link LogSpooler} will initiate activities related
+   * to rolling over the file.
+   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
+   *                                                       for rollover.
+   * @return true if active spool file should be rolled over, false otherwise
+   */
+  boolean shouldRollover(LogSpoolerContext currentSpoolerContext);
+}
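
For contrast with the time-based check in OutputHDFSFile above, a hedged sketch of an alternative, count-based implementation of this interface (not part of this commit):

    package org.apache.ambari.logfeeder.output.spool;

    // Hypothetical condition: roll the spool file over after a fixed number of events.
    public class EventCountRolloverCondition implements RolloverCondition {
      private final long maxEvents;

      public EventCountRolloverCondition(long maxEvents) {
        this.maxEvents = maxEvents;
      }

      @Override
      public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
        return currentSpoolerContext.getNumEventsSpooled() >= maxEvents;
      }
    }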

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
new file mode 100644
index 0000000..11308e4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import java.io.File;
+
+/**
+ * An interface that is used to trigger the handling of a rolled over file.
+ *
+ * Implementations of this interface will typically upload the rolled over file to
+ * a target destination, like HDFS.
+ */
+public interface RolloverHandler {
+  /**
+   * Handle a rolled over file.
+   *
+   * This method is called inline from the {@link LogSpooler#rollover()} method.
+   * Hence implementations should either complete the handling fast, or do so
+   * asynchronously. The cleanup of the file is left to implementors, but should
+   * typically be done once the upload of the file to the target destination is complete.
+   * @param rolloverFile The file that has been rolled over.
+   */
+  void handleRollover(File rolloverFile);
+}
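
A hedged sketch of an implementation that keeps handleRollover() fast, as the javadoc advises, by queueing the file for a background uploader (hypothetical, not part of this commit; OutputHDFSFile does essentially this with its ready-file list):

    package org.apache.ambari.logfeeder.output.spool;

    import java.io.File;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical handler: enqueue the rolled-over file so an uploader thread
    // can consume it asynchronously; handleRollover() itself returns immediately.
    public class QueueingRolloverHandler implements RolloverHandler {
      private final BlockingQueue<File> readyFiles = new LinkedBlockingQueue<>();

      @Override
      public void handleRollover(File rolloverFile) {
        readyFiles.add(rolloverFile);
      }

      public BlockingQueue<File> getReadyFiles() {
        return readyFiles;
      }
    }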

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
new file mode 100644
index 0000000..7d9d78a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import org.easymock.EasyMockRule;
+import org.easymock.LogicalOperator;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Comparator;
+
+import static org.easymock.EasyMock.*;
+
+public class LogSpoolerTest {
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  @Rule
+  public EasyMockRule mocks = new EasyMockRule(this);
+
+  private String spoolDirectory;
+  private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
+  private static final String FILE_SUFFIX = "currentFile";
+
+  @Mock
+  private RolloverCondition rolloverCondition;
+
+  @Mock
+  private RolloverHandler rolloverHandler;
+
+  @Before
+  public void setup() {
+    spoolDirectory = testFolder.getRoot().getAbsolutePath();
+  }
+
+  @Test
+  public void shouldSpoolEventToFile() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+
+    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+        andReturn(false);
+
+    replay(spoolWriter, rolloverCondition);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(spoolWriter);
+  }
+
+  @Test
+  public void shouldIncrementSpooledEventsCount() {
+
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+
+    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    logSpoolerContext.logEventSpooled();
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
+        andReturn(false);
+
+    replay(spoolWriter, rolloverCondition);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(rolloverCondition);
+  }
+
+  @Test
+  public void shouldCloseCurrentSpoolFileOnRollOver() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+    spoolWriter.flush();
+    spoolWriter.close();
+
+    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+        andReturn(true);
+    rolloverHandler.handleRollover(spoolFile);
+
+    replay(spoolWriter, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(spoolWriter);
+  }
+
+  @Test
+  public void shouldReinitializeFileOnRollover() {
+    final PrintWriter spoolWriter1 = mock(PrintWriter.class);
+    final PrintWriter spoolWriter2 = mock(PrintWriter.class);
+    spoolWriter1.println("log event1");
+    spoolWriter2.println("log event2");
+    spoolWriter1.flush();
+    spoolWriter1.close();
+
+    File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1");
+    File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2");
+
+    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(spoolFile1);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+
+    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(false);
+
+    rolloverHandler.handleRollover(spoolFile1);
+
+    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      private boolean wasRolledOver;
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        if (!wasRolledOver) {
+          wasRolledOver = true;
+          return spoolWriter1;
+        } else {
+          return spoolWriter2;
+        }
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        if (!wasRolledOver) {
+          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1";
+        } else {
+          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2";
+        }
+      }
+    };
+    logSpooler.add("log event1");
+    logSpooler.add("log event2");
+
+    verify(spoolWriter1, spoolWriter2, rolloverCondition);
+  }
+
+  @Test
+  public void shouldCallRolloverHandlerOnRollover() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+    spoolWriter.flush();
+    spoolWriter.close();
+
+    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+    rolloverHandler.handleRollover(spoolFile);
+
+    replay(spoolWriter, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(rolloverHandler);
+  }
+
+  class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
+    @Override
+    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
+      return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile());
+    }
+  }
+
+  class LogSpoolerEventCountComparator implements Comparator<LogSpoolerContext> {
+    @Override
+    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
+      return (int)(o1.getNumEventsSpooled()-o2.getNumEventsSpooled());
+    }
+  }
+
+}


[3/3] ambari git commit: AMBARI-17785. Provide support for S3 as a first class destination for log events (Hemanth Yamijala via oleewere)

Posted by ol...@apache.org.
AMBARI-17785. Provide support for S3 as a first class destination for log events (Hemanth Yamijala via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/26e5fe0a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/26e5fe0a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/26e5fe0a

Branch: refs/heads/branch-2.5
Commit: 26e5fe0a9d63cadc9e19be0bac17a507224d0060
Parents: 23bd337
Author: oleewere <ol...@gmail.com>
Authored: Tue Aug 9 15:08:13 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Sat Sep 10 21:27:45 2016 +0200

----------------------------------------------------------------------
 .../ambari/logfeeder/input/InputMarker.java     |   1 -
 .../ambari/logfeeder/output/OutputS3File.java   | 227 ++++++++++++-------
 .../logfeeder/output/S3LogPathResolver.java     |  54 +++++
 .../logfeeder/output/S3OutputConfiguration.java | 114 ++++++++++
 .../ambari/logfeeder/output/S3Uploader.java     | 163 +++++++++++++
 .../logfeeder/output/spool/LogSpooler.java      |  91 +++++++-
 .../org/apache/ambari/logfeeder/s3/S3Util.java  |   8 +-
 .../logfeeder/output/OutputS3FileTest.java      | 198 ++++++++++++++++
 .../logfeeder/output/S3LogPathResolverTest.java |  51 +++++
 .../ambari/logfeeder/output/S3UploaderTest.java | 164 ++++++++++++++
 .../logfeeder/output/spool/LogSpoolerTest.java  | 182 ++++++++++++---
 11 files changed, 1123 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
index 8def4b9..48a7f1d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -32,5 +32,4 @@ public class InputMarker {
     return "InputMarker [lineNumber=" + lineNumber + ", input="
       + input.getShortDescription() + "]";
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index f42195c..cbc1045 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -18,83 +18,95 @@
  */
 package org.apache.ambari.logfeeder.output;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.ambari.logfeeder.LogFeeder;
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
 import org.apache.ambari.logfeeder.s3.S3Util;
-import org.apache.ambari.logfeeder.util.CompressionUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.*;
+import java.util.Map.Entry;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 
 /**
- * Write log file into s3 bucket
+ * Write log files into an S3 bucket.
+ *
+ * This class supports two modes of upload:
+ * <ul>
+ * <li>A one time upload of files matching a pattern</li>
+ * <li>A batch mode, asynchronous, periodic upload of files</li>
+ * </ul>
  */
-public class OutputS3File extends Output {
+public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+
+  public static final String INPUT_ATTRIBUTE_TYPE = "type";
+  public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
+  private static Logger logger = Logger.getLogger(OutputS3File.class);
+
+  private LogSpooler logSpooler;
+  private S3OutputConfiguration s3OutputConfiguration;
+  private S3Uploader s3Uploader;
 
+  @Override
+  public void init() throws Exception {
+    super.init();
+    s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
+  }
 
   private static boolean uploadedGlobalConfig = false;
 
+  /**
+   * Copy local log files and corresponding config to S3 bucket one time.
+   * @param inputFile The file to be copied
+   * @param inputMarker Contains information about the configuration to be uploaded.
+   */
   @Override
   public void copyFile(File inputFile, InputMarker inputMarker) {
-    String bucketName = getStringValue("s3_bucket");
-    String s3LogDir = getStringValue("s3_log_dir");
-    HashMap<String, String> contextParam = buildContextParam();
-    s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam);
-    String s3AccessKey = getStringValue("s3_access_key");
-    String s3SecretKey = getStringValue("s3_secret_key");
-    String compressionAlgo = getStringValue("compression_algo");
-    String fileName = inputFile.getName();
-    // create tmp compressed File
-    String tmpDir = LogFeederUtil.getLogfeederTempDir();
-    File outputFile = new File(tmpDir + fileName + "_"
-        + new Date().getTime() + "." + compressionAlgo);
-    outputFile = CompressionUtil.compressFile(inputFile, outputFile,
-        compressionAlgo);
-    String type = inputMarker.input.getStringValue("type");
-    String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type
-        + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "."
-        + compressionAlgo;
-    S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey,
-        s3SecretKey);
-    // delete local compressed file
-    outputFile.deleteOnExit();
-    ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>();
+    String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration,
+        S3Util.INSTANCE, false, type);
+    String resolvedPath = s3Uploader.uploadFile(inputFile,
+        inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+
+    uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
+  }
+
+  private void uploadConfig(InputMarker inputMarker, String type,
+                            S3OutputConfiguration s3OutputConfiguration, String resolvedPath) {
+
+    ArrayList<Map<String, Object>> filters = new ArrayList<>();
     addFilters(filters, inputMarker.input.getFirstFilter());
-    Map<String, Object> inputConfig = new HashMap<String, Object>();
+    Map<String, Object> inputConfig = new HashMap<>();
     inputConfig.putAll(inputMarker.input.getConfigs());
-    String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName
-        + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path;
+    String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName()
+        + S3Util.S3_PATH_SEPARATOR + resolvedPath;
     inputConfig.put("path", s3CompletePath);
 
-    ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+    ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
     inputConfigList.add(inputConfig);
     // set source s3_file
     // remove global config from filter config
     removeGlobalConfig(inputConfigList);
     removeGlobalConfig(filters);
     // write config into s3 file
-    String s3Key = getComponentConfigFileName(type);
-    Map<String, Object> config = new HashMap<String, Object>();
+    Map<String, Object> config = new HashMap<>();
     config.put("filter", filters);
     config.put("input", inputConfigList);
-    writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam,
-        s3Key);
+    writeConfigToS3(config, getComponentConfigFileName(type), s3OutputConfiguration);
     // write global config
-    writeGlobalConfig();
+    writeGlobalConfig(s3OutputConfiguration);
   }
 
-  public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
+  private void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
     if (filter != null) {
       Map<String, Object> filterConfig = new HashMap<String, Object>();
       filterConfig.putAll(filter.getConfigs());
@@ -105,38 +117,28 @@ public class OutputS3File extends Output {
     }
   }
 
-  public void writeConfigToS3(Map<String, Object> config, String bucketName,
-      String accessKey, String secretKey, HashMap<String, String> contextParam,
-      String s3Key) {
-    String s3ConfigDir = getStringValue("s3_config_dir");
-    s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
+  private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix,
+                              S3OutputConfiguration s3OutputConfiguration) {
     Gson gson = new GsonBuilder().setPrettyPrinting().create();
-    String configJson = gson.toJson(config);
+    String configJson = gson.toJson(configToWrite);
 
-    s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
-    S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
-        secretKey);
-  }
+    String s3ResolvedKey = new S3LogPathResolver().
+        getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster());
 
-  public String getComponentConfigFileName(String componentName) {
-    String fileName = "input.config-" + componentName + ".json";
-    return fileName;
+    S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(),
+        s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(),
+        s3OutputConfiguration.getS3SecretKey());
   }
 
-  public HashMap<String, String> buildContextParam() {
-    HashMap<String, String> contextParam = new HashMap<String, String>();
-    contextParam.put("host", LogFeederUtil.hostName);
-    contextParam.put("ip", LogFeederUtil.ipAddress);
-    String cluster = getNVList("add_fields").get("cluster");
-    contextParam.put("cluster", cluster);
-    return contextParam;
+  private String getComponentConfigFileName(String componentName) {
+    return "input.config-" + componentName + ".json";
   }
 
-  
+
   private Map<String, Object> getGlobalConfig() {
     Map<String, Object> globalConfig = LogFeeder.globalMap;
     if (globalConfig == null) {
-      globalConfig = new HashMap<String, Object>();
+      globalConfig = new HashMap<>();
     }
     return globalConfig;
   }
@@ -163,7 +165,7 @@ public class OutputS3File extends Output {
    * Write the global config into an S3 file. Invoked only once.
    */
   @SuppressWarnings("unchecked")
-  private synchronized void writeGlobalConfig() {
+  private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
     if (!uploadedGlobalConfig) {
       Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
       //updating global config before write to s3
@@ -174,7 +176,7 @@ public class OutputS3File extends Output {
       Map<String, Object> addFields = (Map<String, Object>) globalConfig
           .get("add_fields");
       if (addFields == null) {
-        addFields = new HashMap<String, Object>();
+        addFields = new HashMap<>();
       }
       addFields.put("ip", LogFeederUtil.ipAddress);
       addFields.put("host", LogFeederUtil.hostName);
@@ -189,20 +191,85 @@ public class OutputS3File extends Output {
       globalConfig.put("add_fields", addFields);
       Map<String, Object> config = new HashMap<String, Object>();
       config.put("global", globalConfig);
-      String s3AccessKey = getStringValue("s3_access_key");
-      String s3SecretKey = getStringValue("s3_secret_key");
-      String bucketName = getStringValue("s3_bucket");
-      String s3Key = "global.config.json";
-      HashMap<String, String> contextParam = buildContextParam();
-      writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey,
-          contextParam, s3Key);
+      writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
       uploadedGlobalConfig = true;
     }
   }
 
+  /**
+   * Write a log line to a local file, to be uploaded to an S3 bucket asynchronously.
+   *
+   * This method uses a {@link LogSpooler} to spool the log lines to a local file.
+   *
+   * @param block The log event to upload
+   * @param inputMarker Contains information about the log file feeding the lines.
+   * @throws Exception
+   */
   @Override
   public void write(String block, InputMarker inputMarker) throws Exception {
-    throw new UnsupportedOperationException(
-        "write method is not yet supported for output=s3_file");
+    if (logSpooler == null) {
+      logSpooler = createSpooler(inputMarker.input.getFilePath());
+      s3Uploader = createUploader(inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+    }
+    logSpooler.add(block);
+  }
+
+  @VisibleForTesting
+  protected S3Uploader createUploader(String logType) {
+    S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType);
+    uploader.startUploaderThread();
+    return uploader;
+  }
+
+  @VisibleForTesting
+  protected LogSpooler createSpooler(String filePath) {
+    String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
+    logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s",
+        spoolDirectory, filePath));
+    return new LogSpooler(spoolDirectory, new File(filePath).getName() + "-", this, this,
+        s3OutputConfiguration.getRolloverTimeThresholdSecs());
+  }
+
+  /**
+   * Check whether the locally spooled file should be rolled over, based on file size.
+   *
+   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
+   *                                                       for rollover.
+   * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
+   *              false otherwise
+   */
+  @Override
+  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
+    File spoolFile = currentSpoolerContext.getActiveSpoolFile();
+    long currentSize = spoolFile.length();
+    boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
+    if (result) {
+      logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
+          s3OutputConfiguration.getRolloverSizeThresholdBytes()));
+    }
+    return result;
+  }
+
+  /**
+   * Stops dependent objects that consume resources.
+   */
+  @Override
+  public void close() {
+    if (s3Uploader != null) {
+      s3Uploader.stopUploaderThread();
+    }
+    if (logSpooler != null) {
+      logSpooler.close();
+    }
+  }
+
+  /**
+   * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
+   *
+   * @param rolloverFile The file that has been rolled over.
+   */
+  @Override
+  public void handleRollover(File rolloverFile) {
+    s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
new file mode 100644
index 0000000..1bbf33e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+
+import java.util.HashMap;
+
+/**
+ * A utility class that resolves variables like hostname, IP address and cluster name in S3 paths.
+ */
+public class S3LogPathResolver {
+
+  /**
+   * Construct a full S3 path by resolving variables in the path name, including hostname,
+   * IP address and cluster name.
+   * @param baseKeyPrefix The prefix which can contain the variables.
+   * @param keySuffix The suffix appended to the prefix after variable expansion
+   * @param cluster The name of the cluster
+   * @return full S3 path.
+   */
+  public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
+    HashMap<String, String> contextParam = buildContextParam(cluster);
+    String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
+    return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix;
+  }
+
+  private HashMap<String, String> buildContextParam(String cluster) {
+    HashMap<String, String> contextParam = new HashMap<>();
+    contextParam.put("host", LogFeederUtil.hostName);
+    contextParam.put("ip", LogFeederUtil.ipAddress);
+    contextParam.put("cluster", cluster);
+    return contextParam;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
new file mode 100644
index 0000000..fb597d3
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.ConfigBlock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holds all configuration relevant for S3 upload.
+ */
+public class S3OutputConfiguration {
+
+  public static final String SPOOL_DIR_KEY = "spool_dir";
+  public static final String ROLLOVER_SIZE_THRESHOLD_BYTES_KEY = "rollover_size_threshold_bytes";
+  public static final Long DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024L;
+  public static final String ROLLOVER_TIME_THRESHOLD_SECS_KEY = "rollover_time_threshold_secs";
+  public static final Long DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS = 3600L;
+  public static final String S3_BUCKET_NAME_KEY = "s3_bucket";
+  public static final String S3_LOG_DIR_KEY = "s3_log_dir";
+  public static final String S3_ACCESS_KEY = "s3_access_key";
+  public static final String S3_SECRET_KEY = "s3_secret_key";
+  public static final String COMPRESSION_ALGO_KEY = "compression_algo";
+  public static final String ADDITIONAL_FIELDS_KEY = "add_fields";
+  public static final String CLUSTER_KEY = "cluster";
+
+  private Map<String, Object> configs;
+
+  S3OutputConfiguration(Map<String, Object> configs) {
+    this.configs = configs;
+  }
+
+  public String getS3BucketName() {
+    return (String) configs.get(S3_BUCKET_NAME_KEY);
+  }
+
+  public String getS3Path() {
+    return (String) configs.get(S3_LOG_DIR_KEY);
+  }
+
+  public String getS3AccessKey() {
+    return (String) configs.get(S3_ACCESS_KEY);
+  }
+
+  public String getS3SecretKey() {
+    return (String) configs.get(S3_SECRET_KEY);
+  }
+
+  public String getCompressionAlgo() {
+    return (String) configs.get(COMPRESSION_ALGO_KEY);
+  }
+
+  public Long getRolloverSizeThresholdBytes() {
+    return (Long) configs.get(ROLLOVER_SIZE_THRESHOLD_BYTES_KEY);
+  }
+
+  public Long getRolloverTimeThresholdSecs() {
+    return (Long) configs.get(ROLLOVER_TIME_THRESHOLD_SECS_KEY);
+  }
+
+  @SuppressWarnings("unchecked")
+  public String getCluster() {
+    return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
+  }
+
+  public static S3OutputConfiguration fromConfigBlock(ConfigBlock configBlock) {
+    Map<String, Object> configs = new HashMap<>();
+    String[] stringValuedKeysToCopy = new String[] {
+        SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
+        S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY
+    };
+
+    for (String key : stringValuedKeysToCopy) {
+      String value = configBlock.getStringValue(key);
+      if (value != null) {
+        configs.put(key, value);
+      }
+    }
+
+    String[] longValuedKeysToCopy = new String[] {
+        ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, ROLLOVER_TIME_THRESHOLD_SECS_KEY
+    };
+
+    Long[] defaultValuesForLongValuedKeys = new Long[] {
+        DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES, DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS
+    };
+
+    for (int i = 0; i < longValuedKeysToCopy.length; i++) {
+      configs.put(longValuedKeysToCopy[i],
+          configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+    }
+
+    configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));
+
+    return new S3OutputConfiguration(configs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
new file mode 100644
index 0000000..dec685f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.util.Date;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A class that handles the uploading of files to S3.
+ *
+ * This class can be used to upload a single file, or to start a daemon thread that
+ * uploads files added to a queue one after the other. When used to upload files via
+ * a queue, one instance of this class is created for each file handled by
+ * {@link org.apache.ambari.logfeeder.input.InputFile}.
+ */
+public class S3Uploader implements Runnable {
+  public static final String POISON_PILL = "POISON-PILL";
+  private static Logger logger = Logger.getLogger(S3Uploader.class);
+
+  private final S3OutputConfiguration s3OutputConfiguration;
+  private final S3Util s3UtilInstance;
+  private final boolean deleteOnEnd;
+  private String logType;
+  private final BlockingQueue<String> fileContextsToUpload;
+  private AtomicBoolean stopRunningThread = new AtomicBoolean(false);
+
+  public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd,
+                    String logType) {
+    this.s3OutputConfiguration = s3OutputConfiguration;
+    this.s3UtilInstance = s3UtilInstance;
+    this.deleteOnEnd = deleteOnEnd;
+    this.logType = logType;
+    this.fileContextsToUpload = new LinkedBlockingQueue<>();
+  }
+
+  /**
+   * Starts a thread that can be used to upload files from a queue.
+   *
+   * Add files to be uploaded using the method {@link #addFileForUpload(String)}.
+   * If this thread is started, it must be stopped using the method {@link #stopUploaderThread()}.
+   */
+  void startUploaderThread() {
+    Thread s3UploaderThread = new Thread(this, "s3-uploader-thread-" + logType);
+    s3UploaderThread.setDaemon(true);
+    s3UploaderThread.start();
+  }
+
+  /**
+   * Stops the thread used to upload files from a queue.
+   *
+   * This method must be called to cleanly free up resources, typically on shutdown of the process.
+   * Note that this method does not drain any remaining files; instead, it stops the thread
+   * as soon as the file currently being uploaded, if any, is complete.
+   */
+  void stopUploaderThread() {
+    stopRunningThread.set(true);
+    boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
+    if (!offerStatus) {
+      logger.warn("Could not add poison pill to interrupt uploader thread.");
+    }
+  }
+
+  /**
+   * Add a file to a queue to upload asynchronously.
+   * @param fileToUpload Full path to the local file which must be uploaded.
+   */
+  void addFileForUpload(String fileToUpload) {
+    boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
+    if (!offerStatus) {
+      logger.error("Could not add file " + fileToUpload + " for upload.");
+    }
+  }
+
+  @Override
+  public void run() {
+    while (!stopRunningThread.get()) {
+      try {
+        String fileNameToUpload = fileContextsToUpload.take();
+        if (POISON_PILL.equals(fileNameToUpload)) {
+          logger.warn("Found poison pill while waiting for files to upload, exiting");
+          return;
+        }
+        uploadFile(new File(fileNameToUpload), logType);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Upload the given file to S3.
+   *
+   * The file, which should be available locally, is first compressed using the compression
+   * method specified by {@link S3OutputConfiguration#getCompressionAlgo()}. This compressed
+   * file is what is uploaded to S3.
+   * @param fileToUpload the file to upload
+   * @param logType the name of the log, which is used in the constructed S3 path.
+   * @return the resolved S3 key to which the file was uploaded.
+   */
+  String uploadFile(File fileToUpload, String logType) {
+    String bucketName = s3OutputConfiguration.getS3BucketName();
+    String s3AccessKey = s3OutputConfiguration.getS3AccessKey();
+    String s3SecretKey = s3OutputConfiguration.getS3SecretKey();
+    String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
+
+    String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
+    String s3Path = new S3LogPathResolver().
+        getResolvedPath(s3OutputConfiguration.getS3Path() + S3Util.S3_PATH_SEPARATOR + logType,
+            keySuffix, s3OutputConfiguration.getCluster());
+    logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s",
+        s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
+    File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
+
+    logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
+    s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey,
+        s3SecretKey);
+
+    // delete local compressed file
+    sourceFile.delete();
+    if (deleteOnEnd) {
+      logger.info("Deleting input file as required");
+      if (!fileToUpload.delete()) {
+        logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
+      }
+    }
+    return s3Path;
+  }
+
+  @VisibleForTesting
+  protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+    File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_"
+        + new Date().getTime() + "." + compressionAlgo);
+    outputFile = CompressionUtil.compressFile(fileToUpload, outputFile,
+        compressionAlgo);
+    return outputFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index 306326a..fb263ba 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -25,6 +25,9 @@ import org.apache.log4j.Logger;
 
 import java.io.*;
 import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A class that manages local storage of log events before they are uploaded to the output destinations.
@@ -36,6 +39,7 @@ import java.util.Date;
  * {@link RolloverHandler} to trigger the handling of the rolled over file.
  */
 public class LogSpooler {
+  public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
   static private Logger logger = Logger.getLogger(LogSpooler.class);
   static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
 
@@ -46,6 +50,8 @@ public class LogSpooler {
   private PrintWriter currentSpoolBufferedWriter;
   private File currentSpoolFile;
   private LogSpoolerContext currentSpoolerContext;
+  private Timer rolloverTimer;
+  private AtomicBoolean rolloverInProgress = new AtomicBoolean(false);
 
   /**
    * Create an instance of the LogSpooler.
@@ -59,11 +65,34 @@ public class LogSpooler {
    */
   public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
                     RolloverHandler rolloverHandler) {
+    this(spoolDirectory, sourceFileNamePrefix, rolloverCondition, rolloverHandler,
+        TIME_BASED_ROLLOVER_DISABLED_THRESHOLD);
+  }
+
+  /**
+   * Create an instance of the LogSpooler.
+   * @param spoolDirectory The directory under which spooler files are created.
+   *                       Should be unique per instance of {@link Output}
+   * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
+   * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
+   *                          determine when to rollover.
+   * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
+   *                        there should be a rollover.
+   * @param rolloverTimeThresholdSecs Setting a non-zero value enables time-based rollover
+   *                                  of spool files; passing 0 disables it.
+   */
+  public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
+                    RolloverHandler rolloverHandler, long rolloverTimeThresholdSecs) {
     this.spoolDirectory = spoolDirectory;
     this.sourceFileNamePrefix = sourceFileNamePrefix;
     this.rolloverCondition = rolloverCondition;
     this.rolloverHandler = rolloverHandler;
-    initializeSpoolFile();
+    if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) {
+      rolloverTimer = new Timer("log-spooler-timer-" + sourceFileNamePrefix, true);
+      rolloverTimer.scheduleAtFixedRate(new LogSpoolerRolloverTimerTask(),
+          rolloverTimeThresholdSecs * 1000, rolloverTimeThresholdSecs * 1000);
+    }
+    initializeSpoolState();
   }
 
   private void initializeSpoolDirectory() {
@@ -77,9 +106,9 @@ public class LogSpooler {
     }
   }
 
-  private void initializeSpoolFile() {
+  private void initializeSpoolState() {
     initializeSpoolDirectory();
-    currentSpoolFile = new File(spoolDirectory, getCurrentFileName());
+    currentSpoolFile = initializeSpoolFile();
     try {
       currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
     } catch (IOException e) {
@@ -87,7 +116,12 @@ public class LogSpooler {
           + ", error message: " + e.getLocalizedMessage(), e);
     }
     currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
-    logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath());
+    logger.info("Initialized spool file at path: " + currentSpoolFile);
+  }
+
+  @VisibleForTesting
+  protected File initializeSpoolFile() {
+    return new File(spoolDirectory, getCurrentFileName());
   }
 
   @VisibleForTesting
@@ -103,11 +137,12 @@ public class LogSpooler {
    * it is ready to rollover the file.
    * @param logEvent The log event to spool.
    */
-  public void add(String logEvent) {
+  public synchronized void add(String logEvent) {
     currentSpoolBufferedWriter.println(logEvent);
     currentSpoolerContext.logEventSpooled();
     if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
-      rollover();
+      logger.info("Trying to rollover based on rollover condition");
+      tryRollover();
     }
   }
 
@@ -121,17 +156,49 @@ public class LogSpooler {
   public void rollover() {
     logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
     currentSpoolBufferedWriter.flush();
-    currentSpoolBufferedWriter.close();
-    rolloverHandler.handleRollover(currentSpoolFile);
-    logger.info("Invoked rollover handler with file: " + currentSpoolFile);
-    initializeSpoolFile();
+    if (currentSpoolFile.length() == 0) {
+      logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
+    } else {
+      currentSpoolBufferedWriter.close();
+      rolloverHandler.handleRollover(currentSpoolFile);
+      logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+      initializeSpoolState();
+    }
+    boolean status = rolloverInProgress.compareAndSet(true, false);
+    if (!status) {
+      logger.error("Should have reset rollover flag!!");
+    }
   }
 
-  @VisibleForTesting
-  protected String getCurrentFileName() {
+  private synchronized void tryRollover() {
+    if (rolloverInProgress.compareAndSet(false, true)) {
+      rollover();
+    } else {
+      logger.warn("Ignoring rollover call as rollover already in progress for file " +
+          currentSpoolFile);
+    }
+  }
+
+  private String getCurrentFileName() {
     Date currentDate = new Date();
     String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
     return sourceFileNamePrefix + dateStr;
   }
 
+  /**
+   * Cancels the time-based rollover task, if one was started.
+   */
+  public void close() {
+    if (rolloverTimer != null) {
+      rolloverTimer.cancel();
+    }
+  }
+
+  private class LogSpoolerRolloverTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      logger.info("Trying rollover based on time");
+      tryRollover();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
index ced2b5c..db187be 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
@@ -44,13 +44,13 @@ import com.amazonaws.services.s3.transfer.Upload;
 /**
  * Utility to connect to s3
  */
-public enum S3Util {
-  INSTANCE;
+public class S3Util {
+  public static final S3Util INSTANCE = new S3Util();
 
   private static final Logger LOG = Logger.getLogger(S3Util.class);
 
-  public final String S3_PATH_START_WITH = "s3://";
-  public final String S3_PATH_SEPARATOR = "/";
+  public static final String S3_PATH_START_WITH = "s3://";
+  public static final String S3_PATH_SEPARATOR = "/";
 
   public AmazonS3 getS3Client(String accessKey, String secretKey) {
     AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
new file mode 100644
index 0000000..20a4f1f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class OutputS3FileTest {
+
+  private Map<String, Object> configMap;
+
+  @Before
+  public void setupConfiguration() {
+    configMap = new HashMap<>();
+    String[] configKeys = new String[] {
+        S3OutputConfiguration.SPOOL_DIR_KEY,
+        S3OutputConfiguration.S3_BUCKET_NAME_KEY,
+        S3OutputConfiguration.S3_LOG_DIR_KEY,
+        S3OutputConfiguration.S3_ACCESS_KEY,
+        S3OutputConfiguration.S3_SECRET_KEY,
+        S3OutputConfiguration.COMPRESSION_ALGO_KEY,
+        S3OutputConfiguration.ADDITIONAL_FIELDS_KEY
+    };
+    Map<String, String> additionalKeys = new HashMap<>();
+    additionalKeys.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
+    Object[] configValues = new Object[] {
+        "/var/ambari-logsearch/logfeeder",
+        "s3_bucket_name",
+        "logs",
+        "ABCDEFGHIJ1234",
+        "amdfbldkfdlf",
+        "gz",
+        additionalKeys
+    };
+    for (int i = 0; i < configKeys.length; i++) {
+      configMap.put(configKeys[i], configValues[i]);
+    }
+  }
+
+  @Test
+  public void shouldSpoolLogEventToNewSpooler() throws Exception {
+
+    InputMarker inputMarker = mock(InputMarker.class);
+    Input input = mock(Input.class);
+    inputMarker.input = input;
+    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    final LogSpooler spooler = mock(LogSpooler.class);
+    spooler.add("log event block");
+    final S3Uploader s3Uploader = mock(S3Uploader.class);
+    replay(input, inputMarker, spooler, s3Uploader);
+
+    OutputS3File outputS3File = new OutputS3File() {
+      @Override
+      protected LogSpooler createSpooler(String filePath) {
+        return spooler;
+      }
+
+      @Override
+      protected S3Uploader createUploader(String logType) {
+        return s3Uploader;
+      }
+    };
+    outputS3File.loadConfig(configMap);
+    outputS3File.init();
+    outputS3File.write("log event block", inputMarker);
+    verify(spooler);
+  }
+
+  @Test
+  public void shouldReuseSpoolerForSamePath() throws Exception {
+    InputMarker inputMarker = mock(InputMarker.class);
+    Input input = mock(Input.class);
+    inputMarker.input = input;
+    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    final LogSpooler spooler = mock(LogSpooler.class);
+    spooler.add("log event block1");
+    spooler.add("log event block2");
+    final S3Uploader s3Uploader = mock(S3Uploader.class);
+    replay(input, inputMarker, spooler, s3Uploader);
+
+    OutputS3File outputS3File = new OutputS3File() {
+      private boolean firstCallComplete;
+      @Override
+      protected LogSpooler createSpooler(String filePath) {
+        if (!firstCallComplete) {
+          firstCallComplete = true;
+          return spooler;
+        }
+        throw new IllegalStateException("Shouldn't call createSpooler for same path.");
+      }
+
+      @Override
+      protected S3Uploader createUploader(String logType) {
+        return s3Uploader;
+      }
+    };
+    outputS3File.loadConfig(configMap);
+    outputS3File.init();
+    outputS3File.write("log event block1", inputMarker);
+    outputS3File.write("log event block2", inputMarker);
+    verify(spooler);
+  }
+
+  @Test
+  public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
+
+    String thresholdSize = Long.toString(15 * 1024 * 1024L);
+    LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
+    File activeSpoolFile = mock(File.class);
+    expect(activeSpoolFile.length()).andReturn(20 * 1024 * 1024L);
+    expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
+    replay(logSpoolerContext, activeSpoolFile);
+
+    OutputS3File outputS3File = new OutputS3File();
+    configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
+    outputS3File.loadConfig(configMap);
+    outputS3File.init();
+
+    assertTrue(outputS3File.shouldRollover(logSpoolerContext));
+  }
+
+  @Test
+  public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception {
+    String thresholdSize = Long.toString(15 * 1024 * 1024L);
+    LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
+    File activeSpoolFile = mock(File.class);
+    expect(activeSpoolFile.length()).andReturn(10 * 1024 * 1024L);
+    expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
+    replay(logSpoolerContext, activeSpoolFile);
+
+    OutputS3File outputS3File = new OutputS3File();
+    configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
+    outputS3File.loadConfig(configMap);
+    outputS3File.init();
+
+    assertFalse(outputS3File.shouldRollover(logSpoolerContext));
+  }
+
+  @Test
+  public void shouldUploadFileOnRollover() throws Exception {
+    InputMarker inputMarker = mock(InputMarker.class);
+    Input input = mock(Input.class);
+    inputMarker.input = input;
+    expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
+    expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode");
+    final LogSpooler spooler = mock(LogSpooler.class);
+    spooler.add("log event block1");
+    final S3Uploader s3Uploader = mock(S3Uploader.class);
+    s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz");
+    replay(input, inputMarker, spooler, s3Uploader);
+
+    OutputS3File outputS3File = new OutputS3File() {
+      @Override
+      protected LogSpooler createSpooler(String filePath) {
+        return spooler;
+      }
+      @Override
+      protected S3Uploader createUploader(String logType) {
+        return s3Uploader;
+      }
+    };
+    outputS3File.write("log event block1", inputMarker);
+    outputS3File.handleRollover(new File("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz"));
+
+    verify(s3Uploader);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
new file mode 100644
index 0000000..49cee56
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class S3LogPathResolverTest {
+
+  @Test
+  public void shouldResolveHostName() {
+    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$host", "filename.log", "cl1");
+    assertEquals("my_s3_path/" + LogFeederUtil.hostName + "/filename.log", resolvedPath);
+  }
+
+  @Test
+  public void shouldResolveIpAddress() {
+    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$ip", "filename.log", "cl1");
+    assertEquals("my_s3_path/" + LogFeederUtil.ipAddress + "/filename.log", resolvedPath);
+  }
+
+  @Test
+  public void shouldResolveCluster() {
+    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster", "filename.log", "cl1");
+    assertEquals("my_s3_path/cl1/filename.log", resolvedPath);
+  }
+
+  @Test
+  public void shouldResolveCombinations() {
+    String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster/$host", "filename.log", "cl1");
+    assertEquals("my_s3_path/cl1/"+ LogFeederUtil.hostName + "/filename.log", resolvedPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
new file mode 100644
index 0000000..a0c398e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class S3UploaderTest {
+
+  public static final String TEST_BUCKET = "test_bucket";
+  public static final String TEST_PATH = "test_path";
+  public static final String GZ = "gz";
+  public static final String LOG_TYPE = "hdfs_namenode";
+  public static final String ACCESS_KEY_VALUE = "accessKeyValue";
+  public static final String SECRET_KEY_VALUE = "secretKeyValue";
+
+  @Test
+  public void shouldUploadToS3ToRightBucket() {
+    File fileToUpload = mock(File.class);
+    String fileName = "hdfs_namenode.log.123343493473948";
+    expect(fileToUpload.getName()).andReturn(fileName);
+    final File compressedFile = mock(File.class);
+    Map<String, Object> configs = setupS3Configs();
+
+    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+    S3Util s3Util = mock(S3Util.class);
+    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+    expect(compressedFile.delete()).andReturn(true);
+    expect(fileToUpload.delete()).andReturn(true);
+    replay(fileToUpload, compressedFile, s3Util);
+
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+      @Override
+      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+        return compressedFile;
+      }
+    };
+    String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+    verify(s3Util);
+    assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
+  }
+
+  @Test
+  public void shouldCleanupLocalFilesOnSuccessfulUpload() {
+    File fileToUpload = mock(File.class);
+    String fileName = "hdfs_namenode.log.123343493473948";
+    expect(fileToUpload.getName()).andReturn(fileName);
+    final File compressedFile = mock(File.class);
+    Map<String, Object> configs = setupS3Configs();
+
+    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+    S3Util s3Util = mock(S3Util.class);
+    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+    expect(compressedFile.delete()).andReturn(true);
+    expect(fileToUpload.delete()).andReturn(true);
+    replay(fileToUpload, compressedFile, s3Util);
+
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+      @Override
+      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+        return compressedFile;
+      }
+    };
+    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+    verify(fileToUpload);
+    verify(compressedFile);
+  }
+
+  @Test
+  public void shouldNotCleanupUncompressedFileIfNotRequired() {
+    File fileToUpload = mock(File.class);
+    String fileName = "hdfs_namenode.log.123343493473948";
+    expect(fileToUpload.getName()).andReturn(fileName);
+    final File compressedFile = mock(File.class);
+    Map<String, Object> configs = setupS3Configs();
+
+    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+    S3Util s3Util = mock(S3Util.class);
+    String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ);
+    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+    expect(compressedFile.delete()).andReturn(true);
+    replay(fileToUpload, compressedFile, s3Util);
+
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, false, LOG_TYPE) {
+      @Override
+      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+        return compressedFile;
+      }
+    };
+    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+    verify(fileToUpload);
+    verify(compressedFile);
+  }
+
+  @Test
+  public void shouldExpandVariablesInPath() {
+    File fileToUpload = mock(File.class);
+    String fileName = "hdfs_namenode.log.123343493473948";
+    expect(fileToUpload.getName()).andReturn(fileName);
+    final File compressedFile = mock(File.class);
+    Map<String, Object> configs = setupS3Configs();
+    configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, "$cluster/"+TEST_PATH);
+
+
+    S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
+    S3Util s3Util = mock(S3Util.class);
+    String s3Key = String.format("%s/%s/%s/%s.%s", "cl1", TEST_PATH, LOG_TYPE, fileName, GZ);
+    s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE);
+    expect(compressedFile.delete()).andReturn(true);
+    expect(fileToUpload.delete()).andReturn(true);
+    replay(fileToUpload, compressedFile, s3Util);
+
+    S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) {
+      @Override
+      protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
+        return compressedFile;
+      }
+    };
+    s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
+
+    verify(s3Util);
+  }
+
+  private Map<String, Object> setupS3Configs() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(S3OutputConfiguration.S3_BUCKET_NAME_KEY, TEST_BUCKET);
+    configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, TEST_PATH);
+    configs.put(S3OutputConfiguration.S3_ACCESS_KEY, ACCESS_KEY_VALUE);
+    configs.put(S3OutputConfiguration.S3_SECRET_KEY, SECRET_KEY_VALUE);
+    configs.put(S3OutputConfiguration.COMPRESSION_ALGO_KEY, GZ);
+    Map<String, String> nameValueMap = new HashMap<>();
+    nameValueMap.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
+    configs.put(S3OutputConfiguration.ADDITIONAL_FIELDS_KEY, nameValueMap);
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/26e5fe0a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
index 7d9d78a..7a47039 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -43,7 +43,6 @@ public class LogSpoolerTest {
 
   private String spoolDirectory;
   private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
-  private static final String FILE_SUFFIX = "currentFile";
 
   @Mock
   private RolloverCondition rolloverCondition;
@@ -61,13 +60,13 @@ public class LogSpoolerTest {
     final PrintWriter spoolWriter = mock(PrintWriter.class);
     spoolWriter.println("log event");
 
-    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    final File mockFile = setupInputFileExpectations();
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
         andReturn(false);
 
-    replay(spoolWriter, rolloverCondition);
+    replay(spoolWriter, rolloverCondition, mockFile);
 
     LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
         rolloverCondition, rolloverHandler) {
@@ -77,8 +76,8 @@ public class LogSpoolerTest {
       }
 
       @Override
-      protected String getCurrentFileName() {
-        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      protected File initializeSpoolFile() {
+        return mockFile;
       }
     };
     logSpooler.add("log event");
@@ -86,20 +85,26 @@ public class LogSpoolerTest {
     verify(spoolWriter);
   }
 
+  private File setupInputFileExpectations() {
+    final File mockFile = mock(File.class);
+    expect(mockFile.length()).andReturn(10240L);
+    return mockFile;
+  }
+
   @Test
   public void shouldIncrementSpooledEventsCount() {
 
     final PrintWriter spoolWriter = mock(PrintWriter.class);
     spoolWriter.println("log event");
 
-    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    final File mockFile = setupInputFileExpectations();
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
     logSpoolerContext.logEventSpooled();
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
         andReturn(false);
 
-    replay(spoolWriter, rolloverCondition);
+    replay(spoolWriter, rolloverCondition, mockFile);
 
     LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
         rolloverCondition, rolloverHandler) {
@@ -109,8 +114,8 @@ public class LogSpoolerTest {
       }
 
       @Override
-      protected String getCurrentFileName() {
-        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      protected File initializeSpoolFile() {
+        return mockFile;
       }
     };
     logSpooler.add("log event");
@@ -125,14 +130,14 @@ public class LogSpoolerTest {
     spoolWriter.flush();
     spoolWriter.close();
 
-    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    final File mockFile = setupInputFileExpectations();
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
         andReturn(true);
-    rolloverHandler.handleRollover(spoolFile);
+    rolloverHandler.handleRollover(mockFile);
 
-    replay(spoolWriter, rolloverCondition, rolloverHandler);
+    replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
 
     LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
         rolloverCondition, rolloverHandler) {
@@ -143,8 +148,8 @@ public class LogSpoolerTest {
       }
 
       @Override
-      protected String getCurrentFileName() {
-        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      protected File initializeSpoolFile() {
+        return mockFile;
       }
     };
     logSpooler.add("log event");
@@ -161,22 +166,22 @@ public class LogSpoolerTest {
     spoolWriter1.flush();
     spoolWriter1.close();
 
-    File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1");
-    File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2");
+    final File mockFile1 = setupInputFileExpectations();
+    final File mockFile2 = setupInputFileExpectations();
 
-    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(spoolFile1);
+    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
     ).andReturn(true);
 
-    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2);
+    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
     ).andReturn(false);
 
-    rolloverHandler.handleRollover(spoolFile1);
+    rolloverHandler.handleRollover(mockFile1);
 
-    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler);
+    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2);
 
     LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
         rolloverCondition, rolloverHandler) {
@@ -193,11 +198,11 @@ public class LogSpoolerTest {
       }
 
       @Override
-      protected String getCurrentFileName() {
+      protected File initializeSpoolFile() {
         if (!wasRolledOver) {
-          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1";
+          return mockFile1;
         } else {
-          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2";
+          return mockFile2;
         }
       }
     };
@@ -214,14 +219,14 @@ public class LogSpoolerTest {
     spoolWriter.flush();
     spoolWriter.close();
 
-    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
-    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    final File mockFile = setupInputFileExpectations();
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
     expect(rolloverCondition.shouldRollover(
         cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
     ).andReturn(true);
-    rolloverHandler.handleRollover(spoolFile);
+    rolloverHandler.handleRollover(mockFile);
 
-    replay(spoolWriter, rolloverCondition, rolloverHandler);
+    replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
 
     LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
         rolloverCondition, rolloverHandler) {
@@ -232,8 +237,8 @@ public class LogSpoolerTest {
       }
 
       @Override
-      protected String getCurrentFileName() {
-        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      protected File initializeSpoolFile() {
+        return mockFile;
       }
     };
     logSpooler.add("log event");
@@ -241,10 +246,121 @@ public class LogSpoolerTest {
     verify(rolloverHandler);
   }
 
+  // Roll over twice - the second rollover only works if the "rolloverInProgress"
+  // flag is being reset correctly. Expectations for a third file are set up because
+  // a new spool file is auto-initialized after the second rollover.
+  @Test
+  public void shouldResetRolloverInProgressFlag() {
+    final PrintWriter spoolWriter1 = mock(PrintWriter.class);
+    final PrintWriter spoolWriter2 = mock(PrintWriter.class);
+    final PrintWriter spoolWriter3 = mock(PrintWriter.class);
+    spoolWriter1.println("log event1");
+    spoolWriter2.println("log event2");
+    spoolWriter1.flush();
+    spoolWriter1.close();
+    spoolWriter2.flush();
+    spoolWriter2.close();
+
+    final File mockFile1 = setupInputFileExpectations();
+    final File mockFile2 = setupInputFileExpectations();
+    final File mockFile3 = setupInputFileExpectations();
+
+    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+
+    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+
+    rolloverHandler.handleRollover(mockFile1);
+    rolloverHandler.handleRollover(mockFile2);
+
+    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2, mockFile3);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      private int currentFileNum;
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        PrintWriter spoolWriter = null;
+        switch (currentFileNum) {
+          case 0:
+            spoolWriter = spoolWriter1;
+            break;
+          case 1:
+            spoolWriter = spoolWriter2;
+            break;
+          case 2:
+            spoolWriter = spoolWriter3;
+            break;
+        }
+        currentFileNum++;
+        return spoolWriter;
+      }
+
+      @Override
+      protected File initializeSpoolFile() {
+        switch (currentFileNum) {
+          case 0:
+            return mockFile1;
+          case 1:
+            return mockFile2;
+          case 2:
+            return mockFile3;
+          default:
+            return null;
+        }
+      }
+    };
+    logSpooler.add("log event1");
+    logSpooler.add("log event2");
+
+    verify(spoolWriter1, spoolWriter2, rolloverCondition);
+  }
+
+  @Test
+  public void shouldNotRolloverZeroLengthFiles() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+    spoolWriter.flush();
+    spoolWriter.close();
+
+    final File mockFile = mock(File.class);
+    expect(mockFile.length()).andReturn(0L);
+
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+        andReturn(true);
+
+    replay(spoolWriter, rolloverCondition, mockFile);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected File initializeSpoolFile() {
+        return mockFile;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(mockFile);
+  }
+
   class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
     @Override
     public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
-      return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile());
+      return o1.getActiveSpoolFile() == o2.getActiveSpoolFile() ? 0 : -1;
     }
   }