You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 02:47:00 UTC
svn commit: r1157307 -
/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Author: esammer
Date: Sat Aug 13 00:46:59 2011
New Revision: 1157307
URL: http://svn.apache.org/viewvc?rev=1157307&view=rev
Log:
- Ported DefaultLogicalNodeManager to use LifecycleSupervisor rather
than implementing a specific version itself. WORKS.
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1157307&r1=1157306&r2=1157307&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Sat Aug 13 00:46:59 2011
@@ -1,9 +1,7 @@
package org.apache.flume.node.nodemanager;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.EventSink;
@@ -13,13 +11,14 @@ import org.apache.flume.SinkFactory;
import org.apache.flume.SourceFactory;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
import org.apache.flume.node.NodeConfiguration;
import org.apache.flume.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
implements NodeConfigurationAware {
@@ -30,11 +29,13 @@ public class DefaultLogicalNodeManager e
private SourceFactory sourceFactory;
private SinkFactory sinkFactory;
+ private LifecycleSupervisor nodeSupervisor;
private ExecutorService commandProcessorService;
private ScheduledExecutorService monitorService;
private LifecycleState lifecycleState;
public DefaultLogicalNodeManager() {
+ nodeSupervisor = new LifecycleSupervisor();
lifecycleState = LifecycleState.IDLE;
}
@@ -85,7 +86,8 @@ public class DefaultLogicalNodeManager e
"You can not add nodes to a manager that hasn't been started");
if (super.add(node)) {
- startNode(node);
+ nodeSupervisor.supervise(node,
+ new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
return true;
}
@@ -101,6 +103,9 @@ public class DefaultLogicalNodeManager e
if (super.remove(node)) {
stopNode(node);
+ nodeSupervisor.setDesiredState(node, LifecycleState.STOP);
+ nodeSupervisor.unsupervise(node);
+
return true;
}
@@ -133,19 +138,23 @@ public class DefaultLogicalNodeManager e
logger.info("Node manager starting");
- NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
-
- statusMonitor.setNodeManager(this);
-
- commandProcessorService = Executors
- .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
- "nodeManager-commandProcessor-%d").build());
- monitorService = Executors.newScheduledThreadPool(
- 1,
- new ThreadFactoryBuilder().setNameFormat(
- "nodeManager-monitorService-%d").build());
+ /*
+ * NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
+ *
+ * statusMonitor.setNodeManager(this);
+ *
+ * commandProcessorService = Executors .newCachedThreadPool(new
+ * ThreadFactoryBuilder().setNameFormat(
+ * "nodeManager-commandProcessor-%d").build()); monitorService =
+ * Executors.newScheduledThreadPool( 1, new
+ * ThreadFactoryBuilder().setNameFormat(
+ * "nodeManager-monitorService-%d").build());
+ *
+ * monitorService.scheduleAtFixedRate(statusMonitor, 0, 3,
+ * TimeUnit.SECONDS);
+ */
- monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
+ nodeSupervisor.start(context);
logger.debug("Node manager started");
@@ -158,23 +167,23 @@ public class DefaultLogicalNodeManager e
logger.info("Node manager stopping");
- for (LogicalNode node : getNodes()) {
- stopNode(node);
- }
-
- monitorService.shutdown();
-
- while (!monitorService.isTerminated()) {
- logger.debug("Waiting for node status monitor to shutdown");
- monitorService.awaitTermination(1, TimeUnit.SECONDS);
- }
-
- commandProcessorService.shutdown();
+ /*
+ * for (LogicalNode node : getNodes()) { stopNode(node); }
+ *
+ * monitorService.shutdown();
+ *
+ * while (!monitorService.isTerminated()) {
+ * logger.debug("Waiting for node status monitor to shutdown");
+ * monitorService.awaitTermination(1, TimeUnit.SECONDS); }
+ *
+ * commandProcessorService.shutdown();
+ *
+ * while (!commandProcessorService.isTerminated()) {
+ * logger.debug("Waiting for command processor to shutdown");
+ * commandProcessorService.awaitTermination(1, TimeUnit.SECONDS); }
+ */
- while (!commandProcessorService.isTerminated()) {
- logger.debug("Waiting for command processor to shutdown");
- commandProcessorService.awaitTermination(1, TimeUnit.SECONDS);
- }
+ nodeSupervisor.stop(context);
logger.debug("Node manager stopped");