You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:20 UTC
svn commit: r1156867 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
Author: esammer
Date: Fri Aug 12 00:46:20 2011
New Revision: 1156867
URL: http://svn.apache.org/viewvc?rev=1156867&view=rev
Log:
- ChannelDriverThread now properly opens / closes sources / sinks.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156867&r1=1156866&r2=1156867&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:20 2011
@@ -122,6 +122,17 @@ public class ChannelDriver implements Li
Preconditions.checkState(source != null, "Source can not be null");
Preconditions.checkState(sink != null, "Sink can not be null");
+ try {
+ sink.open(context);
+ source.open(context);
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while opening source / sink.", e);
+ shouldStop = true;
+ } catch (LifecycleException e) {
+ logger.error("Failed to open source / sink. Exception follows.", e);
+ shouldStop = true;
+ }
+
while (!shouldStop) {
Event<?> event = null;
@@ -146,6 +157,17 @@ public class ChannelDriver implements Li
totalEvents++;
}
+ try {
+ source.close(context);
+ sink.close(context);
+ } catch (InterruptedException e) {
+ logger.debug(
+ "Interrupted while closing source / sink. Just going to continue.",
+ e);
+ } catch (LifecycleException e) {
+ logger.error("Failed to open source / sink. Exception follows.", e);
+ }
+
logger.debug("Channel driver thread exiting cleanly");
logger
.info(