You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:04 UTC
svn commit: r1156882 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
Author: esammer
Date: Fri Aug 12 00:47:03 2011
New Revision: 1156882
URL: http://svn.apache.org/viewvc?rev=1156882&view=rev
Log:
- ChannelDriver now properly exposes any exceptions that occur while running. - Added javadoc including blocking and interrupt semantics.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156882&r1=1156881&r2=1156882&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:47:03 2011
@@ -8,6 +8,22 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+/**
+ * <p>
+ * A channel driver is responsible for managing the core lifecycle of sources
+ * and sinks as well as the shuffle loop.
+ * </p>
+ * <p>
+ * The intended usage is to supply a channel driver with a name - normally
+ * containing the name of the LogicalNode to which it belongs - a source, and a
+ * sink, and ask it to run. It implements {@link LifecycleAware} which is the
+ * only interface for execution.
+ * </p>
+ * <p>
+ * There's no good reason for anything other than a LogicalNode to use this
+ * class. This is an internal system.
+ * </p>
+ */
public class ChannelDriver implements LifecycleAware {
private static final Logger logger = LoggerFactory
@@ -19,13 +35,27 @@ public class ChannelDriver implements Li
private ChannelDriverThread driverThread;
private LifecycleState lifecycleState;
+ private Exception lastException;
public ChannelDriver(String name) {
+ Preconditions.checkNotNull(name);
+
this.name = name;
lifecycleState = LifecycleState.IDLE;
}
+ /**
+ * <p>
+ * Start the channel driver.
+ * </p>
+ * <p>
+ * This method blocks on starting the source and sink. Should it receive an
+ * interruption, it attempts to safely close down the source and sink (or the
+ * shuffle loop if it's already running) and may continue to block for a short
+ * time after interruption because of this.
+ * </p>
+ */
@Override
public void start(Context context) throws LifecycleException,
InterruptedException {
@@ -68,6 +98,22 @@ public class ChannelDriver implements Li
lifecycleState = LifecycleState.START;
}
+ /**
+ * <p>
+ * Stop the channel driver and the underlying source and sink. Halts the
+ * shuffle loop if it's running.
+ * </p>
+ * <p>
+ * If there were any errors during open, close, or the shuffle loop, the
+ * lifecycle state will be set to {@code ERROR} and the last exception
+ * <b>MAY</b> be set.
+ * </p>
+ * <p>
+ * This method blocks on shutting down the source and sink. Upon interruption,
+ * it interrupts its underlying resources but still waits for everything to
+ * exit cleanly.
+ * </p>
+ */
@Override
public void stop(Context context) throws LifecycleException {
logger.debug("Channel driver stopping:{}", this);
@@ -98,6 +144,7 @@ public class ChannelDriver implements Li
if (!driverThreadResult.equals(LifecycleState.START)
|| driverThread.getLastException() != null) {
lifecycleState = LifecycleState.ERROR;
+ lastException = driverThread.getLastException();
} else {
lifecycleState = LifecycleState.STOP;
}
@@ -129,6 +176,29 @@ public class ChannelDriver implements Li
this.sink = sink;
}
+ public Exception getLastException() {
+ return lastException;
+ }
+
+ /**
+ * <p>
+ * The thread that does the actual work of opening, closing, and running the
+ * shuffle loop (i.e. the loop that takes {@link Event}s from the
+ * {@link EventSource} and gives them to the {@link EventSink}).
+ * </p>
+ * <p>
+ * Because this class extends {@link Thread} it doesn't fully implement the
+ * {@link LifecycleAware} interface. That said, it does use
+ * {@link LifecycleState} (or at least some of the states from it) to indicate
+ * its current status. This is kind of gross and should be revisited. Ideally,
+ * this would become a Runnable so we could decouple thread lifetime from
+ * instances.
+ * </p>
+ * <p>
+ * This class is not meant for any users other than {@link ChannelDriver}
+ * itself.
+ * </p>
+ */
private static class ChannelDriverThread extends Thread {
private EventSource source;