You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:12 UTC
svn commit: r1156865 - in /incubator/flume/branches/flume-728/flume-ng-core:
./ src/main/java/org/apache/flume/core/
src/main/java/org/apache/flume/core/util/
src/main/java/org/apache/flume/lifecycle/
src/main/java/org/apache/flume/sink/ src/main/java/...
Author: esammer
Date: Fri Aug 12 00:46:11 2011
New Revision: 1156865
URL: http://svn.apache.org/viewvc?rev=1156865&view=rev
Log:
- Added a channel driver to move events from source to sink. - Context now provides a Reporter for indicating progress. - Event{Sink,Source} interfaces now throw LifecycleException, InterruptedException, and MessageDeliveryException where appropriate. - Added LogicalNode - handles the guts of tying sources to sinks and managing ChannelDriver lifecycle. - Added MessageDeliveryException - thrown by sources and sinks to indicate the message is dead. - Reporter instances now start with the current time for the last report rather than zero. Creation of the Reporter indicates status of some kind. - Added ConcurrencyUtils with a method that attempts to guarantee a Runnable happens within a given timeout as well as reports progress via the Reporter even N millis. Super buggy right now. - Fixed a bug in EventBuilder where we forgot to set the event body. Oops. - Added the LifecycleAware interface. Classes can participate in generic lifecycle events by implementing this interface. Dang
er: this will change at least once more soon. - Added LifecycleController which includes methods for waiting on specific lifecycle states within a given timeout on any LifecycleAware instance. - Added LifecycleException. Thrown during LifecycleAware methods to indicate a failure during start / stop. - Added LifecycleState. Indicates what state a LifecycleAware instance is currently in. - Updated AbstractEvent{Sink,Source} to conform to the new Event{Sink,Source} interfaces. - Added a LoggerSink implementation that simply passes each event to the logging system (e.g. log4j). Useful for testing and debugging. - Added NullSink - /dev/null for Flume. Does absolutely nothing with a message. Useful for testing and debugging. - Added SequenceGeneratorSource. A source that generates monotonically incrementing integers. - Added unit tests for Context. - Added unit tests for LogicalNode. - Removed a test of Reporter that was no longer true (that timeStamp started at zero). - Added (fl
akey) unit tests for ConcurrencyUtils. - Added unit tests for EventBuilder. - Added unit tests for LifecycleController. - Added unit tests for LoggerSink. - Added unit tests for SequenceGeneratorSource.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/pom.xml
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/pom.xml?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/pom.xml Fri Aug 12 00:46:11 2011
@@ -22,6 +22,11 @@
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,171 @@
+package org.apache.flume.core;
+
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class ChannelDriver implements LifecycleAware {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(ChannelDriver.class);
+
+ private String name;
+ private EventSource source;
+ private EventSink sink;
+ private ChannelDriverThread driverThread;
+
+ private LifecycleState lifecycleState;
+
+ public ChannelDriver(String name) {
+ this.name = name;
+
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ public void start(Context context) throws LifecycleException {
+ logger.debug("Channel driver starting:{}", this);
+
+ driverThread = new ChannelDriverThread(name);
+
+ driverThread.setSource(source);
+ driverThread.setSink(sink);
+
+ driverThread.start();
+
+ lifecycleState = LifecycleState.START;
+ }
+
+ @Override
+ public void stop(Context context) throws LifecycleException {
+ logger.debug("Channel driver stopping:{}", this);
+
+ driverThread.setShouldStop(true);
+
+ while (driverThread.isAlive()) {
+ try {
+ logger.debug("Waiting for driver to stop");
+
+ driverThread.join(1000);
+ } catch (InterruptedException e) {
+ logger
+ .debug("Interrupted while waiting for driver thread to shutdown. Interrupting it and stopping.");
+ driverThread.interrupt();
+ }
+ }
+
+ lifecycleState = LifecycleState.STOP;
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ @Override
+ public void transitionTo(LifecycleState state) {
+ }
+
+ @Override
+ public String toString() {
+ return "{ source:" + source + " sink:" + sink + " }";
+ }
+
+ public EventSource getSource() {
+ return source;
+ }
+
+ public void setSource(EventSource source) {
+ this.source = source;
+ }
+
+ public EventSink getSink() {
+ return sink;
+ }
+
+ public void setSink(EventSink sink) {
+ this.sink = sink;
+ }
+
+ private static class ChannelDriverThread extends Thread {
+
+ private EventSource source;
+ private EventSink sink;
+ private Context context;
+
+ private long totalEvents;
+ private long discardedEvents;
+ private long nullEvents;
+ private long successfulEvents;
+
+ volatile private boolean shouldStop;
+
+ public ChannelDriverThread(String name) {
+ super(name);
+
+ totalEvents = 0;
+ discardedEvents = 0;
+ nullEvents = 0;
+ successfulEvents = 0;
+
+ shouldStop = false;
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Channel driver thread running");
+
+ Preconditions.checkState(source != null, "Source can not be null");
+ Preconditions.checkState(sink != null, "Sink can not be null");
+
+ while (!shouldStop) {
+ Event<?> event = null;
+
+ try {
+ event = source.next(context);
+
+ if (event != null) {
+ sink.append(context, event);
+ successfulEvents++;
+ } else {
+ nullEvents++;
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Received an interrupt while moving events - stopping");
+ shouldStop = true;
+ } catch (MessageDeliveryException e) {
+ logger.debug("Unable to deliver event:{} (may be null)", event);
+ discardedEvents++;
+ /* FIXME: Handle dead messages. */
+ }
+
+ totalEvents++;
+ }
+
+ logger.debug("Channel driver thread exiting cleanly");
+ logger
+ .info(
+ "Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
+ new Object[] { totalEvents, successfulEvents, nullEvents,
+ discardedEvents });
+ }
+
+ public void setSource(EventSource source) {
+ this.source = source;
+ }
+
+ public void setSink(EventSink sink) {
+ this.sink = sink;
+ }
+
+ public void setShouldStop(boolean shouldStop) {
+ this.shouldStop = shouldStop;
+ }
+
+ }
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Context.java Fri Aug 12 00:46:11 2011
@@ -6,9 +6,11 @@ import java.util.Map;
public class Context {
private Map<String, Object> parameters;
+ private Reporter reporter;
public Context() {
parameters = new HashMap<String, Object>();
+ reporter = new Reporter();
}
public void put(String key, Object value) {
@@ -25,7 +27,7 @@ public class Context {
@Override
public String toString() {
- return "{ parameters:" + parameters + " }";
+ return "{ parameters:" + parameters + " reporter:" + reporter + " }";
}
public Map<String, Object> getParameters() {
@@ -36,4 +38,12 @@ public class Context {
this.parameters = parameters;
}
+ public Reporter getReporter() {
+ return reporter;
+ }
+
+ public void setReporter(Reporter reporter) {
+ this.reporter = reporter;
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSink.java Fri Aug 12 00:46:11 2011
@@ -1,11 +1,16 @@
package org.apache.flume.core;
+import org.apache.flume.lifecycle.LifecycleException;
+
public interface EventSink {
- public void open(Context context);
+ public void open(Context context) throws InterruptedException,
+ LifecycleException;
- public void append(Context context, Event<?> event);
+ public void append(Context context, Event<?> event)
+ throws InterruptedException, MessageDeliveryException;
- public void close(Context context);
+ public void close(Context context) throws InterruptedException,
+ LifecycleException;
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/EventSource.java Fri Aug 12 00:46:11 2011
@@ -1,11 +1,16 @@
package org.apache.flume.core;
+import org.apache.flume.lifecycle.LifecycleException;
+
public interface EventSource {
- public void open(Context context);
+ public void open(Context context) throws InterruptedException,
+ LifecycleException;
- public Event<?> next(Context context);
+ public Event<?> next(Context context) throws InterruptedException,
+ MessageDeliveryException;
- public void close(Context context);
+ public void close(Context context) throws InterruptedException,
+ LifecycleException;
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,114 @@
+package org.apache.flume.core;
+
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class LogicalNode implements LifecycleAware {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(LogicalNode.class);
+
+ private String name;
+ private EventSource source;
+ private EventSink sink;
+
+ private ChannelDriver driver;
+
+ private LifecycleState lifecycleState;
+
+ public LogicalNode() {
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ public void start(Context context) throws LifecycleException {
+ logger.info("Starting logical node:{}", this);
+
+ Preconditions.checkState(name != null, "Logical node name can not be null");
+ Preconditions.checkState(source != null,
+ "Logical node source can not be null");
+ Preconditions.checkState(sink != null, "Logical node sink can not be null");
+
+ driver = new ChannelDriver(name + "-channelDriver");
+
+ driver.setSource(source);
+ driver.setSink(sink);
+
+ driver.start(context);
+
+ lifecycleState = LifecycleState.START;
+ }
+
+ @Override
+ public void stop(Context context) throws LifecycleException {
+ logger.info("Stopping logical node:{}", this);
+
+ boolean complete = false;
+
+ driver.stop(context);
+
+ try {
+ complete = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+ LifecycleState.STOP, LifecycleState.ERROR }, 10000);
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while waiting for the driver to stop.");
+ complete = false;
+ }
+
+ if (!complete) {
+ logger
+ .error(
+ "There's a good chance the source or sink aren't shutting down. This will lead to problems. Contact the developers! Trace:{}",
+ Thread.currentThread().getStackTrace());
+ }
+
+ /* Our state is the channel driver's state. */
+ lifecycleState = driver.getLifecycleState();
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ @Override
+ public void transitionTo(LifecycleState state) {
+ }
+
+ @Override
+ public String toString() {
+ return "{ name:" + name + " source:" + source + " sink:" + sink
+ + " lifecycleState:" + lifecycleState + " }";
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public EventSource getSource() {
+ return source;
+ }
+
+ public void setSource(EventSource source) {
+ this.source = source;
+ }
+
+ public EventSink getSink() {
+ return sink;
+ }
+
+ public void setSink(EventSink sink) {
+ this.sink = sink;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/MessageDeliveryException.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.core;
+
+public class MessageDeliveryException extends Exception {
+
+ private static final long serialVersionUID = 1102327497549834945L;
+
+ public MessageDeliveryException() {
+ super();
+ }
+
+ public MessageDeliveryException(String message) {
+ super(message);
+ }
+
+ public MessageDeliveryException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public MessageDeliveryException(Throwable t) {
+ super(t);
+ }
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/Reporter.java Fri Aug 12 00:46:11 2011
@@ -10,7 +10,7 @@ public class Reporter {
private long timeStamp;
public Reporter() {
- timeStamp = 0;
+ timeStamp = System.currentTimeMillis();
}
public synchronized long progress() {
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/ConcurrencyUtils.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,105 @@
+package org.apache.flume.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.flume.core.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConcurrencyUtils {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(ConcurrencyUtils.class);
+
+ /**
+ * <p>
+ * Run the provided {@link Runnable} using the {@code executorService}. If it
+ * takes longer than {@code timeLimit} or does not regularly report status
+ * using the supplied {@link Reporter}, cancel it.
+ * </p>
+ * <p>
+ * This is meant for operations that may take time but have an SLA. It was
+ * originally created for ops like source / sink opens or next / append calls
+ * that may block (at least externally). For these cases, we want them to
+ * block but we need to know they're still alive.
+ * </p>
+ *
+ * @param executorService
+ * @param runnable
+ * @param reporter
+ * @param timeLimit
+ * (or zero for no time limit)
+ * @return true if the task finished within the timeLimit and with acceptable
+ * reporting, false otherwise.
+ * @throws InterruptedException
+ */
+ public static boolean executeReportAwareWithSLA(
+ ExecutorService executorService, Runnable runnable, Reporter reporter,
+ long reportInterval, long timeLimit) throws InterruptedException {
+
+ boolean success = false;
+
+ Future<?> future = executorService.submit(runnable);
+
+ long currentTime = System.currentTimeMillis();
+ long deadLine = timeLimit > 0 ? currentTime + timeLimit : 0;
+
+ logger.debug("Need to complete {} by {}. Must see progress every {}ms",
+ new Object[] { future, deadLine, reportInterval });
+
+ while (true) {
+ currentTime = System.currentTimeMillis();
+ long lastReport = reporter.getTimeStamp();
+ long timeSinceReport = currentTime - lastReport;
+
+ logger
+ .debug(
+ "Checking status of {} - lastReport:{} currentTime:{} timeSinceReport:{}",
+ new Object[] { future, lastReport, currentTime, timeSinceReport });
+
+ if (deadLine > 0 && currentTime > deadLine) {
+ logger.debug("Cancelling {} - exceeded deadLine:{}", future, deadLine);
+
+ future.cancel(true);
+ break;
+ } else if (timeSinceReport > reportInterval) {
+ logger.debug("Cancelling {} - no status in {}ms", future,
+ timeSinceReport);
+
+ future.cancel(true);
+ break;
+ } else if (future.isDone()) {
+ logger.debug("Task {} has finished", future);
+
+ success = true;
+ break;
+ } else {
+
+ /*
+ * If there's a deadline, pick the smaller of the next deadline or the
+ * next progress time, otherwise just take the next report time.
+ *
+ * Then, take the smaller of the above - now (minus 50ms of slop) or
+ * 500ms. Oh, but don't let it be < 0. Math is fun.
+ *
+ * I'm convinced this isn't right, but I'm moving on. Kick me later.
+ */
+ long smallestRuleLength = timeLimit > 0 ? Math.min(currentTime
+ + deadLine, currentTime + reportInterval) : currentTime
+ + reportInterval;
+ long sleepLength = Math.min(500,
+ Math.max(smallestRuleLength - currentTime - 50, 0));
+
+ logger
+ .debug(
+ "Still waiting for {} to complete - timeSinceReport:{} deadLine:{} sleeping:{}",
+ new Object[] { future, timeSinceReport, deadLine, sleepLength });
+
+ Thread.sleep(sleepLength);
+ }
+ }
+
+ return success;
+ }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/util/EventBuilder.java Fri Aug 12 00:46:11 2011
@@ -11,6 +11,8 @@ public class EventBuilder {
public static <T> Event<T> withBody(T body) {
Event<T> event = new SimpleEvent<T>();
+ event.setBody(body);
+
return event;
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,15 @@
+package org.apache.flume.lifecycle;
+
+import org.apache.flume.core.Context;
+
+public interface LifecycleAware {
+
+ public void start(Context context) throws LifecycleException;
+
+ public void stop(Context context) throws LifecycleException;
+
+ public LifecycleState getLifecycleState();
+
+ public void transitionTo(LifecycleState state);
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleController.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,67 @@
+package org.apache.flume.lifecycle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LifecycleController {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(LifecycleController.class);
+ private static final long shortestSleepDuration = 50;
+ private static final int maxNumberOfChecks = 5;
+
+ /*
+ * public static boolean waitForState(LifecycleAware delegate, LifecycleState
+ * state, long timeout) throws InterruptedException {
+ *
+ * logger.debug("Waiting for state {} for delegate:{} up to {}ms", new
+ * Object[] { state, delegate, timeout });
+ *
+ * long sleepInterval = Math.max(shortestSleepDuration, timeout /
+ * maxNumberOfChecks); long deadLine = System.currentTimeMillis() + timeout;
+ *
+ * do { if (delegate.getLifecycleState().equals(state)) { return true; }
+ *
+ * logger.debug("Still want state:{} sleeping:{}ms", state, sleepInterval);
+ * Thread.sleep(sleepInterval); } while (System.currentTimeMillis() <
+ * deadLine);
+ *
+ * logger.debug("Didn't see state within timeout {}ms", timeout);
+ *
+ * return false; }
+ */
+
+ public static boolean waitForState(LifecycleAware delegate,
+ LifecycleState state, long timeout) throws InterruptedException {
+
+ return waitForOneOf(delegate, new LifecycleState[] { state }, timeout);
+ }
+
+ public static boolean waitForOneOf(LifecycleAware delegate,
+ LifecycleState[] states, long timeout) throws InterruptedException {
+
+ logger.debug("Waiting for state {} for delegate:{} up to {}ms",
+ new Object[] { states, delegate, timeout });
+
+ long sleepInterval = Math.max(shortestSleepDuration, timeout
+ / maxNumberOfChecks);
+ long deadLine = System.currentTimeMillis() + timeout;
+
+ do {
+ for (LifecycleState state : states) {
+ if (delegate.getLifecycleState().equals(state)) {
+ return true;
+ }
+ }
+
+ logger.debug("Still want one of states:{} sleeping:{}ms", states,
+ sleepInterval);
+ Thread.sleep(sleepInterval);
+ } while (System.currentTimeMillis() < deadLine);
+
+ logger.debug("Didn't see state within timeout {}ms", timeout);
+
+ return false;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleException.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.lifecycle;
+
+public class LifecycleException extends Exception {
+
+ private static final long serialVersionUID = 4689000562519155240L;
+
+ public LifecycleException() {
+ super();
+ }
+
+ public LifecycleException(String message) {
+ super(message);
+ }
+
+ public LifecycleException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public LifecycleException(Throwable t) {
+ super(t);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleState.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,5 @@
+package org.apache.flume.lifecycle;
+
+public enum LifecycleState {
+ IDLE, START, STOP, ERROR;
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractEventSink.java Fri Aug 12 00:46:11 2011
@@ -3,19 +3,24 @@ package org.apache.flume.sink;
import org.apache.flume.core.Context;
import org.apache.flume.core.Event;
import org.apache.flume.core.EventSink;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.lifecycle.LifecycleException;
abstract public class AbstractEventSink implements EventSink {
@Override
- public void open(Context context) {
+ public void open(Context context) throws InterruptedException,
+ LifecycleException {
// Empty implementation by default.
}
@Override
- abstract public void append(Context context, Event<?> event);
+ abstract public void append(Context context, Event<?> event)
+ throws InterruptedException, MessageDeliveryException;
@Override
- public void close(Context context) {
+ public void close(Context context) throws InterruptedException,
+ LifecycleException {
// Empty implementation by default.
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,21 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggerSink extends AbstractEventSink {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestLoggerSink.class);
+
+ @Override
+ public void append(Context context, Event<?> event)
+ throws InterruptedException, MessageDeliveryException {
+
+ logger.info("event:{}", event);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/NullSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,17 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+
+public class NullSink extends AbstractEventSink {
+
+ @Override
+ public void append(Context context, Event<?> event)
+ throws InterruptedException, MessageDeliveryException {
+
+ /* We purposefully do absolutely nothing. */
+
+ }
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventSource.java Fri Aug 12 00:46:11 2011
@@ -3,19 +3,21 @@ package org.apache.flume.source;
import org.apache.flume.core.Context;
import org.apache.flume.core.Event;
import org.apache.flume.core.EventSource;
+import org.apache.flume.core.MessageDeliveryException;
abstract public class AbstractEventSource implements EventSource {
@Override
- public void open(Context context) {
+ public void open(Context context) throws InterruptedException {
// Empty implementation by default.
}
@Override
- abstract public Event<?> next(Context context);
+ abstract public Event<?> next(Context context) throws InterruptedException,
+ MessageDeliveryException;
@Override
- public void close(Context context) {
+ public void close(Context context) throws InterruptedException {
// Empty implementation by default.
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,23 @@
+package org.apache.flume.source;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.core.SimpleEvent;
+
+public class SequenceGeneratorSource extends AbstractEventSource {
+
+ private long sequence;
+
+ @Override
+ public Event<?> next(Context context) throws InterruptedException,
+ MessageDeliveryException {
+
+ Event<Long> event = new SimpleEvent<Long>();
+
+ event.setBody(sequence++);
+
+ return event;
+ }
+
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestContext.java Fri Aug 12 00:46:11 2011
@@ -15,8 +15,7 @@ public class TestContext {
@Test
public void testPutGet() {
- Assert.assertEquals("Context is empty", 0, context.getParameters()
- .size());
+ Assert.assertEquals("Context is empty", 0, context.getParameters().size());
context.put("test", "test");
@@ -24,4 +23,9 @@ public class TestContext {
context.get("test", String.class));
}
+ @Test
+ public void testReporter() {
+ Assert.assertNotNull(context.getReporter());
+ }
+
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestLogicalNode.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,112 @@
+package org.apache.flume.core;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLogicalNode {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestLogicalNode.class);
+
+ private LogicalNode node;
+
+ @Before
+ public void setUp() {
+ node = new LogicalNode();
+
+ node.setName("test-node-n1");
+ node.setSource(new SequenceGeneratorSource());
+ node.setSink(new LoggerSink());
+ }
+
+ @Test
+ public void testLifecycle() throws LifecycleException, InterruptedException {
+ Context context = new Context();
+
+ node.start(context);
+ boolean reached = LifecycleController.waitForOneOf(node,
+ new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+ 5000);
+
+ Assert.assertTrue("Matched a lifecycle state", reached);
+ Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+ node.stop(context);
+ reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+ LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+ Assert.assertTrue("Matched a lifecycle state", reached);
+ Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+ }
+
+ @Test
+ public void testMultipleNodes() throws LifecycleException,
+ InterruptedException {
+
+ final AtomicInteger successfulThread = new AtomicInteger(0);
+ final CountDownLatch finishedThreads = new CountDownLatch(10);
+
+ for (int i = 0; i < 10; i++) {
+ final int j = i;
+
+ new Thread("test-node-runner-" + i) {
+
+ @Override
+ public void run() {
+ Context context = new Context();
+ LogicalNode node = new LogicalNode();
+
+ node.setName("test-node-" + j);
+ node.setSource(new SequenceGeneratorSource());
+ node.setSink(new NullSink());
+
+ try {
+ node.start(context);
+
+ boolean reached = LifecycleController.waitForOneOf(node,
+ new LifecycleState[] { LifecycleState.START,
+ LifecycleState.ERROR }, 5000);
+
+ Assert.assertTrue("Matched a lifecycle state", reached);
+ Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+ Thread.sleep(500);
+
+ node.stop(context);
+ reached = LifecycleController.waitForOneOf(node,
+ new LifecycleState[] { LifecycleState.STOP,
+ LifecycleState.ERROR }, 5000);
+
+ Assert.assertTrue("Matched a lifecycle state", reached);
+ Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+
+ successfulThread.incrementAndGet();
+ } catch (LifecycleException e) {
+ logger.debug("Exception follows", e);
+ } catch (InterruptedException e) {
+ logger.debug("Exception follows", e);
+ }
+
+ finishedThreads.countDown();
+ }
+ }.start();
+
+ }
+
+ finishedThreads.await();
+
+ Assert.assertEquals(10, successfulThread.get());
+ }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java?rev=1156865&r1=1156864&r2=1156865&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestReporter.java Fri Aug 12 00:46:11 2011
@@ -15,7 +15,6 @@ public class TestReporter {
@Test
public void testProgress() {
- Assert.assertEquals("Timestamp starts at zero", 0, reporter.getTimeStamp());
Assert.assertTrue("Timestamp > 0 after progress", reporter.progress() > 0);
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestConcurrencyUtils.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,112 @@
+package org.apache.flume.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.flume.core.Reporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestConcurrencyUtils {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestConcurrencyUtils.class);
+
+ private ExecutorService executorService;
+
+ @Before
+ public void setUp() {
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void tearDown() {
+ executorService.shutdownNow();
+ }
+
+ /**
+ * Run a normal task that reports progress AND finishes within the deadline.
+ */
+ @Test
+ public void testNormalRun() throws InterruptedException {
+ final Reporter reporter = new Reporter();
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 3; i++) {
+ reporter.progress();
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Unexcepted interruption of thread.", e);
+ }
+ }
+ };
+
+ boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+ executorService, r, reporter, 200, 3000);
+
+ Assert.assertTrue(finished);
+ }
+
+ /**
+ * Run a task that fails to report status but completes within the deadline.
+ */
+ @Test
+ public void testNoProgress() throws InterruptedException {
+ final Reporter reporter = new Reporter();
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.debug("Someone interrupted us (expected!)");
+ }
+ }
+ };
+
+ boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+ executorService, r, reporter, 200, 3000);
+
+ Assert.assertFalse(finished);
+ }
+
+ /**
+ * Run a task that reports status but exceeds the deadline.
+ */
+ @Test
+ public void testDeadline() throws InterruptedException {
+ final Reporter reporter = new Reporter();
+
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 35; i++) {
+ Thread.sleep(100);
+ reporter.progress();
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Someone interrupted us (expected!)");
+ }
+ }
+ };
+
+ boolean finished = ConcurrencyUtils.executeReportAwareWithSLA(
+ executorService, r, reporter, 200, 3000);
+
+ Assert.assertFalse(finished);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/util/TestEventBuilder.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,38 @@
+package org.apache.flume.core.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.core.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestEventBuilder {
+
+ @Test
+ public void testBody() {
+ Event<String> e1 = EventBuilder.withBody("e1");
+ Assert.assertNotNull(e1);
+ Assert.assertEquals("body is correct", "e1", e1.getBody());
+
+ Event<Long> e2 = EventBuilder.withBody(2L);
+ Assert.assertNotNull(e2);
+ Assert.assertEquals("body is correct", Long.valueOf(2L), e2.getBody());
+ }
+
+ @Test
+ public void testHeaders() {
+ Map<String, String> headers = new HashMap<String, String>();
+
+ headers.put("one", "1");
+ headers.put("two", "2");
+
+ Event<?> e1 = EventBuilder.withBody("e1", headers);
+
+ Assert.assertNotNull(e1);
+ Assert.assertEquals("e1 has the proper body", "e1", e1.getBody());
+ Assert.assertEquals("e1 has the proper headers", 2, e1.getHeaders().size());
+ Assert.assertEquals("e1 has a one key", "1", e1.getHeaders().get("one"));
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/lifecycle/TestLifecycleController.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,115 @@
+package org.apache.flume.lifecycle;
+
+import junit.framework.Assert;
+
+import org.apache.flume.core.Context;
+import org.junit.Test;
+
+public class TestLifecycleController {
+
+ @Test
+ public void testWaitForState() throws LifecycleException,
+ InterruptedException {
+
+ Context context = new Context();
+ LifecycleAware delegate = new SleeperLifecycleDelegate();
+
+ Assert.assertTrue(delegate.getLifecycleState().equals(LifecycleState.IDLE));
+
+ delegate.start(context);
+
+ boolean reached = LifecycleController.waitForState(delegate,
+ LifecycleState.START, 2000);
+
+ Assert.assertEquals(true, reached);
+ Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+
+ delegate.stop(context);
+
+ reached = LifecycleController.waitForState(delegate, LifecycleState.STOP,
+ 2000);
+
+ Assert.assertEquals(true, reached);
+ Assert.assertEquals(LifecycleState.STOP, delegate.getLifecycleState());
+
+ delegate.start(context);
+
+ reached = LifecycleController.waitForState(delegate, LifecycleState.IDLE,
+ 500);
+
+ Assert.assertEquals(false, reached);
+ Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+
+ }
+
+ @Test
+ public void testWaitForOneOf() throws LifecycleException,
+ InterruptedException {
+
+ Context context = new Context();
+ LifecycleAware delegate = new SleeperLifecycleDelegate();
+
+ Assert.assertEquals(LifecycleState.IDLE, delegate.getLifecycleState());
+
+ delegate.start(context);
+
+ boolean reached = LifecycleController.waitForOneOf(delegate,
+ new LifecycleState[] { LifecycleState.STOP, LifecycleState.START },
+ 2000);
+
+ Assert.assertTrue("Matched a state change", reached);
+ Assert.assertEquals(LifecycleState.START, delegate.getLifecycleState());
+ }
+
+ public static class SleeperLifecycleDelegate implements LifecycleAware {
+
+ private long sleepTime;
+ private LifecycleState state;
+
+ public SleeperLifecycleDelegate() {
+ sleepTime = 0;
+ state = LifecycleState.IDLE;
+ }
+
+ @Override
+ public void start(Context context) throws LifecycleException {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw new LifecycleException(e);
+ }
+
+ state = LifecycleState.START;
+ }
+
+ @Override
+ public void stop(Context context) throws LifecycleException {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ throw new LifecycleException(e);
+ }
+
+ state = LifecycleState.STOP;
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return state;
+ }
+
+ @Override
+ public void transitionTo(LifecycleState state) {
+ }
+
+ public long getSleepTime() {
+ return sleepTime;
+ }
+
+ public void setSleepTime(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,37 @@
+package org.apache.flume.sink;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.core.util.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLoggerSink {
+
+ private LoggerSink sink;
+
+ @Before
+ public void setUp() {
+ sink = new LoggerSink();
+ }
+
+ /**
+ * Lack of exception test.
+ */
+ @Test
+ public void testAppend() throws InterruptedException, LifecycleException,
+ MessageDeliveryException {
+
+ Context context = new Context();
+
+ sink.open(context);
+
+ for (int i = 0; i < 10; i++) {
+ sink.append(context, EventBuilder.withBody("Test " + i));
+ }
+
+ sink.close(context);
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1156865&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Fri Aug 12 00:46:11 2011
@@ -0,0 +1,39 @@
+package org.apache.flume.source;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.Event;
+import org.apache.flume.core.EventSource;
+import org.apache.flume.core.MessageDeliveryException;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSequenceGeneratorSource {
+
+ private EventSource source;
+
+ @Before
+ public void setUp() {
+ source = new SequenceGeneratorSource();
+ }
+
+ @Test
+ public void testNext() throws InterruptedException, LifecycleException,
+ MessageDeliveryException {
+
+ Context context = new Context();
+
+ source.open(context);
+
+ for (long i = 0; i < 100; i++) {
+ Event<?> next = source.next(context);
+ long value = (Long) next.getBody();
+
+ Assert.assertEquals(i, value);
+ }
+
+ source.close(context);
+ }
+
+}