You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/16 01:00:42 UTC
svn commit: r1158050 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
Author: esammer
Date: Mon Aug 15 23:00:41 2011
New Revision: 1158050
URL: http://svn.apache.org/viewvc?rev=1158050&view=rev
Log:
- Centralized failure transition logic.
- ChannelDriverThread now uses CounterGroup rather than a collection of longs.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java?rev=1158050&r1=1158049&r2=1158050&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelDriverThread.java Mon Aug 15 23:00:41 2011
@@ -39,20 +39,15 @@ public class ChannelDriverThread extends
volatile private LifecycleState lifecycleState;
volatile private Exception lastException;
- private long totalEvents;
- private long discardedEvents;
- private long nullEvents;
- private long successfulEvents;
+ private CounterGroup counterGroup;
volatile private boolean shouldStop;
public ChannelDriverThread(String name) {
super(name);
- totalEvents = 0;
- discardedEvents = 0;
- nullEvents = 0;
- successfulEvents = 0;
+ counterGroup = new CounterGroup();
+ counterGroup.setName(name);
lifecycleState = LifecycleState.IDLE;
shouldStop = false;
@@ -78,15 +73,11 @@ public class ChannelDriverThread extends
sink.open(context);
} catch (InterruptedException e) {
logger.error("Interrupted while opening sink. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
- shouldStop = true;
+ fail(e, true);
return;
} catch (LifecycleException e) {
logger.error("Failed to open sink. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
- shouldStop = true;
+ fail(e, true);
return;
}
@@ -94,9 +85,7 @@ public class ChannelDriverThread extends
source.open(context);
} catch (InterruptedException e) {
logger.debug("Interrupted while opening source. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
- shouldStop = true;
+ fail(e, true);
/* FIXME: This is gross. Factor this out. */
try {
@@ -114,9 +103,7 @@ public class ChannelDriverThread extends
return;
} catch (LifecycleException e) {
logger.error("Failed to open source. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
- shouldStop = true;
+ fail(e, true);
try {
sink.close(context);
@@ -141,55 +128,52 @@ public class ChannelDriverThread extends
if (event != null) {
sink.append(context, event);
- successfulEvents++;
+ counterGroup.incrementAndGet("driver.events.successful");
} else {
- nullEvents++;
+ counterGroup.incrementAndGet("driver.events.null");
}
} catch (InterruptedException e) {
logger.debug("Received an interrupt while moving events - stopping");
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
- shouldStop = true;
+ fail(e, true);
} catch (EventDeliveryException e) {
logger.debug("Unable to deliver event:{} (may be null) - Reason:{}",
event, e.getMessage());
- discardedEvents++;
+ counterGroup.incrementAndGet("driver.events.rejected");
/* FIXME: Handle dead messages. */
}
- totalEvents++;
+ counterGroup.incrementAndGet("driver.events.total");
}
try {
source.close(context);
} catch (InterruptedException e) {
logger.debug("Interrupted while closing source. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
+ fail(e, false);
} catch (LifecycleException e) {
logger.error("Failed to close source. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
+ fail(e, false);
}
try {
sink.close(context);
} catch (InterruptedException e) {
logger.debug("Interrupted while closing sink. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
+ fail(e, false);
} catch (LifecycleException e) {
logger.error("Failed to close sink. Exception follows.", e);
- lastException = e;
- lifecycleState = LifecycleState.ERROR;
+ fail(e, false);
}
logger.debug("Channel driver thread exiting with state:{}", lifecycleState);
- logger
- .info(
- "Logical node ended. Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
- new Object[] { totalEvents, successfulEvents, nullEvents,
- discardedEvents });
+ logger.info("Logical node ended. Event metrics - {}", counterGroup);
+ }
+
+ public void fail(Exception lastException, boolean shouldStop) {
+ this.lastException = lastException;
+ this.shouldStop = shouldStop;
+
+ lifecycleState = LifecycleState.ERROR;
}
public void setSource(EventSource source) {