You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:19 UTC
svn commit: r1156888 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
Author: esammer
Date: Fri Aug 12 00:47:18 2011
New Revision: 1156888
URL: http://svn.apache.org/viewvc?rev=1156888&view=rev
Log:
- Refactored to use ChannelDriverThread directly. Not sure this is good, but I wanted to try it.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java?rev=1156888&r1=1156887&r2=1156888&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/LogicalNode.java Fri Aug 12 00:47:18 2011
@@ -1,7 +1,7 @@
package org.apache.flume.core;
+import org.apache.flume.core.ChannelDriver.ChannelDriverThread;
import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
@@ -25,13 +25,13 @@ import com.google.common.base.Preconditi
* <p>
* LogicalNode implements the {@link LifecycleAware} interface. Both the
* {@link #start(Context)} and {@link #stop(Context)} methods are asynchronous
- * but do block while starting / stopping the underlying infrastructure
- * (i.e. ChannelDriver). Both methods may also be interrupted. In the case of
- * start, an interruption will force the logical node to attempt to clean up
- * resources which may involve stopping the source and sink (which can in turn
- * block). An interrupt to stop will, in turn, interrupt the underlying
- * thread(s). In both cases though, the logical node will continue to block on
- * cleanup to prevent nasty issues with subsequent restarts.
+ * but do block while starting / stopping the underlying infrastructure (i.e.
+ * ChannelDriver). Both methods may also be interrupted. In the case of start,
+ * an interruption will force the logical node to attempt to clean up resources
+ * which may involve stopping the source and sink (which can in turn block). An
+ * interrupt to stop will, in turn, interrupt the underlying thread(s). In both
+ * cases though, the logical node will continue to block on cleanup to prevent
+ * nasty issues with subsequent restarts.
* </p>
* <p>
* Example usage:
@@ -60,7 +60,7 @@ public class LogicalNode implements Life
private EventSource source;
private EventSink sink;
- private ChannelDriver driver;
+ private ChannelDriverThread driver;
private LifecycleState lifecycleState;
@@ -79,57 +79,94 @@ public class LogicalNode implements Life
"Logical node source can not be null");
Preconditions.checkState(sink != null, "Logical node sink can not be null");
- driver = new ChannelDriver(name + "-channelDriver");
+ driver = new ChannelDriverThread(name + "-channelDriver");
driver.setSource(source);
driver.setSink(sink);
- boolean reached = false;
+ driver.start();
- try {
- driver.start(context);
+ while (!driver.getLifecycleState().equals(LifecycleState.START)
+ && !driver.getLifecycleState().equals(LifecycleState.ERROR)) {
- reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
- LifecycleState.START, LifecycleState.ERROR });
- } catch (InterruptedException e) {
- logger
- .error("Interrupted while attempting to start the logical node driver. Stopping it.");
- driver.stop(context);
- lifecycleState = LifecycleState.ERROR;
- throw e;
- }
+ logger.debug("Waiting for driver to start");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger
+ .error("Interrupted while waiting for driver to start. Shutting down.");
+ lifecycleState = LifecycleState.ERROR;
+
+ driver.setShouldStop(true);
+
+ /*
+ * We refuse to return with outstanding resources so we go into a cycle
+ * where we interrupt the driver thread and block on it. If we're
+ * interrupted again while waiting for it, we warn, but we still refuse
+ * to give up on it. Sorry, caller; we don't know what the source or
+ * sink is doing so we have to give them time!
+ */
+ while (driver.isAlive()) {
+ logger.debug("Interrupting driver");
+
+ driver.interrupt();
+
+ logger.debug("Waiting for driver to stop");
+ try {
+ driver.join();
+ } catch (InterruptedException e1) {
+ logger
+ .warn(
+ "Interrupted while waiting for driver to stop. This almost certainly means something awful is happening in the source or sink. Report this error. Interrupting it again!",
+ e1);
+ }
+ }
- if (reached) {
- lifecycleState = driver.getLifecycleState();
+ return;
+ }
}
+
+ lifecycleState = driver.getLifecycleState();
}
@Override
- public void stop(Context context) throws LifecycleException, InterruptedException {
+ public void stop(Context context) throws LifecycleException,
+ InterruptedException {
+
logger.info("Stopping logical node:{}", this);
- boolean reached = false;
+ driver.setShouldStop(true);
- try {
- driver.stop(context);
+ while (driver.isAlive()) {
+ logger.debug("Waiting for driver to stop");
- reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
- LifecycleState.STOP, LifecycleState.ERROR });
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting for the driver to stop.");
- lifecycleState = LifecycleState.ERROR;
- throw e;
+ try {
+ driver.join();
+ } catch (InterruptedException e) {
+ logger
+ .error("Interrupted while waiting for driver to stop. Interrupting it.");
+ lifecycleState = LifecycleState.ERROR;
+
+ /*
+ * We refuse to return with outstanding resources so we go into a cycle
+ * where we interrupt the driver thread and block on it. If we're
+ * interrupted again while waiting for it, we warn, but we still refuse
+ * to give up on it. Sorry, caller; we don't know what the source or
+ * sink is doing so we have to give them time!
+ */
+ logger.debug("Interrupting driver");
+ driver.interrupt();
+ }
}
- if (!reached) {
- logger
- .error(
- "There's a good chance the source or sink aren't shutting down. This will lead to problems. Contact the developers! Trace:{}",
- Thread.currentThread().getStackTrace());
+ /*
+ * If we're already in an error state, preserve that, otherwise stop
+ * successfully.
+ */
+ if (!lifecycleState.equals(LifecycleState.ERROR)) {
+ lifecycleState = LifecycleState.STOP;
}
-
- /* Our state is the channel driver's state. */
- lifecycleState = driver.getLifecycleState();
}
@Override