You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/16 01:00:42 UTC

svn commit: r1158050 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java

Author: esammer
Date: Mon Aug 15 23:00:41 2011
New Revision: 1158050

URL: http://svn.apache.org/viewvc?rev=1158050&view=rev
Log:
- Centralized failure transition logic.
- ChannelDriverThread now uses CounterGroup rather than a collection of longs.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1158050&r1=1158049&r2=1158050&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java Mon Aug 15 23:00:41 2011
@@ -39,20 +39,15 @@ public class ChannelDriverThread extends
   volatile private LifecycleState lifecycleState;
   volatile private Exception lastException;
 
-  private long totalEvents;
-  private long discardedEvents;
-  private long nullEvents;
-  private long successfulEvents;
+  private CounterGroup counterGroup;
 
   volatile private boolean shouldStop;
 
   public ChannelDriverThread(String name) {
     super(name);
 
-    totalEvents = 0;
-    discardedEvents = 0;
-    nullEvents = 0;
-    successfulEvents = 0;
+    counterGroup = new CounterGroup();
+    counterGroup.setName(name);
 
     lifecycleState = LifecycleState.IDLE;
     shouldStop = false;
@@ -78,15 +73,11 @@ public class ChannelDriverThread extends
       sink.open(context);
     } catch (InterruptedException e) {
       logger.error("Interrupted while opening sink. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
-      shouldStop = true;
+      fail(e, true);
       return;
     } catch (LifecycleException e) {
       logger.error("Failed to open sink. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
-      shouldStop = true;
+      fail(e, true);
       return;
     }
 
@@ -94,9 +85,7 @@ public class ChannelDriverThread extends
       source.open(context);
     } catch (InterruptedException e) {
       logger.debug("Interrupted while opening source. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
-      shouldStop = true;
+      fail(e, true);
 
       /* FIXME: This is gross. Factor this out. */
       try {
@@ -114,9 +103,7 @@ public class ChannelDriverThread extends
       return;
     } catch (LifecycleException e) {
       logger.error("Failed to open source. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
-      shouldStop = true;
+      fail(e, true);
 
       try {
         sink.close(context);
@@ -141,55 +128,52 @@ public class ChannelDriverThread extends
 
         if (event != null) {
           sink.append(context, event);
-          successfulEvents++;
+          counterGroup.incrementAndGet("driver.events.successful");
         } else {
-          nullEvents++;
+          counterGroup.incrementAndGet("driver.events.null");
         }
       } catch (InterruptedException e) {
         logger.debug("Received an interrupt while moving events - stopping");
-        lastException = e;
-        lifecycleState = LifecycleState.ERROR;
-        shouldStop = true;
+        fail(e, true);
       } catch (EventDeliveryException e) {
         logger.debug("Unable to deliver event:{} (may be null) - Reason:{}",
             event, e.getMessage());
-        discardedEvents++;
+        counterGroup.incrementAndGet("driver.events.rejected");
         /* FIXME: Handle dead messages. */
       }
 
-      totalEvents++;
+      counterGroup.incrementAndGet("driver.events.total");
     }
 
     try {
       source.close(context);
     } catch (InterruptedException e) {
       logger.debug("Interrupted while closing source. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
+      fail(e, false);
     } catch (LifecycleException e) {
       logger.error("Failed to close source. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
+      fail(e, false);
     }
 
     try {
       sink.close(context);
     } catch (InterruptedException e) {
       logger.debug("Interrupted while closing sink. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
+      fail(e, false);
     } catch (LifecycleException e) {
       logger.error("Failed to close sink. Exception follows.", e);
-      lastException = e;
-      lifecycleState = LifecycleState.ERROR;
+      fail(e, false);
     }
 
     logger.debug("Channel driver thread exiting with state:{}", lifecycleState);
-    logger
-        .info(
-            "Logical node ended. Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
-            new Object[] { totalEvents, successfulEvents, nullEvents,
-                discardedEvents });
+    logger.info("Logical node ended. Event metrics - {}", counterGroup);
+  }
+
+  public void fail(Exception lastException, boolean shouldStop) {
+    this.lastException = lastException;
+    this.shouldStop = shouldStop;
+
+    lifecycleState = LifecycleState.ERROR;
   }
 
   public void setSource(EventSource source) {