You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:12 UTC

svn commit: r1156865 - in /incubator/flume/branches/flume-728/flume-ng-core: ./ src/main/java/org/apache/flume/core/ src/main/java/org/apache/flume/core/util/ src/main/java/org/apache/flume/lifecycle/ src/main/java/org/apache/flume/sink/ src/main/java/...

Author: esammer
Date: Fri Aug 12 00:46:11 2011
New Revision: 1156865

URL: http://svn.apache.org/viewvc?rev=1156865&view=rev
Log:
- Added a channel driver to move events from source to sink. - Context now provides a Reporter for indicating progress. - Event{Sink,Source} interfaces now throw LifecycleException, InterruptedException, and MessageDeliveryException where appropriate. - Added LogicalNode - handles the guts of tying sources to sinks and managing ChannelDriver lifecycle. - Added MessageDeliveryException - thrown by sources and sinks to indicate the message is dead. - Reporter instances now start with the current time for the last report rather than zero. Creation of the Reporter indicates status of some kind. - Added ConcurrencyUtils with a method that attempts to guarantee a Runnable happens within a given timeout as well as reports progress via the Reporter every N millis. Super buggy right now. - Fixed a bug in EventBuilder where we forgot to set the event body. Oops. - Added the LifecycleAware interface. Classes can participate in generic lifecycle events by implementing this interface.
 Danger: this will change at least once more soon. - Added LifecycleController which includes methods for waiting on specific lifecycle states within a given timeout on any LifecycleAware instance. - Added LifecycleException. Thrown during LifecycleAware methods to indicate a failure during start / stop. - Added LifecycleState. Indicates what state a LifecycleAware instance is currently in. - Updated AbstractEvent{Sink,Source} to conform to the new Event{Sink,Source} interfaces. - Added a LoggerSink implementation that simply passes each event to the logging system (e.g. log4j). Useful for testing and debugging. - Added NullSink - /dev/null for Flume. Does absolutely nothing with a message. Useful for testing and debugging. - Added SequenceGeneratorSource. A source that generates monotonically incrementing integers. - Added unit tests for Context. - Added unit tests for LogicalNode. - Removed a test of Reporter that was no longer true (that timeStamp started at zero).
 - Added (flakey) unit tests for ConcurrencyUtils. - Added unit tests for EventBuilder. - Added unit tests for LifecycleController. - Added unit tests for LoggerSink. - Added unit tests for SequenceGeneratorSource.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/pom.xml
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/pom.xml?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/pom.xml Fri Aug 12 00:46:11 2011
@@ -22,6 +22,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,213 @@
+package org.apache.flume.core;
+
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Moves events from an {@link EventSource} to an {@link EventSink} on a
+ * dedicated thread that is created by {@link #start(Context)} and joined by
+ * {@link #stop(Context)}.
+ */
+public class ChannelDriver implements LifecycleAware {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ChannelDriver.class);
+
+  private String name;
+  private EventSource source;
+  private EventSink sink;
+  private ChannelDriverThread driverThread;
+
+  private LifecycleState lifecycleState;
+
+  /**
+   * @param name
+   *          name given to the driver thread
+   */
+  public ChannelDriver(String name) {
+    this.name = name;
+
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  /**
+   * Creates and starts the driver thread with the currently configured source
+   * and sink, then moves to {@link LifecycleState#START}.
+   *
+   * @param context
+   *          passed to every source / sink call made by the driver thread
+   */
+  @Override
+  public void start(Context context) throws LifecycleException {
+    logger.debug("Channel driver starting:{}", this);
+
+    driverThread = new ChannelDriverThread(name);
+
+    driverThread.setSource(source);
+    driverThread.setSink(sink);
+    /* Otherwise the thread would hand a null context to next / append. */
+    driverThread.setContext(context);
+
+    driverThread.start();
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  /**
+   * Asks the driver thread to stop and waits for it to exit, then moves to
+   * {@link LifecycleState#STOP}. If the driver was never started there is no
+   * thread to wait for and only the state changes.
+   */
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    logger.debug("Channel driver stopping:{}", this);
+
+    boolean interrupted = false;
+
+    if (driverThread != null) {
+      driverThread.setShouldStop(true);
+
+      while (driverThread.isAlive()) {
+        try {
+          logger.debug("Waiting for driver to stop");
+
+          driverThread.join(1000);
+        } catch (InterruptedException e) {
+          logger
+              .debug("Interrupted while waiting for driver thread to shutdown. Interrupting it and stopping.");
+          driverThread.interrupt();
+          interrupted = true;
+        }
+      }
+    }
+
+    lifecycleState = LifecycleState.STOP;
+
+    if (interrupted) {
+      /* join() cleared our interrupt flag; put it back for the caller. */
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  /** No-op in this implementation. */
+  @Override
+  public void transitionTo(LifecycleState state) {
+  }
+
+  @Override
+  public String toString() {
+    return "{ source:" + source + " sink:" + sink + " }";
+  }
+
+  public EventSource getSource() {
+    return source;
+  }
+
+  public void setSource(EventSource source) {
+    this.source = source;
+  }
+
+  public EventSink getSink() {
+    return sink;
+  }
+
+  public void setSink(EventSink sink) {
+    this.sink = sink;
+  }
+
+  /**
+   * Pulls events from the source and appends them to the sink until asked to
+   * stop or interrupted, keeping simple counters that are logged on exit.
+   */
+  private static class ChannelDriverThread extends Thread {
+
+    private EventSource source;
+    private EventSink sink;
+    private Context context;
+
+    private long totalEvents;
+    private long discardedEvents;
+    private long nullEvents;
+    private long successfulEvents;
+
+    volatile private boolean shouldStop;
+
+    public ChannelDriverThread(String name) {
+      super(name);
+
+      totalEvents = 0;
+      discardedEvents = 0;
+      nullEvents = 0;
+      successfulEvents = 0;
+
+      shouldStop = false;
+    }
+
+    @Override
+    public void run() {
+      logger.debug("Channel driver thread running");
+
+      Preconditions.checkState(source != null, "Source can not be null");
+      Preconditions.checkState(sink != null, "Sink can not be null");
+
+      while (!shouldStop) {
+        Event<?> event = null;
+
+        try {
+          event = source.next(context);
+
+          if (event != null) {
+            sink.append(context, event);
+            successfulEvents++;
+          } else {
+            nullEvents++;
+          }
+        } catch (InterruptedException e) {
+          logger.debug("Received an interrupt while moving events - stopping");
+          shouldStop = true;
+        } catch (MessageDeliveryException e) {
+          logger.debug("Unable to deliver event:{} (may be null)", event);
+          discardedEvents++;
+          /* FIXME: Handle dead messages. */
+        }
+
+        totalEvents++;
+      }
+
+      logger.debug("Channel driver thread exiting cleanly");
+      logger
+          .info(
+              "Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
+              new Object[] { totalEvents, successfulEvents, nullEvents,
+                  discardedEvents });
+    }
+
+    public void setSource(EventSource source) {
+      this.source = source;
+    }
+
+    public void setSink(EventSink sink) {
+      this.sink = sink;
+    }
+
+    public void setContext(Context context) {
+      this.context = context;
+    }
+
+    public void setShouldStop(boolean shouldStop) {
+      this.shouldStop = shouldStop;
+    }
+
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java Fri Aug 12 00:46:11 2011
@@ -6,9 +6,11 @@ import java.util.Map;
 public class Context {
 
   private Map<String, Object> parameters;
+  private Reporter reporter;
 
   public Context() {
     parameters = new HashMap<String, Object>();
+    reporter = new Reporter();
   }
 
   public void put(String key, Object value) {
@@ -25,7 +27,7 @@ public class Context {
 
   @Override
   public String toString() {
-    return "{ parameters:" + parameters + " }";
+    return "{ parameters:" + parameters + " reporter:" + reporter + " }";
   }
 
   public Map<String, Object> getParameters() {
@@ -36,4 +38,12 @@ public class Context {
     this.parameters = parameters;
   }
 
+  public Reporter getReporter() {
+    return reporter;
+  }
+
+  public void setReporter(Reporter reporter) {
+    this.reporter = reporter;
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java Fri Aug 12 00:46:11 2011
@@ -1,11 +1,16 @@
 package org.apache.flume.core;
 
+import org.apache.flume.lifecycle.LifecycleException;
+
 public interface EventSink {
 
-  public void open(Context context);
+  public void open(Context context) throws InterruptedException,
+      LifecycleException;
 
-  public void append(Context context, Event<?> event);
+  public void append(Context context, Event<?> event)
+      throws InterruptedException, MessageDeliveryException;
 
-  public void close(Context context);
+  public void close(Context context) throws InterruptedException,
+      LifecycleException;
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java Fri Aug 12 00:46:11 2011
@@ -1,11 +1,16 @@
 package org.apache.flume.core;
 
+import org.apache.flume.lifecycle.LifecycleException;
+
 public interface EventSource {
 
-  public void open(Context context);
+  public void open(Context context) throws InterruptedException,
+      LifecycleException;
 
-  public Event<?> next(Context context);
+  public Event<?> next(Context context) throws InterruptedException,
+      MessageDeliveryException;
 
-  public void close(Context context);
+  public void close(Context context) throws InterruptedException,
+      LifecycleException;
 
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,143 @@
+package org.apache.flume.core;
+
+import java.util.Arrays;
+
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Ties an {@link EventSource} to an {@link EventSink} and manages the
+ * lifecycle of the {@link ChannelDriver} that moves events between them.
+ */
+public class LogicalNode implements LifecycleAware {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(LogicalNode.class);
+
+  private String name;
+  private EventSource source;
+  private EventSink sink;
+
+  private ChannelDriver driver;
+
+  private LifecycleState lifecycleState;
+
+  public LogicalNode() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  /**
+   * Builds a channel driver for the configured source and sink and starts it.
+   *
+   * @throws IllegalStateException
+   *           if the name, source, or sink has not been set
+   */
+  @Override
+  public void start(Context context) throws LifecycleException {
+    logger.info("Starting logical node:{}", this);
+
+    Preconditions.checkState(name != null, "Logical node name can not be null");
+    Preconditions.checkState(source != null,
+        "Logical node source can not be null");
+    Preconditions.checkState(sink != null, "Logical node sink can not be null");
+
+    driver = new ChannelDriver(name + "-channelDriver");
+
+    driver.setSource(source);
+    driver.setSink(sink);
+
+    driver.start(context);
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  /**
+   * Stops the channel driver, waits up to ten seconds for it to report STOP or
+   * ERROR, and adopts the driver's final state as this node's state.
+   *
+   * @throws IllegalStateException
+   *           if the node was never started
+   */
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    logger.info("Stopping logical node:{}", this);
+
+    Preconditions.checkState(driver != null,
+        "Logical node can not be stopped before it is started");
+
+    boolean complete = false;
+
+    driver.stop(context);
+
+    try {
+      complete = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+          LifecycleState.STOP, LifecycleState.ERROR }, 10000);
+    } catch (InterruptedException e) {
+      logger.debug("Interrupted while waiting for the driver to stop.");
+      complete = false;
+      /* Keep the interrupt visible to whoever called us. */
+      Thread.currentThread().interrupt();
+    }
+
+    if (!complete) {
+      /*
+       * A StackTraceElement[] passed directly is taken as the argument array,
+       * so only its first element would be printed; render it as one string.
+       */
+      logger
+          .error(
+              "There's a good chance the source or sink aren't shutting down. This will lead to problems. Contact the developers! Trace:{}",
+              (Object) Arrays.toString(Thread.currentThread().getStackTrace()));
+    }
+
+    /* Our state is the channel driver's state. */
+    lifecycleState = driver.getLifecycleState();
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  /** No-op in this implementation. */
+  @Override
+  public void transitionTo(LifecycleState state) {
+  }
+
+  @Override
+  public String toString() {
+    return "{ name:" + name + " source:" + source + " sink:" + sink
+        + " lifecycleState:" + lifecycleState + " }";
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public EventSource getSource() {
+    return source;
+  }
+
+  public void setSource(EventSource source) {
+    this.source = source;
+  }
+
+  public EventSink getSink() {
+    return sink;
+  }
+
+  public void setSink(EventSink sink) {
+    this.sink = sink;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.core;
+/** Thrown by sources and sinks to indicate that a message is dead (could not be delivered). */
+public class MessageDeliveryException extends Exception {
+
+  private static final long serialVersionUID = 1102327497549834945L;
+
+  public MessageDeliveryException() {
+    super();
+  }
+
+  public MessageDeliveryException(String message) {
+    super(message);
+  }
+
+  public MessageDeliveryException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  public MessageDeliveryException(Throwable t) {
+    super(t);
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java Fri Aug 12 00:46:11 2011
@@ -10,7 +10,7 @@ public class Reporter {
   private long timeStamp;
 
   public Reporter() {
-    timeStamp = 0;
+    timeStamp = System.currentTimeMillis();
   }
 
   public synchronized long progress() {

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,117 @@
+package org.apache.flume.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.flume.core.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConcurrencyUtils {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ConcurrencyUtils.class);
+
+  /** Longest time, in milliseconds, to sleep between two status checks. */
+  private static final long MAX_POLL_INTERVAL = 500;
+
+  /**
+   * <p>
+   * Run the provided {@link Runnable} using the {@code executorService}. If it
+   * takes longer than {@code timeLimit} or does not regularly report status
+   * using the supplied {@link Reporter}, cancel it.
+   * </p>
+   * <p>
+   * This is meant for operations that may take time but have an SLA. It was
+   * originally created for ops like source / sink opens or next / append calls
+   * that may block (at least externally). For these cases, we want them to
+   * block but we need to know they're still alive.
+   * </p>
+   * <p>
+   * The task is polled: completion is checked first on every pass, so a task
+   * that has already finished is never cancelled or reported as failed because
+   * of a stale report. A task that ends by throwing still counts as finished.
+   * </p>
+   *
+   * @param executorService
+   *          executor the runnable is submitted to
+   * @param runnable
+   *          the task to run
+   * @param reporter
+   *          source of the last-progress timestamp
+   * @param reportInterval
+   *          maximum number of milliseconds allowed between progress reports
+   * @param timeLimit
+   *          maximum run time in milliseconds (or zero for no time limit)
+   * @return true if the task finished within the timeLimit and with acceptable
+   *         reporting, false otherwise.
+   * @throws InterruptedException
+   *           if the calling thread is interrupted while waiting
+   */
+  public static boolean executeReportAwareWithSLA(
+      ExecutorService executorService, Runnable runnable, Reporter reporter,
+      long reportInterval, long timeLimit) throws InterruptedException {
+
+    boolean success = false;
+
+    Future<?> future = executorService.submit(runnable);
+
+    long currentTime = System.currentTimeMillis();
+    long deadLine = timeLimit > 0 ? currentTime + timeLimit : 0;
+
+    logger.debug("Need to complete {} by {}. Must see progress every {}ms",
+        new Object[] { future, deadLine, reportInterval });
+
+    while (true) {
+      currentTime = System.currentTimeMillis();
+      long lastReport = reporter.getTimeStamp();
+      long timeSinceReport = currentTime - lastReport;
+
+      logger
+          .debug(
+              "Checking status of {} - lastReport:{} currentTime:{} timeSinceReport:{}",
+              new Object[] { future, lastReport, currentTime, timeSinceReport });
+
+      if (future.isDone()) {
+        logger.debug("Task {} has finished", future);
+
+        success = true;
+        break;
+      } else if (deadLine > 0 && currentTime > deadLine) {
+        logger.debug("Cancelling {} - exceeded deadLine:{}", future, deadLine);
+
+        future.cancel(true);
+        break;
+      } else if (timeSinceReport > reportInterval) {
+        logger.debug("Cancelling {} - no status in {}ms", future,
+            timeSinceReport);
+
+        future.cancel(true);
+        break;
+      } else {
+
+        /*
+         * Sleep until whichever comes first: the deadline (if any) or the
+         * moment the last report becomes stale. Work with remaining durations
+         * rather than absolute times so large intervals can't overflow. Sleep
+         * at least 1ms so we never spin and at most MAX_POLL_INTERVAL so that
+         * completion is noticed promptly.
+         */
+        long untilStale = reportInterval - timeSinceReport;
+        long untilNextCheck = deadLine > 0 ? Math.min(deadLine - currentTime,
+            untilStale) : untilStale;
+        long sleepLength = Math.min(MAX_POLL_INTERVAL,
+            Math.max(untilNextCheck, 1));
+
+        logger
+            .debug(
+                "Still waiting for {} to complete - timeSinceReport:{} deadLine:{} sleeping:{}",
+                new Object[] { future, timeSinceReport, deadLine, sleepLength });
+
+        Thread.sleep(sleepLength);
+      }
+    }
+
+    return success;
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java Fri Aug 12 00:46:11 2011
@@ -11,6 +11,8 @@ public class EventBuilder {
   public static <T> Event<T> withBody(T body) {
     Event<T> event = new SimpleEvent<T>();
 
+    event.setBody(body);
+
     return event;
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,15 @@
+package org.apache.flume.lifecycle;
+
+import org.apache.flume.core.Context;
+/** Implemented by classes that take part in generic lifecycle events. */
+public interface LifecycleAware {
+  /** Starts this instance; a LifecycleException indicates a failure during start. */
+  public void start(Context context) throws LifecycleException;
+  /** Stops this instance; a LifecycleException indicates a failure during stop. */
+  public void stop(Context context) throws LifecycleException;
+  /** Returns the state this instance is currently in. */
+  public LifecycleState getLifecycleState();
+  /** NOTE(review): intended contract is not visible here; the implementations in this commit are empty. */
+  public void transitionTo(LifecycleState state);
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,78 @@
+package org.apache.flume.lifecycle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for waiting, with a timeout, until a {@link LifecycleAware} instance
+ * reports a particular {@link LifecycleState}.
+ */
+public class LifecycleController {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(LifecycleController.class);
+  private static final long shortestSleepDuration = 50;
+  private static final int maxNumberOfChecks = 5;
+
+  /**
+   * Waits for {@code delegate} to report {@code state}.
+   *
+   * @return true if the state was seen before the timeout (in milliseconds)
+   *         elapsed, false otherwise
+   * @throws InterruptedException
+   *           if interrupted while sleeping between checks
+   */
+  public static boolean waitForState(LifecycleAware delegate,
+      LifecycleState state, long timeout) throws InterruptedException {
+
+    return waitForOneOf(delegate, new LifecycleState[] { state }, timeout);
+  }
+
+  /**
+   * Polls {@code delegate} until it reports any of {@code states} or
+   * {@code timeout} milliseconds have passed. The state is always checked at
+   * least once, and once more after the final sleep, and no sleep runs past
+   * the deadline.
+   *
+   * @return true if one of the states was seen, false on timeout
+   * @throws InterruptedException
+   *           if interrupted while sleeping between checks
+   */
+  public static boolean waitForOneOf(LifecycleAware delegate,
+      LifecycleState[] states, long timeout) throws InterruptedException {
+
+    logger.debug("Waiting for state {} for delegate:{} up to {}ms",
+        new Object[] { states, delegate, timeout });
+
+    long sleepInterval = Math.max(shortestSleepDuration, timeout
+        / maxNumberOfChecks);
+    long deadLine = System.currentTimeMillis() + timeout;
+
+    while (true) {
+      LifecycleState current = delegate.getLifecycleState();
+
+      for (LifecycleState state : states) {
+        if (current == state) {
+          return true;
+        }
+      }
+
+      long remaining = deadLine - System.currentTimeMillis();
+
+      if (remaining <= 0) {
+        break;
+      }
+
+      long sleepLength = Math.min(sleepInterval, remaining);
+
+      logger.debug("Still want one of states:{} sleeping:{}ms", states,
+          sleepLength);
+      Thread.sleep(sleepLength);
+    }
+
+    logger.debug("Didn't see state within timeout {}ms", timeout);
+
+    return false;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.lifecycle;
+/** Thrown from LifecycleAware methods to indicate a failure during start / stop. */
+public class LifecycleException extends Exception {
+
+  private static final long serialVersionUID = 4689000562519155240L;
+
+  public LifecycleException() {
+    super();
+  }
+
+  public LifecycleException(String message) {
+    super(message);
+  }
+
+  public LifecycleException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  public LifecycleException(Throwable t) {
+    super(t);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,11 @@
+package org.apache.flume.lifecycle;
+
+/**
+ * The states a lifecycle-aware component can report itself to be in.
+ */
+public enum LifecycleState {
+  IDLE,
+  START,
+  STOP,
+  ERROR
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java Fri Aug 12 00:46:11 2011
@@ -3,19 +3,24 @@ package org.apache.flume.sink;
 import org.apache.flume.core.Context;
 import org.apache.flume.core.Event;
 import org.apache.flume.core.EventSink;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.lifecycle.LifecycleException;
 
 abstract public class AbstractEventSink implements EventSink {
 
   @Override
-  public void open(Context context) {
+  public void open(Context context) throws InterruptedException,
+      LifecycleException {
     // Empty implementation by default.
   }
 
   @Override
-  abstract public void append(Context context, Event<?> event);
+  abstract public void append(Context context, Event<?> event)
+      throws InterruptedException, MessageDeliveryException;
 
   @Override
-  public void close(Context context) {
+  public void close(Context context) throws InterruptedException,
+      LifecycleException {
     // Empty implementation by default.
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,33 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sink that writes every event it receives to the logging system at INFO
+ * level. Intended for testing and debugging.
+ */
+public class LoggerSink extends AbstractEventSink {
+
+  // Log under this class's own name so output is attributed to the sink and
+  // production code carries no dependency on test classes.
+  private static final Logger logger = LoggerFactory
+      .getLogger(LoggerSink.class);
+
+  /**
+   * Logs the event using the {@code "event:{}"} message pattern.
+   *
+   * @param context the context supplied by the caller; not used here
+   * @param event the event to log
+   */
+  @Override
+  public void append(Context context, Event<?> event)
+      throws InterruptedException, MessageDeliveryException {
+
+    logger.info("event:{}", event);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,22 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+
+/**
+ * A sink that discards every event handed to it. Useful for testing and
+ * debugging.
+ */
+public class NullSink extends AbstractEventSink {
+
+  /**
+   * Accepts the event and drops it without any processing.
+   */
+  @Override
+  public void append(Context context, Event<?> event)
+      throws InterruptedException, MessageDeliveryException {
+    // Intentionally empty: this sink exists to do nothing with the event.
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java Fri Aug 12 00:46:11 2011
@@ -3,19 +3,21 @@ package org.apache.flume.source;
 import org.apache.flume.core.Context;
 import org.apache.flume.core.Event;
 import org.apache.flume.core.EventSource;
+import org.apache.flume.core.MessageDeliveryException;
 
 abstract public class AbstractEventSource implements EventSource {
 
   @Override
-  public void open(Context context) {
+  public void open(Context context) throws InterruptedException {
     // Empty implementation by default.
   }
 
   @Override
-  abstract public Event<?> next(Context context);
+  abstract public Event<?> next(Context context) throws InterruptedException,
+      MessageDeliveryException;
 
   @Override
-  public void close(Context context) {
+  public void close(Context context) throws InterruptedException {
     // Empty implementation by default.
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,37 @@
+package org.apache.flume.source;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.core.SimpleEvent;
+
+/**
+ * A source that produces an increasing sequence of {@code long} values,
+ * starting at zero and advancing by one for every event requested.
+ */
+public class SequenceGeneratorSource extends AbstractEventSource {
+
+  // Value the next generated event will carry.
+  private long sequence;
+
+  /**
+   * Creates a new event whose body is the current sequence value and then
+   * advances the sequence.
+   *
+   * @param context the context supplied by the caller; not used here
+   * @return a newly created event holding the sequence value
+   */
+  @Override
+  public Event<?> next(Context context) throws InterruptedException,
+      MessageDeliveryException {
+
+    Event<Long> generated = new SimpleEvent<Long>();
+
+    long current = sequence;
+    sequence = current + 1;
+    generated.setBody(current);
+
+    return generated;
+  }
+
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java Fri Aug 12 00:46:11 2011
@@ -15,8 +15,7 @@ public class TestContext {
 
   @Test
   public void testPutGet() {
-    Assert.assertEquals("Context is empty", 0, context.getParameters()
-        .size());
+    Assert.assertEquals("Context is empty", 0, context.getParameters().size());
 
     context.put("test", "test");
 
@@ -24,4 +23,9 @@ public class TestContext {
         context.get("test", String.class));
   }
 
+  @Test
+  public void testReporter() {
+    Assert.assertNotNull(context.getReporter());
+  }
+
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,128 @@
+package org.apache.flume.core;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises the start / stop lifecycle of {@link LogicalNode}, first with a
+ * single node and then with ten nodes running on separate threads.
+ */
+public class TestLogicalNode {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestLogicalNode.class);
+
+  private LogicalNode node;
+
+  /** Builds a node that feeds a sequence generator into a logger sink. */
+  @Before
+  public void setUp() {
+    node = new LogicalNode();
+
+    node.setName("test-node-n1");
+    node.setSource(new SequenceGeneratorSource());
+    node.setSink(new LoggerSink());
+  }
+
+  /**
+   * Starts and then stops the node, checking after each call that it reaches
+   * the expected lifecycle state within five seconds.
+   */
+  @Test
+  public void testLifecycle() throws LifecycleException, InterruptedException {
+    Context context = new Context();
+
+    node.start(context);
+    boolean reached = LifecycleController.waitForOneOf(node,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue("Matched a lifecycle state", reached);
+    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+    node.stop(context);
+    reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue("Matched a lifecycle state", reached);
+    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+  }
+
+  /**
+   * Starts and stops ten independent nodes, each on its own thread, and
+   * expects all ten to complete the full cycle successfully.
+   */
+  @Test
+  public void testMultipleNodes() throws LifecycleException,
+      InterruptedException {
+
+    final AtomicInteger successfulThread = new AtomicInteger(0);
+    final CountDownLatch finishedThreads = new CountDownLatch(10);
+
+    for (int i = 0; i < 10; i++) {
+      final int j = i;
+
+      new Thread("test-node-runner-" + i) {
+
+        @Override
+        public void run() {
+          Context context = new Context();
+          LogicalNode node = new LogicalNode();
+
+          node.setName("test-node-" + j);
+          node.setSource(new SequenceGeneratorSource());
+          node.setSink(new NullSink());
+
+          try {
+            node.start(context);
+
+            boolean reached = LifecycleController.waitForOneOf(node,
+                new LifecycleState[] { LifecycleState.START,
+                    LifecycleState.ERROR }, 5000);
+
+            Assert.assertTrue("Matched a lifecycle state", reached);
+            Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+            Thread.sleep(500);
+
+            node.stop(context);
+            reached = LifecycleController.waitForOneOf(node,
+                new LifecycleState[] { LifecycleState.STOP,
+                    LifecycleState.ERROR }, 5000);
+
+            Assert.assertTrue("Matched a lifecycle state", reached);
+            Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+
+            successfulThread.incrementAndGet();
+          } catch (LifecycleException e) {
+            logger.debug("Exception follows", e);
+          } catch (InterruptedException e) {
+            logger.debug("Exception follows", e);
+          } finally {
+            // Count down even when an assertion above throws; otherwise the
+            // latch never reaches zero and the test blocks in await() forever
+            // instead of failing on the success count.
+            finishedThreads.countDown();
+          }
+        }
+      }.start();
+
+    }
+
+    finishedThreads.await();
+
+    Assert.assertEquals(10, successfulThread.get());
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java Fri Aug 12 00:46:11 2011
@@ -15,7 +15,6 @@ public class TestReporter {
 
   @Test
   public void testProgress() {
-    Assert.assertEquals("Timestamp starts at zero", 0, reporter.getTimeStamp());
     Assert.assertTrue("Timestamp > 0 after progress", reporter.progress() > 0);
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,118 @@
+package org.apache.flume.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.flume.core.Reporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests {@code ConcurrencyUtils.executeReportAwareWithSLA} with tasks that
+ * behave well, never report progress, or run past the deadline.
+ */
+public class TestConcurrencyUtils {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestConcurrencyUtils.class);
+
+  private ExecutorService executorService;
+
+  /** Creates the single-threaded executor that runs each test's task. */
+  @Before
+  public void setUp() {
+    executorService = Executors.newSingleThreadExecutor();
+  }
+
+  /** Shuts the executor down, attempting to stop any task still running. */
+  @After
+  public void tearDown() {
+    executorService.shutdownNow();
+  }
+
+  /**
+   * Run a normal task that reports progress AND finishes within the deadline.
+   */
+  @Test
+  public void testNormalRun() throws InterruptedException {
+    final Reporter reporter = new Reporter();
+
+    Runnable r = new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < 3; i++) {
+            reporter.progress();
+            Thread.sleep(100);
+          }
+        } catch (InterruptedException e) {
+          logger.error("Unexpected interruption of thread.", e);
+        }
+      }
+    };
+
+    boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+        executorService, r, reporter, 200, 3000);
+
+    Assert.assertTrue(finished);
+  }
+
+  /**
+   * Run a task that fails to report status but completes within the deadline.
+   */
+  @Test
+  public void testNoProgress() throws InterruptedException {
+    final Reporter reporter = new Reporter();
+
+    Runnable r = new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          logger.debug("Someone interrupted us (expected!)");
+        }
+      }
+    };
+
+    boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+        executorService, r, reporter, 200, 3000);
+
+    Assert.assertFalse(finished);
+  }
+
+  /**
+   * Run a task that reports status but exceeds the deadline.
+   */
+  @Test
+  public void testDeadline() throws InterruptedException {
+    final Reporter reporter = new Reporter();
+
+    Runnable r = new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < 35; i++) {
+            Thread.sleep(100);
+            reporter.progress();
+          }
+        } catch (InterruptedException e) {
+          logger.debug("Someone interrupted us (expected!)");
+        }
+      }
+    };
+
+    boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+        executorService, r, reporter, 200, 3000);
+
+    Assert.assertFalse(finished);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,45 @@
+package org.apache.flume.core.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.core.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that {@link EventBuilder} fills in the body and headers of the events
+ * it creates.
+ */
+public class TestEventBuilder {
+
+  /** An event built from a body alone must carry that body. */
+  @Test
+  public void testBody() {
+    Event<String> textEvent = EventBuilder.withBody("e1");
+    Assert.assertNotNull(textEvent);
+    Assert.assertEquals("body is correct", "e1", textEvent.getBody());
+
+    Event<Long> numberEvent = EventBuilder.withBody(2L);
+    Assert.assertNotNull(numberEvent);
+    Assert.assertEquals("body is correct", Long.valueOf(2L),
+        numberEvent.getBody());
+  }
+
+  /** An event built with headers must carry the body and both headers. */
+  @Test
+  public void testHeaders() {
+    Map<String, String> expected = new HashMap<String, String>();
+    expected.put("one", "1");
+    expected.put("two", "2");
+
+    Event<?> built = EventBuilder.withBody("e1", expected);
+
+    Assert.assertNotNull(built);
+    Assert.assertEquals("e1 has the proper body", "e1", built.getBody());
+    Assert.assertEquals("e1 has the proper headers", 2,
+        built.getHeaders().size());
+    Assert.assertEquals("e1 has a one key", "1", built.getHeaders().get("one"));
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,137 @@
+package org.apache.flume.lifecycle;
+
+import org.apache.flume.core.Context;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the state-waiting helpers of {@link LifecycleController} against a
+ * simple {@link LifecycleAware} stub.
+ */
+public class TestLifecycleController {
+
+  /**
+   * Walks a delegate through start, stop and start again, checking that
+   * waiting for the current state succeeds and that waiting for a state the
+   * delegate is not in times out.
+   */
+  @Test
+  public void testWaitForState() throws LifecycleException,
+      InterruptedException {
+
+    Context context = new Context();
+    LifecycleAware delegate = new SleeperLifecycleDelegate();
+
+    Assert.assertEquals(LifecycleState.IDLE, delegate.getLifecycleState());
+
+    delegate.start(context);
+
+    boolean reached = LifecycleController.waitForState(delegate,
+        LifecycleState.START, 2000);
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+
+    delegate.stop(context);
+
+    reached = LifecycleController.waitForState(delegate, LifecycleState.STOP,
+        2000);
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, delegate.getLifecycleState());
+
+    delegate.start(context);
+
+    reached = LifecycleController.waitForState(delegate, LifecycleState.IDLE,
+        500);
+
+    Assert.assertFalse(reached);
+    Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+
+  }
+
+  /**
+   * Checks that waiting on a set of states succeeds once the delegate is in
+   * any one of them.
+   */
+  @Test
+  public void testWaitForOneOf() throws LifecycleException,
+      InterruptedException {
+
+    Context context = new Context();
+    LifecycleAware delegate = new SleeperLifecycleDelegate();
+
+    Assert.assertEquals(LifecycleState.IDLE, delegate.getLifecycleState());
+
+    delegate.start(context);
+
+    boolean reached = LifecycleController.waitForOneOf(delegate,
+        new LifecycleState[] { LifecycleState.STOP, LifecycleState.START },
+        2000);
+
+    Assert.assertTrue("Matched a state change", reached);
+    Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+  }
+
+  /**
+   * Minimal {@link LifecycleAware} implementation that sleeps for a
+   * configurable time (zero by default) before switching state on start and
+   * stop.
+   */
+  public static class SleeperLifecycleDelegate implements LifecycleAware {
+
+    private long sleepTime;
+    private LifecycleState state;
+
+    public SleeperLifecycleDelegate() {
+      sleepTime = 0;
+      state = LifecycleState.IDLE;
+    }
+
+    @Override
+    public void start(Context context) throws LifecycleException {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers after wrapping it.
+        Thread.currentThread().interrupt();
+        throw new LifecycleException(e);
+      }
+
+      state = LifecycleState.START;
+    }
+
+    @Override
+    public void stop(Context context) throws LifecycleException {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers after wrapping it.
+        Thread.currentThread().interrupt();
+        throw new LifecycleException(e);
+      }
+
+      state = LifecycleState.STOP;
+    }
+
+    @Override
+    public LifecycleState getLifecycleState() {
+      return state;
+    }
+
+    @Override
+    public void transitionTo(LifecycleState state) {
+      // Not needed by these tests; state only changes through start/stop.
+    }
+
+    public long getSleepTime() {
+      return sleepTime;
+    }
+
+    public void setSleepTime(long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,44 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.core.util.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link LoggerSink}.
+ */
+public class TestLoggerSink {
+
+  private LoggerSink sink;
+
+  /** Creates a fresh sink before each test. */
+  @Before
+  public void setUp() {
+    sink = new LoggerSink();
+  }
+
+  /**
+   * Opens the sink, appends ten events and closes it; the test passes as long
+   * as none of those calls throws.
+   */
+  @Test
+  public void testAppend() throws InterruptedException, LifecycleException,
+      MessageDeliveryException {
+
+    Context context = new Context();
+
+    sink.open(context);
+
+    int i = 0;
+    while (i < 10) {
+      sink.append(context, EventBuilder.withBody("Test " + i));
+      i++;
+    }
+
+    sink.close(context);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,48 @@
+package org.apache.flume.source;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.EventSource;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link SequenceGeneratorSource} hands out consecutive values
+ * starting at zero.
+ */
+public class TestSequenceGeneratorSource {
+
+  private EventSource source;
+
+  /** Creates a fresh source before each test. */
+  @Before
+  public void setUp() {
+    source = new SequenceGeneratorSource();
+  }
+
+  /**
+   * Pulls one hundred events from the source and checks that their bodies
+   * count up from 0 to 99 in order.
+   */
+  @Test
+  public void testNext() throws InterruptedException, LifecycleException,
+      MessageDeliveryException {
+
+    Context context = new Context();
+
+    source.open(context);
+
+    for (long expected = 0; expected < 100; expected++) {
+      Event<?> produced = source.next(context);
+      long actual = (Long) produced.getBody();
+
+      Assert.assertEquals(expected, actual);
+    }
+
+    source.close(context);
+  }
+
+}