You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:20 UTC

svn commit: r1156867 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Author: esammer
Date: Fri Aug 12 00:46:20 2011
New Revision: 1156867

URL: http://svn.apache.org/viewvc?rev=1156867&view=rev
Log:
- ChannelDriverThread now properly opens / closes sources / sinks.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156867&r1=1156866&r2=1156867&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:20 2011
@@ -122,6 +122,17 @@ public class ChannelDriver implements Li
       Preconditions.checkState(source != null, "Source can not be null");
       Preconditions.checkState(sink != null, "Sink can not be null");
 
+      try {
+        sink.open(context);
+        source.open(context);
+      } catch (InterruptedException e) {
+        logger.debug("Interrupted while opening source / sink.", e);
+        shouldStop = true;
+      } catch (LifecycleException e) {
+        logger.error("Failed to open source / sink. Exception follows.", e);
+        shouldStop = true;
+      }
+
       while (!shouldStop) {
         Event<?> event = null;
 
@@ -146,6 +157,17 @@ public class ChannelDriver implements Li
         totalEvents++;
       }
 
+      try {
+        source.close(context);
+        sink.close(context);
+      } catch (InterruptedException e) {
+        logger.debug(
+            "Interrupted while closing source / sink. Just going to continue.",
+            e);
+      } catch (LifecycleException e) {
+        logger.error("Failed to open source / sink. Exception follows.", e);
+      }
+
       logger.debug("Channel driver thread exiting cleanly");
       logger
           .info(