You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:04 UTC

svn commit: r1156882 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Author: esammer
Date: Fri Aug 12 00:47:03 2011
New Revision: 1156882

URL: http://svn.apache.org/viewvc?rev=1156882&view=rev
Log:
- ChannelDriver now properly exposes any exceptions that occur while running. - Added javadoc including blocking and interrupt semantics.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156882&r1=1156881&r2=1156882&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:47:03 2011
@@ -8,6 +8,22 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * <p>
+ * A channel driver is responsible for managing the core lifecycle of sources
+ * and sinks as well as the shuffle loop.
+ * </p>
+ * <p>
+ * The intended usage is to supply a channel driver with a name - normally
+ * containing the name of the LogicalNode to which it belongs - a source, and a
+ * sink, and ask it to run. It implements {@link LifecycleAware} which is the
+ * only interface for execution.
+ * </p>
+ * <p>
+ * There's no good reason for anything other than a LogicalNode using this
+ * class. This is an internal system.
+ * </p>
+ */
 public class ChannelDriver implements LifecycleAware {
 
   private static final Logger logger = LoggerFactory
@@ -19,13 +35,27 @@ public class ChannelDriver implements Li
   private ChannelDriverThread driverThread;
 
   private LifecycleState lifecycleState;
+  private Exception lastException;
 
   public ChannelDriver(String name) {
+    Preconditions.checkNotNull(name);
+
     this.name = name;
 
     lifecycleState = LifecycleState.IDLE;
   }
 
+  /**
+   * <p>
+   * Start the channel driver.
+   * </p>
+   * <p>
+   * This method blocks on starting the source and sink. Should it receive an
+   * interruption, it attempts to safely close down the source and sink (or the
+   * shuffle loop if it's already running) and may continue to block for a short
+   * time after interruption because of this.
+   * </p>
+   */
   @Override
   public void start(Context context) throws LifecycleException,
       InterruptedException {
@@ -68,6 +98,22 @@ public class ChannelDriver implements Li
     lifecycleState = LifecycleState.START;
   }
 
+  /**
+   * <p>
+   * Stop the channel driver and the underlying source and sink. Halts the
+   * shuffle loop if it's running.
+   * </p>
+   * <p>
+   * If there were any errors during open, close, or the shuffle loop, the
+   * lifecycle state will be set to {code ERROR} and last exception <b>MAY</b>
+   * be set.
+   * </p>
+   * <p>
+   * This method blocks on shutting down the source and sink. Upon interruption,
+   * it interrupts its underlying resources but still waits for everything to
+   * exit cleanly.
+   * </p>
+   */
   @Override
   public void stop(Context context) throws LifecycleException {
     logger.debug("Channel driver stopping:{}", this);
@@ -98,6 +144,7 @@ public class ChannelDriver implements Li
     if (!driverThreadResult.equals(LifecycleState.START)
         || driverThread.getLastException() != null) {
       lifecycleState = LifecycleState.ERROR;
+      lastException = driverThread.getLastException();
     } else {
       lifecycleState = LifecycleState.STOP;
     }
@@ -129,6 +176,29 @@ public class ChannelDriver implements Li
     this.sink = sink;
   }
 
+  public Exception getLastException() {
+    return lastException;
+  }
+
+  /**
+   * <p>
+   * The thread that does the actual work of opening, closing, and running the
+   * shuffle loop (i.e. the loop that takes {@link Event}s from the
+   * {@link EventSource} and gives them to the {@link EventSink}.
+   * </p>
+   * <p>
+   * Because this class extends {@link Thread} it doesn't fully implement the
+   * {@link LifecycleAware} interface. That said, it does use
+   * {@link LifecycleState} (or at least some of the states from it) to indicate
+   * its current status. This is kind of gross and should be revisited. Ideally,
+   * this would become a Runnable so we could decouple thread lifetime from
+   * instances.
+   * </p>
+   * <p>
+   * This class is not meant for any users other than {@link ChannelDriver}
+   * itself.
+   * </p>
+   */
   private static class ChannelDriverThread extends Thread {
 
     private EventSource source;