You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:14 UTC

svn commit: r1156886 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java

Author: esammer
Date: Fri Aug 12 00:47:14 2011
New Revision: 1156886

URL: http://svn.apache.org/viewvc?rev=1156886&view=rev
Log:
- Fixed a bug where we didn't handle interrupts well in start / stop of LogicalNode. Better, but not totally there yet.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java?rev=1156886&r1=1156885&r2=1156886&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java Fri Aug 12 00:47:14 2011
@@ -9,6 +9,48 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * <p>
+ * A complete Flume node - a source, sink pair that can be run.
+ * </p>
+ * <p>
+ * A logical node in Flume is the user-configurable unit of execution. Users
+ * provide a name, source, and sink and the logical node handles the lifecycle
+ * as well as moving data from the source to the sink. The latter is referred
+ * to as
+ * <q>the shuffle loop</q>. Internally, the logical node contains a
+ * {@link ChannelDriver} which does the heavy lifting of lifecycle events (i.e.
+ * {@link #start(Context)} and {@link #stop(Context)}).
+ * </p>
+ * <p>
+ * LogicalNode implements the {@link LifecycleAware} interface. Both the
+ * {@link #start(Context)} and {@link #stop(Context)} methods are asynchronous
+ * but do block while starting / stopping the underlying infrastructure
+ * (i.e. ChannelDriver). Both methods may also be interrupted. In the case of
+ * start, an interruption will force the logical node to attempt to clean up
+ * resources which may involve stopping the source and sink (which can in turn
+ * block). An interrupt to stop will, in turn, interrupt the underlying
+ * thread(s). In both cases though, the logical node will continue to block on
+ * cleanup to prevent nasty issues with subsequent restarts.
+ * </p>
+ * <p>
+ * Example usage:
+ * </p>
+ * <pre>
+ *  LogicalNode node = new LogicalNode();
+ *  Context context = new Context();
+ * 
+ *  node.setName("sequence-generating-logger");
+ *  node.setSource(new SequenceGeneratorSource());
+ *  node.setSink(new LoggerSink());
+ * 
+ *  node.start(context);
+ * 
+ *  // ...do other stuff.
+ * 
+ *  node.stop(context);
+ * </pre>
+ */
 public class LogicalNode implements LifecycleAware {
 
   private static final Logger logger = LoggerFactory
@@ -42,28 +84,44 @@ public class LogicalNode implements Life
     driver.setSource(source);
     driver.setSink(sink);
 
-    driver.start(context);
+    boolean reached = false;
 
-    lifecycleState = LifecycleState.START;
+    try {
+      driver.start(context);
+
+      reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+          LifecycleState.START, LifecycleState.ERROR });
+    } catch (InterruptedException e) {
+      logger
+          .error("Interrupted while attempting to start the logical node driver. Stopping it.");
+      driver.stop(context);
+      lifecycleState = LifecycleState.ERROR;
+      throw e;
+    }
+
+    if (reached) {
+      lifecycleState = driver.getLifecycleState();
+    }
   }
 
   @Override
-  public void stop(Context context) throws LifecycleException {
+  public void stop(Context context) throws LifecycleException, InterruptedException {
     logger.info("Stopping logical node:{}", this);
 
-    boolean complete = false;
-
-    driver.stop(context);
+    boolean reached = false;
 
     try {
-      complete = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+      driver.stop(context);
+
+      reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
           LifecycleState.STOP, LifecycleState.ERROR });
     } catch (InterruptedException e) {
-      logger.debug("Interrupted while waiting for the driver to stop.");
-      complete = false;
+      logger.error("Interrupted while waiting for the driver to stop.");
+      lifecycleState = LifecycleState.ERROR;
+      throw e;
     }
 
-    if (!complete) {
+    if (!reached) {
       logger
           .error(
               "There's a good chance the source or sink aren't shutting down. This will lead to problems. Contact the developers! Trace:{}",