Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/10 19:35:07 UTC

[2/3] ambari git commit: AMBARI-17788. Refactor spooler code in OutputHDFSFile to be reusable for OutputS3File (Hemanth Yamijala via oleewere)

AMBARI-17788. Refactor spooler code in OutputHDFSFile to be reusable for OutputS3File (Hemanth Yamijala via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/23bd337f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/23bd337f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/23bd337f

Branch: refs/heads/branch-2.5
Commit: 23bd337fb154049d4d3b9da9c16a66add89cf6a1
Parents: 1d9aa65
Author: oleewere <ol...@gmail.com>
Authored: Fri Aug 5 14:15:48 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Sat Sep 10 21:27:25 2016 +0200

----------------------------------------------------------------------
 .../ambari/logfeeder/output/OutputHDFSFile.java | 154 ++++-------
 .../logfeeder/output/spool/LogSpooler.java      | 137 ++++++++++
 .../output/spool/LogSpoolerContext.java         |  85 ++++++
 .../output/spool/LogSpoolerException.java       |  29 +++
 .../output/spool/RolloverCondition.java         |  36 +++
 .../logfeeder/output/spool/RolloverHandler.java |  40 +++
 .../logfeeder/output/spool/LogSpoolerTest.java  | 258 +++++++++++++++++++
 7 files changed, 640 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index 87cc0eb..f711a5f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -19,28 +19,32 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
 import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
-public class OutputHDFSFile extends Output {
+import java.io.File;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * An {@link Output} that records logs to HDFS.
+ *
+ * The events are spooled on the local file system and uploaded in batches asynchronously.
+ */
+public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
   private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+  private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L; // 5 minutes by default
 
   private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
 
@@ -48,21 +52,15 @@ public class OutputHDFSFile extends Output {
 
   private Thread hdfsCopyThread = null;
 
-  private PrintWriter outWriter = null;
-  // local writer variables
-  private String localFilePath = null;
   private String filenamePrefix = "service-logs-";
-  private String localFileDir = null;
-  private File localcurrentFile = null;
-  private Date localFileCreateTime = null;
-  private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default
+  private long rolloverThresholdTimeMillis;
 
   private String hdfsOutDir = null;
   private String hdfsHost = null;
   private String hdfsPort = null;
   private FileSystem fileSystem = null;
 
-  private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+  private LogSpooler logSpooler;
 
   @Override
   public void init() throws Exception {
@@ -70,7 +68,8 @@ public class OutputHDFSFile extends Output {
     hdfsOutDir = getStringValue("hdfs_out_dir");
     hdfsHost = getStringValue("hdfs_host");
     hdfsPort = getStringValue("hdfs_port");
-    localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
+    long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
+    rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
     filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
     if (StringUtils.isEmpty(hdfsOutDir)) {
       logger
@@ -90,23 +89,15 @@ public class OutputHDFSFile extends Output {
     HashMap<String, String> contextParam = buildContextParam();
     hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
     logger.info("hdfs Output dir=" + hdfsOutDir);
-    localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
-    localFilePath = localFileDir;
+    String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+    logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
     this.startHDFSCopyThread();
   }
 
   @Override
   public void close() {
     logger.info("Closing file." + getShortDescription());
-    if (outWriter != null) {
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger.error(t.getLocalizedMessage(),t);
-      }
-    }
+    logSpooler.rollover();
     this.stopHDFSCopyThread();
     isClosed = true;
   }
@@ -115,12 +106,8 @@ public class OutputHDFSFile extends Output {
   synchronized public void write(String block, InputMarker inputMarker)
       throws Exception {
     if (block != null) {
-      buildOutWriter();
-      if (outWriter != null) {
-        statMetric.count++;
-        outWriter.println(block);
-        closeFileIfNeeded();
-      }
+      logSpooler.add(block);
+      statMetric.count++;
     }
   }
 
@@ -130,59 +117,6 @@ public class OutputHDFSFile extends Output {
     return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
   }
 
-  private synchronized void closeFileIfNeeded() throws FileNotFoundException,
-      IOException {
-    if (outWriter == null) {
-      return;
-    }
-    // TODO: Close the file on absolute time. Currently it is implemented as
-    // relative time
-    if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) {
-      logger.info("Closing file. Rolling over. name="
-          + localcurrentFile.getName() + ", filePath="
-          + localcurrentFile.getAbsolutePath());
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger
-            .error("Error on closing output writter. Exception will be ignored. name="
-                + localcurrentFile.getName()
-                + ", filePath="
-                + localcurrentFile.getAbsolutePath());
-      }
-
-      outWriter = null;
-      localcurrentFile = null;
-    }
-  }
-
-  private synchronized void buildOutWriter() {
-    if (outWriter == null) {
-      String currentFilePath = localFilePath + getCurrentFileName();
-      localcurrentFile = new File(currentFilePath);
-      if (localcurrentFile.getParentFile() != null) {
-        File parentDir = localcurrentFile.getParentFile();
-        if (!parentDir.isDirectory()) {
-          parentDir.mkdirs();
-        }
-      }
-      try {
-        outWriter = new PrintWriter(new BufferedWriter(new FileWriter(
-            localcurrentFile, true)));
-      } catch (IOException e) {
-        logger.error("= OutputHDFSFile.buidOutWriter failed for file :  "
-            + localcurrentFile.getAbsolutePath() + " Desc: "
-            + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(),
-            e);
-      }
-      localFileCreateTime = new Date();
-      logger.info("Create file is successful. localFilePath="
-          + localcurrentFile.getAbsolutePath());
-    }
-  }
-
   private void startHDFSCopyThread() {
 
     hdfsCopyThread = new Thread("hdfsCopyThread") {
@@ -261,13 +195,6 @@ public class OutputHDFSFile extends Output {
     }
   }
 
-  private String getCurrentFileName() {
-    Date currentDate = new Date();
-    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
-    String fileName = filenamePrefix + dateStr;
-    return fileName;
-  }
-
   private HashMap<String, String> buildContextParam() {
     HashMap<String, String> contextParam = new HashMap<String, String>();
     contextParam.put("host", LogFeederUtil.hostName);
@@ -291,4 +218,33 @@ public class OutputHDFSFile extends Output {
     throw new UnsupportedOperationException(
         "copyFile method is not yet supported for output=hdfs");     
   }
+
+  /**
+   * Add the rolled-over file to the queue consumed by a daemon thread that uploads it to HDFS.
+   * @param rolloverFile the file to be uploaded to HDFS
+   */
+  @Override
+  public void handleRollover(File rolloverFile) {
+    addFileInReadyList(rolloverFile);
+  }
+
+  /**
+   * Determines whether it is time to roll over the current spool file.
+   *
+   * The file will be rolled over if the time since its creation exceeds
+   * the timeout specified in the rollover_sec configuration.
+   * @param currentSpoolerContext {@link LogSpoolerContext} that holds the state of the active spool file
+   * @return true if the time since creation is greater than the value specified in rollover_sec,
+   *          false otherwise.
+   */
+  @Override
+  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
+    long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
+    boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
+    if (shouldRollover) {
+      logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
+                    " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
+    }
+    return shouldRollover;
+  }
 }
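
For reference, the time-based rollover decision above boils down to a single threshold comparison: init() converts rollover_sec (default 300 seconds) to milliseconds once, and shouldRollover() compares that threshold against the age of the active spool file. A minimal standalone sketch of the same check, using an assumed creation time of six minutes ago (the class name and sample values are illustrative, not part of this commit):

    import java.util.Date;

    public class RolloverCheckSketch {
      public static void main(String[] args) {
        // Default rollover_sec is 5 * 60 = 300 seconds, converted to milliseconds in init().
        long rolloverThresholdTimeMillis = 300 * 1000L;

        // Assume the active spool file was created six minutes ago (illustrative value only).
        Date activeLogCreationTime = new Date(System.currentTimeMillis() - 6 * 60 * 1000L);

        // Same comparison as OutputHDFSFile.shouldRollover(LogSpoolerContext).
        long timeSinceCreation = new Date().getTime() - activeLogCreationTime.getTime();
        boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;

        System.out.println("shouldRollover = " + shouldRollover); // true for these values
      }
    }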

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
new file mode 100644
index 0000000..306326a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.log4j.Logger;
+
+import java.io.*;
+import java.util.Date;
+
+/**
+ * A class that manages local storage of log events before they are uploaded to the output destinations.
+ *
+ * This class should be used by any {@link Output} that wishes to upload log files to an
+ * output destination in periodic batches. Log events should be added to an instance
+ * of this class to be stored locally. The class determines when to
+ * roll over by consulting a {@link RolloverCondition}. Likewise, it uses a
+ * {@link RolloverHandler} to trigger the handling of the rolled-over file.
+ */
+public class LogSpooler {
+  private static final Logger logger = Logger.getLogger(LogSpooler.class);
+  static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+
+  private String spoolDirectory;
+  private String sourceFileNamePrefix;
+  private RolloverCondition rolloverCondition;
+  private RolloverHandler rolloverHandler;
+  private PrintWriter currentSpoolBufferedWriter;
+  private File currentSpoolFile;
+  private LogSpoolerContext currentSpoolerContext;
+
+  /**
+   * Create an instance of the LogSpooler.
+   * @param spoolDirectory The directory under which spooler files are created.
+   *                       Should be unique per instance of {@link Output}.
+   * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
+   * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
+   *                          determine when to roll over.
+   * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
+   *                        a rollover occurs.
+   */
+  public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
+                    RolloverHandler rolloverHandler) {
+    this.spoolDirectory = spoolDirectory;
+    this.sourceFileNamePrefix = sourceFileNamePrefix;
+    this.rolloverCondition = rolloverCondition;
+    this.rolloverHandler = rolloverHandler;
+    initializeSpoolFile();
+  }
+
+  private void initializeSpoolDirectory() {
+    File spoolDir = new File(spoolDirectory);
+    if (!spoolDir.exists()) {
+      logger.info("Creating spool directory: " + spoolDir);
+      boolean result = spoolDir.mkdirs();
+      if (!result) {
+        throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
+      }
+    }
+  }
+
+  private void initializeSpoolFile() {
+    initializeSpoolDirectory();
+    currentSpoolFile = new File(spoolDirectory, getCurrentFileName());
+    try {
+      currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
+    } catch (IOException e) {
+      throw new LogSpoolerException("Could not create buffered writer for spool file: " + currentSpoolFile
+          + ", error message: " + e.getLocalizedMessage(), e);
+    }
+    currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
+    logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath());
+  }
+
+  @VisibleForTesting
+  protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+    return new PrintWriter(new BufferedWriter(new FileWriter(spoolFile)));
+  }
+
+  /**
+   * Add an event for spooling.
+   *
+   * This method adds the event to the current spool file's buffer. On completion, it
+   * calls {@link RolloverCondition#shouldRollover(LogSpoolerContext)} to determine whether
+   * the file should be rolled over.
+   * @param logEvent The log event to spool.
+   */
+  public void add(String logEvent) {
+    currentSpoolBufferedWriter.println(logEvent);
+    currentSpoolerContext.logEventSpooled();
+    if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
+      rollover();
+    }
+  }
+
+  /**
+   * Trigger a rollover of the current spool file.
+   *
+   * This method manages the rollover of the spool file, and then invokes
+   * {@link RolloverHandler#handleRollover(File)} to decide what should be done with the
+   * rolled-over file.
+   */
+  public void rollover() {
+    logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
+    currentSpoolBufferedWriter.flush();
+    currentSpoolBufferedWriter.close();
+    rolloverHandler.handleRollover(currentSpoolFile);
+    logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+    initializeSpoolFile();
+  }
+
+  @VisibleForTesting
+  protected String getCurrentFileName() {
+    Date currentDate = new Date();
+    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
+    return sourceFileNamePrefix + dateStr;
+  }
+
+}
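
The constructor and callback interfaces above are all an Output needs in order to reuse the spooler (OutputHDFSFile implements RolloverCondition and RolloverHandler itself). A minimal, hypothetical caller that rolls over after a fixed number of events and merely prints the rolled-over file is sketched below; the directory, prefix, and threshold values are illustrative and not part of this commit:

    import org.apache.ambari.logfeeder.output.spool.LogSpooler;
    import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
    import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
    import org.apache.ambari.logfeeder.output.spool.RolloverHandler;

    import java.io.File;

    public class LogSpoolerUsageSketch {
      public static void main(String[] args) {
        // Hypothetical threshold, for illustration only.
        final long maxEventsPerFile = 1000;

        RolloverCondition countBasedCondition = new RolloverCondition() {
          @Override
          public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
            // Roll over once the active spool file has collected enough events.
            return currentSpoolerContext.getNumEventsSpooled() >= maxEventsPerFile;
          }
        };

        RolloverHandler printingHandler = new RolloverHandler() {
          @Override
          public void handleRollover(File rolloverFile) {
            // A real handler would queue the file for upload (e.g. to HDFS or S3)
            // and delete it after a successful copy; here we only print its path.
            System.out.println("Rolled over spool file: " + rolloverFile.getAbsolutePath());
          }
        };

        LogSpooler spooler = new LogSpooler("/tmp/logfeeder-spool/", "service-logs-",
            countBasedCondition, printingHandler);
        spooler.add("sample log event");
        // rollover() can also be invoked explicitly, as OutputHDFSFile.close() does.
        spooler.rollover();
      }
    }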

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
new file mode 100644
index 0000000..084d6a2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import java.io.File;
+import java.util.Date;
+
+/**
+ * A class that holds the state of a spool file.
+ *
+ * The state in this class can be used by a {@link RolloverCondition} to determine
+ * if an active spool file should be rolled over.
+ */
+public class LogSpoolerContext {
+
+  private File activeSpoolFile;
+  private long numEventsSpooled;
+  private Date activeLogCreationTime;
+
+  /**
+   * Create a new LogSpoolerContext
+   * @param activeSpoolFile the spool file for which to hold state
+   */
+  public LogSpoolerContext(File activeSpoolFile) {
+    this.activeSpoolFile = activeSpoolFile;
+    this.numEventsSpooled = 0;
+    this.activeLogCreationTime = new Date();
+  }
+
+  /**
+   * Increment number of spooled events by one.
+   */
+  public void logEventSpooled() {
+    numEventsSpooled++;
+  }
+
+  public File getActiveSpoolFile() {
+    return activeSpoolFile;
+  }
+
+  public long getNumEventsSpooled() {
+    return numEventsSpooled;
+  }
+
+  public Date getActiveLogCreationTime() {
+    return activeLogCreationTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    LogSpoolerContext that = (LogSpoolerContext) o;
+
+    if (numEventsSpooled != that.numEventsSpooled) return false;
+    if (!activeSpoolFile.equals(that.activeSpoolFile)) return false;
+    return activeLogCreationTime.equals(that.activeLogCreationTime);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = activeSpoolFile.hashCode();
+    result = 31 * result + (int) (numEventsSpooled ^ (numEventsSpooled >>> 32));
+    result = 31 * result + activeLogCreationTime.hashCode();
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
new file mode 100644
index 0000000..1e12fb7
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+public class LogSpoolerException extends RuntimeException {
+  public LogSpoolerException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  public LogSpoolerException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
new file mode 100644
index 0000000..8279645
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+/**
+ * An interface that is used to determine whether a rollover of a locally spooled log file should be triggered.
+ */
+public interface RolloverCondition {
+
+  /**
+   * Check if the active spool file should be rolled over.
+   *
+   * If this returns true, the {@link LogSpooler} will initiate rollover
+   * of the file.
+   * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
+   *                                                       for rollover.
+   * @return true if active spool file should be rolled over, false otherwise
+   */
+  boolean shouldRollover(LogSpoolerContext currentSpoolerContext);
+}
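
The time-based check in OutputHDFSFile is one implementation of this contract; outputs with different needs (for example, the OutputS3File mentioned in the commit message) could plug in another policy. A hedged sketch of a size-based condition, with an illustrative class name and threshold parameter:

    import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
    import org.apache.ambari.logfeeder.output.spool.RolloverCondition;

    /**
     * Illustrative only: rolls over once the active spool file grows beyond a fixed size.
     */
    public class SizeBasedRolloverCondition implements RolloverCondition {
      private final long maxSpoolFileSizeBytes;

      public SizeBasedRolloverCondition(long maxSpoolFileSizeBytes) {
        this.maxSpoolFileSizeBytes = maxSpoolFileSizeBytes;
      }

      @Override
      public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
        // Note: LogSpooler writes through a BufferedWriter, so the on-disk length
        // can lag slightly behind the events already added.
        return currentSpoolerContext.getActiveSpoolFile().length() >= maxSpoolFileSizeBytes;
      }
    }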

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
new file mode 100644
index 0000000..11308e4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import java.io.File;
+
+/**
+ * An interface that is used to trigger the handling of a rolled over file.
+ *
+ * Implementations of this interface will typically upload the rolled over file to
+ * a target destination, like HDFS.
+ */
+public interface RolloverHandler {
+  /**
+   * Handle a rolled over file.
+   *
+   * This method is called inline from the {@link LogSpooler#rollover()} method.
+   * Hence implementations should either complete the handling quickly, or do so
+   * asynchronously. The cleanup of the file is left to implementors, but should
+   * typically be done once the upload of the file to the target destination is complete.
+   * @param rolloverFile The file that has been rolled over.
+   */
+  void handleRollover(File rolloverFile);
+}
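
Since handleRollover is called inline from LogSpooler.rollover(), slow uploads are best deferred to a background thread, which is what OutputHDFSFile does with its ready-file queue and copy thread. A hedged sketch of that pattern with the upload step left abstract (class and thread names are illustrative, not part of this commit):

    import org.apache.ambari.logfeeder.output.spool.RolloverHandler;

    import java.io.File;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
     * Illustrative only: hands rolled-over files to a daemon thread so that
     * handleRollover() returns quickly, as the interface Javadoc recommends.
     */
    public abstract class AsyncUploadRolloverHandler implements RolloverHandler {
      private final BlockingQueue<File> readyFiles = new LinkedBlockingQueue<File>();
      private final Thread uploadThread;

      public AsyncUploadRolloverHandler() {
        uploadThread = new Thread("spool-upload-thread") {
          @Override
          public void run() {
            try {
              while (!Thread.currentThread().isInterrupted()) {
                File file = readyFiles.take();
                upload(file);
                // Clean up the local copy once the upload has completed.
                file.delete();
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        };
        uploadThread.setDaemon(true);
        uploadThread.start();
      }

      @Override
      public void handleRollover(File rolloverFile) {
        // Called inline from LogSpooler.rollover(); just enqueue and return.
        readyFiles.add(rolloverFile);
      }

      /** Copy the file to the target destination, e.g. HDFS or S3. */
      protected abstract void upload(File file);
    }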

http://git-wip-us.apache.org/repos/asf/ambari/blob/23bd337f/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
new file mode 100644
index 0000000..7d9d78a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output.spool;
+
+import org.easymock.EasyMockRule;
+import org.easymock.LogicalOperator;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Comparator;
+
+import static org.easymock.EasyMock.*;
+
+public class LogSpoolerTest {
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  @Rule
+  public EasyMockRule mocks = new EasyMockRule(this);
+
+  private String spoolDirectory;
+  private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
+  private static final String FILE_SUFFIX = "currentFile";
+
+  @Mock
+  private RolloverCondition rolloverCondition;
+
+  @Mock
+  private RolloverHandler rolloverHandler;
+
+  @Before
+  public void setup() {
+    spoolDirectory = testFolder.getRoot().getAbsolutePath();
+  }
+
+  @Test
+  public void shouldSpoolEventToFile() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+
+    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+        andReturn(false);
+
+    replay(spoolWriter, rolloverCondition);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(spoolWriter);
+  }
+
+  @Test
+  public void shouldIncrementSpooledEventsCount() {
+
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+
+    final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    logSpoolerContext.logEventSpooled();
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
+        andReturn(false);
+
+    replay(spoolWriter, rolloverCondition);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(rolloverCondition);
+  }
+
+  @Test
+  public void shouldCloseCurrentSpoolFileOnRollOver() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+    spoolWriter.flush();
+    spoolWriter.close();
+
+    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
+        andReturn(true);
+    rolloverHandler.handleRollover(spoolFile);
+
+    replay(spoolWriter, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(spoolWriter);
+  }
+
+  @Test
+  public void shouldReinitializeFileOnRollover() {
+    final PrintWriter spoolWriter1 = mock(PrintWriter.class);
+    final PrintWriter spoolWriter2 = mock(PrintWriter.class);
+    spoolWriter1.println("log event1");
+    spoolWriter2.println("log event2");
+    spoolWriter1.flush();
+    spoolWriter1.close();
+
+    File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1");
+    File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2");
+
+    LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(spoolFile1);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+
+    LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(false);
+
+    rolloverHandler.handleRollover(spoolFile1);
+
+    replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+      private boolean wasRolledOver;
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        if (!wasRolledOver) {
+          wasRolledOver = true;
+          return spoolWriter1;
+        } else {
+          return spoolWriter2;
+        }
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        if (!wasRolledOver) {
+          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1";
+        } else {
+          return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2";
+        }
+      }
+    };
+    logSpooler.add("log event1");
+    logSpooler.add("log event2");
+
+    verify(spoolWriter1, spoolWriter2, rolloverCondition);
+  }
+
+  @Test
+  public void shouldCallRolloverHandlerOnRollover() {
+    final PrintWriter spoolWriter = mock(PrintWriter.class);
+    spoolWriter.println("log event");
+    spoolWriter.flush();
+    spoolWriter.close();
+
+    File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX);
+    LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile);
+    expect(rolloverCondition.shouldRollover(
+        cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
+    ).andReturn(true);
+    rolloverHandler.handleRollover(spoolFile);
+
+    replay(spoolWriter, rolloverCondition, rolloverHandler);
+
+    LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
+        rolloverCondition, rolloverHandler) {
+
+      @Override
+      protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
+        return spoolWriter;
+      }
+
+      @Override
+      protected String getCurrentFileName() {
+        return SOURCE_FILENAME_PREFIX + FILE_SUFFIX;
+      }
+    };
+    logSpooler.add("log event");
+
+    verify(rolloverHandler);
+  }
+
+  class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
+    @Override
+    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
+      return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile());
+    }
+  }
+
+  class LogSpoolerEventCountComparator implements Comparator<LogSpoolerContext> {
+    @Override
+    public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
+      return (int) (o1.getNumEventsSpooled() - o2.getNumEventsSpooled());
+    }
+  }
+
+}