You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:34 UTC
svn commit: r1156893 - in
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core:
ChannelDriver.java ChannelDriverThread.java
Author: esammer
Date: Fri Aug 12 00:47:33 2011
New Revision: 1156893
URL: http://svn.apache.org/viewvc?rev=1156893&view=rev
Log:
- Moved ChannelDriverThread out of ChannelDriver.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriverThread.java
Removed:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriverThread.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriverThread.java?rev=1156893&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriverThread.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriverThread.java Fri Aug 12 00:47:33 2011
@@ -0,0 +1,215 @@
+package org.apache.flume.core;
+
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * The thread that does the actual work of opening, closing, and running the
+ * shuffle loop (i.e. the loop that takes {@link Event}s from the
+ * {@link EventSource} and gives them to the {@link EventSink}.
+ * </p>
+ * <p>
+ * Because this class extends {@link Thread} it doesn't fully implement the
+ * {@link LifecycleAware} interface. That said, it does use
+ * {@link LifecycleState} (or at least some of the states from it) to indicate
+ * its current status. This is kind of gross and should be revisited. Ideally,
+ * this would become a Runnable so we could decouple thread lifetime from
+ * instances.
+ * </p>
+ * <p>
+ * This class is not meant for any users other than {@link ChannelDriver}
+ * itself.
+ * </p>
+ */
+public class ChannelDriverThread extends Thread {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(ChannelDriverThread.class);
+
+ private EventSource source;
+ private EventSink sink;
+ private Context context;
+
+ volatile private LifecycleState lifecycleState;
+ volatile private Exception lastException;
+
+ private long totalEvents;
+ private long discardedEvents;
+ private long nullEvents;
+ private long successfulEvents;
+
+ volatile private boolean shouldStop;
+
+ public ChannelDriverThread(String name) {
+ super(name);
+
+ totalEvents = 0;
+ discardedEvents = 0;
+ nullEvents = 0;
+ successfulEvents = 0;
+
+ lifecycleState = LifecycleState.IDLE;
+ shouldStop = false;
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Channel driver thread running");
+
+ Preconditions.checkState(source != null, "Source can not be null");
+ Preconditions.checkState(sink != null, "Sink can not be null");
+
+ lifecycleState = LifecycleState.START;
+
+ /*
+ * Developer note: We purposefully separate source and sink open and close
+ * try / catch blocks so we can provide slightly better error messaging and
+ * recovery. Please resist the urge to combine them. The ordering of sink
+ * open, source open, source close, sink close is deliberate as well.
+ */
+
+ try {
+ sink.open(context);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while opening sink. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ shouldStop = true;
+ return;
+ } catch (LifecycleException e) {
+ logger.error("Failed to open sink. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ shouldStop = true;
+ return;
+ }
+
+ try {
+ source.open(context);
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while opening source. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ shouldStop = true;
+
+ /* FIXME: This is gross. Factor this out. */
+ try {
+ sink.close(context);
+ } catch (InterruptedException e1) {
+ logger
+ .error("Interrupted while trying to close the sink (because we were interrupted while trying to open the source)");
+ } catch (LifecycleException e1) {
+ logger
+ .error(
+ "While cleaning up after \"{}\" failed to close the sink down - {}",
+ e.getMessage(), e.toString());
+ }
+
+ return;
+ } catch (LifecycleException e) {
+ logger.error("Failed to open source. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ shouldStop = true;
+
+ try {
+ sink.close(context);
+ } catch (InterruptedException e1) {
+ logger
+ .error("Interrupted while trying to close the sink (because we were cleaning up from a lifecycle exception from opening the source.)");
+ } catch (LifecycleException e1) {
+ logger
+ .error(
+ "While cleaning up after \"{}\" failed to close the sink down - {}",
+ e.getMessage(), e.toString());
+ }
+
+ return;
+ }
+
+ while (!shouldStop) {
+ Event<?> event = null;
+
+ try {
+ event = source.next(context);
+
+ if (event != null) {
+ sink.append(context, event);
+ successfulEvents++;
+ } else {
+ nullEvents++;
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Received an interrupt while moving events - stopping");
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ shouldStop = true;
+ } catch (EventDeliveryException e) {
+ logger.debug("Unable to deliver event:{} (may be null) - Reason:{}",
+ event, e.getMessage());
+ discardedEvents++;
+ /* FIXME: Handle dead messages. */
+ }
+
+ totalEvents++;
+ }
+
+ try {
+ source.close(context);
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while closing source. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ } catch (LifecycleException e) {
+ logger.error("Failed to close source. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ }
+
+ try {
+ sink.close(context);
+ } catch (InterruptedException e) {
+ logger.debug("Interrupted while closing sink. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ } catch (LifecycleException e) {
+ logger.error("Failed to close sink. Exception follows.", e);
+ lastException = e;
+ lifecycleState = LifecycleState.ERROR;
+ }
+
+ logger.debug("Channel driver thread exiting with state:{}", lifecycleState);
+ logger
+ .info(
+ "Logical node ended. Event metrics - totalEvents:{} successfulEvents:{} nullEvents:{} discardedEvents:{}",
+ new Object[] { totalEvents, successfulEvents, nullEvents,
+ discardedEvents });
+ }
+
+ public void setSource(EventSource source) {
+ this.source = source;
+ }
+
+ public void setSink(EventSink sink) {
+ this.sink = sink;
+ }
+
+ public void setShouldStop(boolean shouldStop) {
+ this.shouldStop = shouldStop;
+ }
+
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+
+ public Exception getLastException() {
+ return lastException;
+ }
+
+}
\ No newline at end of file