You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/13 02:47:00 UTC

svn commit: r1157307 - /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Author: esammer
Date: Sat Aug 13 00:46:59 2011
New Revision: 1157307

URL: http://svn.apache.org/viewvc?rev=1157307&view=rev
Log:
- Ported DefaultLogicalNodeManager to use LifecycleSupervisor rather
  than implementing its own node supervision logic. WORKS.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1157307&r1=1157306&r2=1157307&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Sat Aug 13 00:46:59 2011
@@ -1,9 +1,7 @@
 package org.apache.flume.node.nodemanager;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Context;
 import org.apache.flume.EventSink;
@@ -13,13 +11,14 @@ import org.apache.flume.SinkFactory;
 import org.apache.flume.SourceFactory;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.node.NodeConfiguration;
 import org.apache.flume.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
     implements NodeConfigurationAware {
@@ -30,11 +29,13 @@ public class DefaultLogicalNodeManager e
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
 
+  private LifecycleSupervisor nodeSupervisor;
   private ExecutorService commandProcessorService;
   private ScheduledExecutorService monitorService;
   private LifecycleState lifecycleState;
 
   public DefaultLogicalNodeManager() {
+    nodeSupervisor = new LifecycleSupervisor();
     lifecycleState = LifecycleState.IDLE;
   }
 
@@ -85,7 +86,8 @@ public class DefaultLogicalNodeManager e
         "You can not add nodes to a manager that hasn't been started");
 
     if (super.add(node)) {
-      startNode(node);
+      nodeSupervisor.supervise(node,
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
 
       return true;
     }
@@ -101,6 +103,9 @@ public class DefaultLogicalNodeManager e
     if (super.remove(node)) {
       stopNode(node);
 
+      nodeSupervisor.setDesiredState(node, LifecycleState.STOP);
+      nodeSupervisor.unsupervise(node);
+
       return true;
     }
 
@@ -133,19 +138,23 @@ public class DefaultLogicalNodeManager e
 
     logger.info("Node manager starting");
 
-    NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
-
-    statusMonitor.setNodeManager(this);
-
-    commandProcessorService = Executors
-        .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
-            "nodeManager-commandProcessor-%d").build());
-    monitorService = Executors.newScheduledThreadPool(
-        1,
-        new ThreadFactoryBuilder().setNameFormat(
-            "nodeManager-monitorService-%d").build());
+    /*
+     * NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
+     * 
+     * statusMonitor.setNodeManager(this);
+     * 
+     * commandProcessorService = Executors .newCachedThreadPool(new
+     * ThreadFactoryBuilder().setNameFormat(
+     * "nodeManager-commandProcessor-%d").build()); monitorService =
+     * Executors.newScheduledThreadPool( 1, new
+     * ThreadFactoryBuilder().setNameFormat(
+     * "nodeManager-monitorService-%d").build());
+     * 
+     * monitorService.scheduleAtFixedRate(statusMonitor, 0, 3,
+     * TimeUnit.SECONDS);
+     */
 
-    monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
+    nodeSupervisor.start(context);
 
     logger.debug("Node manager started");
 
@@ -158,23 +167,23 @@ public class DefaultLogicalNodeManager e
 
     logger.info("Node manager stopping");
 
-    for (LogicalNode node : getNodes()) {
-      stopNode(node);
-    }
-
-    monitorService.shutdown();
-
-    while (!monitorService.isTerminated()) {
-      logger.debug("Waiting for node status monitor to shutdown");
-      monitorService.awaitTermination(1, TimeUnit.SECONDS);
-    }
-
-    commandProcessorService.shutdown();
+    /*
+     * for (LogicalNode node : getNodes()) { stopNode(node); }
+     * 
+     * monitorService.shutdown();
+     * 
+     * while (!monitorService.isTerminated()) {
+     * logger.debug("Waiting for node status monitor to shutdown");
+     * monitorService.awaitTermination(1, TimeUnit.SECONDS); }
+     * 
+     * commandProcessorService.shutdown();
+     * 
+     * while (!commandProcessorService.isTerminated()) {
+     * logger.debug("Waiting for command processor to shutdown");
+     * commandProcessorService.awaitTermination(1, TimeUnit.SECONDS); }
+     */
 
-    while (!commandProcessorService.isTerminated()) {
-      logger.debug("Waiting for command processor to shutdown");
-      commandProcessorService.awaitTermination(1, TimeUnit.SECONDS);
-    }
+    nodeSupervisor.stop(context);
 
     logger.debug("Node manager stopped");