You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/17 03:00:43 UTC
svn commit: r1158484 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java
Author: esammer
Date: Wed Aug 17 01:00:43 2011
New Revision: 1158484
URL: http://svn.apache.org/viewvc?rev=1158484&view=rev
Log:
- Fixed a case where a ChannelDriverThread could die and go unnoticed until the
LogicalNode was stopped. Added a monitor thread that polls the driver every 3 seconds and
sets the LogicalNode state to ERROR if the ChannelDriverThread state becomes ERROR.
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java?rev=1158484&r1=1158483&r2=1158484&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java Wed Aug 17 01:00:43 2011
@@ -1,5 +1,9 @@
package org.apache.flume;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
@@ -7,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* <p>
@@ -60,8 +65,9 @@ public class LogicalNode implements Life
private EventSink sink;
private ChannelDriverThread driver;
+ private ScheduledExecutorService driverMonitorService;
- private LifecycleState lifecycleState;
+ private volatile LifecycleState lifecycleState;
public LogicalNode() {
lifecycleState = LifecycleState.IDLE;
@@ -71,6 +77,8 @@ public class LogicalNode implements Life
public void start(Context context) throws LifecycleException,
InterruptedException {
+ lifecycleState = LifecycleState.IDLE;
+
logger.info("Starting logical node:{}", this);
Preconditions.checkState(name != null, "Logical node name can not be null");
@@ -79,6 +87,10 @@ public class LogicalNode implements Life
Preconditions.checkState(sink != null, "Logical node sink can not be null");
driver = new ChannelDriverThread("logicalNode-" + name + "-driver");
+ driverMonitorService = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder().setNameFormat(
+ "logicalNode-" + name + "-driverMonitor-%d").build());
driver.setSource(source);
driver.setSink(sink);
@@ -104,6 +116,18 @@ public class LogicalNode implements Life
}
lifecycleState = driver.getLifecycleState();
+
+ /* Once the driver is started, watch it for changes. */
+ driverMonitorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ if (driver.getLifecycleState().equals(LifecycleState.ERROR)) {
+ lifecycleState = LifecycleState.ERROR;
+ }
+ }
+
+ }, 0, 3, TimeUnit.SECONDS);
}
@Override
@@ -115,17 +139,32 @@ public class LogicalNode implements Life
logger.info("Stopping logical node:{}", this);
- driver.setShouldStop(true);
+ if (driver.getLifecycleState().equals(LifecycleState.START)) {
+ driver.setShouldStop(true);
- while (driver.isAlive()) {
- logger.debug("Waiting for driver to stop");
+ while (driver.isAlive()) {
+ logger.debug("Waiting for driver to stop");
- /* If we're interrupted during a stop, we just fail. */
+ /* If we're interrupted during a stop, we just fail. */
+ try {
+ driver.join();
+ } catch (InterruptedException e) {
+ logger
+ .error("Interrupted while waiting for driver thread to stop", e);
+ lifecycleState = LifecycleState.ERROR;
+ break;
+ }
+ }
+ }
+
+ driverMonitorService.shutdown();
+
+ while (!driverMonitorService.isTerminated()) {
try {
- driver.join();
+ driverMonitorService.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.error("Interrupted while waiting for driver thread to stop", e);
- lifecycleState = LifecycleState.ERROR;
+ logger
+ .debug("Interrupted while waiting for driver monitor service to shutdown - just exiting");
break;
}
}
@@ -137,6 +176,8 @@ public class LogicalNode implements Life
if (!lifecycleState.equals(LifecycleState.ERROR)) {
lifecycleState = LifecycleState.STOP;
}
+
+ logger.debug("Logical node {} stopped with state:{}", name, lifecycleState);
}
@Override