You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/17 03:00:43 UTC

svn commit: r1158484 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java

Author: esammer
Date: Wed Aug 17 01:00:43 2011
New Revision: 1158484

URL: http://svn.apache.org/viewvc?rev=1158484&view=rev
Log:
- Fixed a case where a ChannelDriverThread could die and go unnoticed until the
  LogicalNode was stopped. Added a monitor thread that updates LogicalNode state to ERROR if
  the ChannelDriver state becomes ERROR at any point.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java?rev=1158484&r1=1158483&r2=1158484&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java Wed Aug 17 01:00:43 2011
@@ -1,5 +1,9 @@
 package org.apache.flume;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -7,6 +11,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * <p>
@@ -60,8 +65,9 @@ public class LogicalNode implements Life
   private EventSink sink;
 
   private ChannelDriverThread driver;
+  private ScheduledExecutorService driverMonitorService;
 
-  private LifecycleState lifecycleState;
+  private volatile LifecycleState lifecycleState;
 
   public LogicalNode() {
     lifecycleState = LifecycleState.IDLE;
@@ -71,6 +77,8 @@ public class LogicalNode implements Life
   public void start(Context context) throws LifecycleException,
       InterruptedException {
 
+    lifecycleState = LifecycleState.IDLE;
+
     logger.info("Starting logical node:{}", this);
 
     Preconditions.checkState(name != null, "Logical node name can not be null");
@@ -79,6 +87,10 @@ public class LogicalNode implements Life
     Preconditions.checkState(sink != null, "Logical node sink can not be null");
 
     driver = new ChannelDriverThread("logicalNode-" + name + "-driver");
+    driverMonitorService = Executors.newScheduledThreadPool(
+        1,
+        new ThreadFactoryBuilder().setNameFormat(
+            "logicalNode-" + name + "-driverMonitor-%d").build());
 
     driver.setSource(source);
     driver.setSink(sink);
@@ -104,6 +116,18 @@ public class LogicalNode implements Life
     }
 
     lifecycleState = driver.getLifecycleState();
+
+    /* Once the driver is started, watch it for changes. */
+    driverMonitorService.scheduleAtFixedRate(new Runnable() {
+
+      @Override
+      public void run() {
+        if (driver.getLifecycleState().equals(LifecycleState.ERROR)) {
+          lifecycleState = LifecycleState.ERROR;
+        }
+      }
+
+    }, 0, 3, TimeUnit.SECONDS);
   }
 
   @Override
@@ -115,17 +139,32 @@ public class LogicalNode implements Life
 
     logger.info("Stopping logical node:{}", this);
 
-    driver.setShouldStop(true);
+    if (driver.getLifecycleState().equals(LifecycleState.START)) {
+      driver.setShouldStop(true);
 
-    while (driver.isAlive()) {
-      logger.debug("Waiting for driver to stop");
+      while (driver.isAlive()) {
+        logger.debug("Waiting for driver to stop");
 
-      /* If we're interrupted during a stop, we just fail. */
+        /* If we're interrupted during a stop, we just fail. */
+        try {
+          driver.join();
+        } catch (InterruptedException e) {
+          logger
+              .error("Interrupted while waiting for driver thread to stop", e);
+          lifecycleState = LifecycleState.ERROR;
+          break;
+        }
+      }
+    }
+
+    driverMonitorService.shutdown();
+
+    while (!driverMonitorService.isTerminated()) {
       try {
-        driver.join();
+        driverMonitorService.awaitTermination(1, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
-        logger.error("Interrupted while waiting for driver thread to stop", e);
-        lifecycleState = LifecycleState.ERROR;
+        logger
+            .debug("Interrupted while waiting for driver monitor service to shutdown - just exiting");
         break;
       }
     }
@@ -137,6 +176,8 @@ public class LogicalNode implements Life
     if (!lifecycleState.equals(LifecycleState.ERROR)) {
       lifecycleState = LifecycleState.STOP;
     }
+
+    logger.debug("Logical node {} stopped with state:{}", name, lifecycleState);
   }
 
   @Override