You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:19 UTC

svn commit: r1156888 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java

Author: esammer
Date: Fri Aug 12 00:47:18 2011
New Revision: 1156888

URL: http://svn.apache.org/viewvc?rev=1156888&view=rev
Log:
- Refactored LogicalNode to use ChannelDriverThread directly (instead of going through ChannelDriver and LifecycleController). Not sure this is a good idea, but I wanted to try it.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java?rev=1156888&r1=1156887&r2=1156888&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java Fri Aug 12 00:47:18 2011
@@ -1,7 +1,7 @@
 package org.apache.flume.core;
 
+import org.apache.flume.core.ChannelDriver.ChannelDriverThread;
 import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
@@ -25,13 +25,13 @@ import com.google.common.base.Preconditi
  * <p>
  * LogicalNode implements the {@link LifecycleAware} interface. Both the
  * {@link #start(Context)} and {@link #stop(Context)} methods are asynchronous
- * but do block while starting / stopping the underlying infrastructure
- * (i.e. ChannelDriver). Both methods may also be interrupted. In the case of
- * start, an interruption will force the logical node to attempt to clean up
- * resources which may involve stopping the source and sink (which can in turn
- * block). An interrupt to stop will, in turn, interrupt the underlying
- * thread(s). In both cases though, the logical node will continue to block on
- * cleanup to prevent nasty issues with subsequent restarts.
+ * but do block while starting / stopping the underlying infrastructure (i.e.
+ * ChannelDriver). Both methods may also be interrupted. In the case of start,
+ * an interruption will force the logical node to attempt to clean up resources
+ * which may involve stopping the source and sink (which can in turn block). An
+ * interrupt to stop will, in turn, interrupt the underlying thread(s). In both
+ * cases though, the logical node will continue to block on cleanup to prevent
+ * nasty issues with subsequent restarts.
  * </p>
  * <p>
  * Example usage:
@@ -60,7 +60,7 @@ public class LogicalNode implements Life
   private EventSource source;
   private EventSink sink;
 
-  private ChannelDriver driver;
+  private ChannelDriverThread driver;
 
   private LifecycleState lifecycleState;
 
@@ -79,57 +79,94 @@ public class LogicalNode implements Life
         "Logical node source can not be null");
     Preconditions.checkState(sink != null, "Logical node sink can not be null");
 
-    driver = new ChannelDriver(name + "-channelDriver");
+    driver = new ChannelDriverThread(name + "-channelDriver");
 
     driver.setSource(source);
     driver.setSink(sink);
 
-    boolean reached = false;
+    driver.start();
 
-    try {
-      driver.start(context);
+    while (!driver.getLifecycleState().equals(LifecycleState.START)
+        && !driver.getLifecycleState().equals(LifecycleState.ERROR)) {
 
-      reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
-          LifecycleState.START, LifecycleState.ERROR });
-    } catch (InterruptedException e) {
-      logger
-          .error("Interrupted while attempting to start the logical node driver. Stopping it.");
-      driver.stop(context);
-      lifecycleState = LifecycleState.ERROR;
-      throw e;
-    }
+      logger.debug("Waiting for driver to start");
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        logger
+            .error("Interrupted while waiting for driver to start. Shutting down.");
+        lifecycleState = LifecycleState.ERROR;
+
+        driver.setShouldStop(true);
+
+        /*
+         * We refuse to return with outstanding resources so we go into a cycle
+         * where we interrupt the driver thread and block on it. If we're
+         * interrupted again while waiting for it, we warn, but we still refuse
+         * to give up on it. Sorry, caller; we don't know what the source or
+         * sink is doing so we have to give them time!
+         */
+        while (driver.isAlive()) {
+          logger.debug("Interrupting driver");
+
+          driver.interrupt();
+
+          logger.debug("Waiting for driver to stop");
+          try {
+            driver.join();
+          } catch (InterruptedException e1) {
+            logger
+                .warn(
+                    "Interrupted while waiting for driver to stop. This almost certainly means something awful is happening in the source or sink. Report this error. Interrupting it again!",
+                    e1);
+          }
+        }
 
-    if (reached) {
-      lifecycleState = driver.getLifecycleState();
+        return;
+      }
     }
+
+    lifecycleState = driver.getLifecycleState();
   }
 
   @Override
-  public void stop(Context context) throws LifecycleException, InterruptedException {
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
     logger.info("Stopping logical node:{}", this);
 
-    boolean reached = false;
+    driver.setShouldStop(true);
 
-    try {
-      driver.stop(context);
+    while (driver.isAlive()) {
+      logger.debug("Waiting for driver to stop");
 
-      reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
-          LifecycleState.STOP, LifecycleState.ERROR });
-    } catch (InterruptedException e) {
-      logger.error("Interrupted while waiting for the driver to stop.");
-      lifecycleState = LifecycleState.ERROR;
-      throw e;
+      try {
+        driver.join();
+      } catch (InterruptedException e) {
+        logger
+            .error("Interrupted while waiting for driver to stop. Interrupting it.");
+        lifecycleState = LifecycleState.ERROR;
+
+        /*
+         * We refuse to return with outstanding resources so we go into a cycle
+         * where we interrupt the driver thread and block on it. If we're
+         * interrupted again while waiting for it, we warn, but we still refuse
+         * to give up on it. Sorry, caller; we don't know what the source or
+         * sink is doing so we have to give them time!
+         */
+        logger.debug("Interrupting driver");
+        driver.interrupt();
+      }
     }
 
-    if (!reached) {
-      logger
-          .error(
-              "There's a good chance the source or sink aren't shutting down. This will lead to problems. Contact the developers! Trace:{}",
-              Thread.currentThread().getStackTrace());
+    /*
+     * If we're already in an error state, preserve that, otherwise stop
+     * successfully.
+     */
+    if (!lifecycleState.equals(LifecycleState.ERROR)) {
+      lifecycleState = LifecycleState.STOP;
     }
-
-    /* Our state is the channel driver's state. */
-    lifecycleState = driver.getLifecycleState();
   }
 
   @Override