You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:27 UTC

svn commit: r1156869 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Author: esammer
Date: Fri Aug 12 00:46:26 2011
New Revision: 1156869

URL: http://svn.apache.org/viewvc?rev=1156869&view=rev
Log:
- ChannelDriverThread now properly detects (separate) source and sink failures, records the exception, and exits cleanly to the ChannelDriver (its parent).
- Added a big fat warning about my ugly repurposing of LifecycleState for something that isn't LifecycleAware. I'm gross.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156869&r1=1156868&r2=1156869&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:26 2011
@@ -35,9 +35,9 @@ public class ChannelDriver implements Li
     driverThread.setSource(source);
     driverThread.setSink(sink);
 
-    driverThread.start();
-
     lifecycleState = LifecycleState.START;
+
+    driverThread.start();
   }
 
   @Override
@@ -53,12 +53,26 @@ public class ChannelDriver implements Li
         driverThread.join(1000);
       } catch (InterruptedException e) {
         logger
-            .debug("Interrupted while waiting for driver thread to shutdown. Interrupting it and stopping.");
+            .debug("Interrupted while waiting for driver thread to shutdown. Interrupting it.");
         driverThread.interrupt();
       }
     }
 
-    lifecycleState = LifecycleState.STOP;
+    /*
+     * FIXME: We repurpose LifecycleState for the driver thread, but we don't
+     * actually use all phases of the lifecycle because we don't have a hook to
+     * know when the thread has successfully stopped. This means we treat a
+     * START state to mean successful exit and ERROR to mean something bad.
+     * You've been warned.
+     */
+    LifecycleState driverThreadResult = driverThread.getLifecycleState();
+
+    if (!driverThreadResult.equals(LifecycleState.START)
+        || driverThread.getLastException() != null) {
+      lifecycleState = LifecycleState.ERROR;
+    } else {
+      lifecycleState = LifecycleState.STOP;
+    }
   }
 
   @Override
@@ -97,6 +111,9 @@ public class ChannelDriver implements Li
     private EventSink sink;
     private Context context;
 
+    private LifecycleState lifecycleState;
+    private Exception lastException;
+
     private long totalEvents;
     private long discardedEvents;
     private long nullEvents;
@@ -112,6 +129,7 @@ public class ChannelDriver implements Li
       nullEvents = 0;
       successfulEvents = 0;
 
+      lifecycleState = LifecycleState.IDLE;
       shouldStop = false;
     }
 
@@ -122,15 +140,38 @@ public class ChannelDriver implements Li
       Preconditions.checkState(source != null, "Source can not be null");
       Preconditions.checkState(sink != null, "Sink can not be null");
 
+      lifecycleState = LifecycleState.START;
+
       try {
         sink.open(context);
-        source.open(context);
       } catch (InterruptedException e) {
         logger.debug("Interrupted while opening source / sink.", e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
+        return;
       } catch (LifecycleException e) {
         logger.error("Failed to open source / sink. Exception follows.", e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
+        shouldStop = true;
+        return;
+      }
+
+      try {
+        source.open(context);
+      } catch (InterruptedException e) {
+        logger.debug("Interrupted while opening source:{}", source, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
+        shouldStop = true;
+        return;
+      } catch (LifecycleException e) {
+        logger.error("Failed to open source:{} Exception follows.", source, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
+        return;
       }
 
       while (!shouldStop) {
@@ -147,6 +188,8 @@ public class ChannelDriver implements Li
           }
         } catch (InterruptedException e) {
           logger.debug("Received an interrupt while moving events - stopping");
+          lastException = e;
+          lifecycleState = LifecycleState.ERROR;
           shouldStop = true;
         } catch (MessageDeliveryException e) {
           logger.debug("Unable to deliver event:{} (may be null)", event);
@@ -159,19 +202,33 @@ public class ChannelDriver implements Li
 
       try {
         source.close(context);
+      } catch (InterruptedException e) {
+        logger.debug("Interrupted while closing source:{}.", source, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
+      } catch (LifecycleException e) {
+        logger.error("Failed to close source:{} Exception follows.", source, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
+      }
+
+      try {
         sink.close(context);
       } catch (InterruptedException e) {
-        logger.debug(
-            "Interrupted while closing source / sink. Just going to continue.",
-            e);
+        logger.debug("Interrupted while closing sink:{}.", sink, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
       } catch (LifecycleException e) {
-        logger.error("Failed to open source / sink. Exception follows.", e);
+        logger.error("Failed to close sink:{} Exception follows.", sink, e);
+        lastException = e;
+        lifecycleState = LifecycleState.ERROR;
       }
 
-      logger.debug("Channel driver thread exiting cleanly");
+      logger.debug("Channel driver thread exiting with state:{}",
+          lifecycleState);
       logger
           .info(
-              "Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
+              "Logical node ended. Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
               new Object[] { totalEvents, successfulEvents, nullEvents,
                   discardedEvents });
     }
@@ -188,6 +245,14 @@ public class ChannelDriver implements Li
       this.shouldStop = shouldStop;
     }
 
+    public LifecycleState getLifecycleState() {
+      return lifecycleState;
+    }
+
+    public Exception getLastException() {
+      return lastException;
+    }
+
   }
 
 }