You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:49 UTC

svn commit: r1156916 - in /incubator/flume/branches/flume-728/flume-ng-node/src: main/java/org/apache/flume/node/nodemanager/ test/java/org/apache/flume/node/

Author: esammer
Date: Fri Aug 12 00:48:48 2011
New Revision: 1156916

URL: http://svn.apache.org/viewvc?rev=1156916&view=rev
Log:
- DefaultLogicalNodeManager now accepts async commands to start / stop nodes. - Updated unit tests to push it.

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:48 2011
@@ -14,9 +14,11 @@ import org.apache.flume.SourceFactory;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.NodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
@@ -78,6 +80,54 @@ public class DefaultLogicalNodeManager e
   }
 
   @Override
+  public boolean add(LogicalNode node) {
+    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
+        "You can not add nodes to a manager that hasn't been started");
+
+    if (super.add(node)) {
+      startNode(node);
+
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean remove(LogicalNode node) {
+    Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
+        "You can not remove nodes from a manager that hasn't been started");
+
+    if (super.remove(node)) {
+      stopNode(node);
+
+      return true;
+    }
+
+    return false;
+  }
+
+  public void startNode(LogicalNode node) {
+    NodeStartCommand task = new NodeStartCommand();
+
+    task.context = new Context();
+    task.node = node;
+    task.nodeManager = this;
+
+    commandProcessorService.submit(task);
+  }
+
+  public void stopNode(LogicalNode node) {
+    NodeRemoveCommand task = new NodeRemoveCommand();
+
+    task.context = new Context();
+    task.node = node;
+    task.nodeManager = this;
+
+    commandProcessorService.submit(task);
+  }
+
+  @Override
   public void start(Context context) throws LifecycleException,
       InterruptedException {
 
@@ -97,18 +147,6 @@ public class DefaultLogicalNodeManager e
 
     monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
 
-    for (LogicalNode node : getNodes()) {
-      try {
-        node.start(context);
-      } catch (LifecycleException e) {
-        logger.error("Failed to start logical node:{}", node);
-      } catch (InterruptedException e) {
-        logger.error("Interrupted while starting logical node:{}", node);
-        lifecycleState = LifecycleState.ERROR;
-        throw e;
-      }
-    }
-
     logger.debug("Node manager started");
 
     lifecycleState = LifecycleState.START;
@@ -121,18 +159,7 @@ public class DefaultLogicalNodeManager e
     logger.info("Node manager stopping");
 
     for (LogicalNode node : getNodes()) {
-      try {
-        if (node.getLifecycleState().equals(LifecycleState.START)) {
-          node.stop(context);
-        }
-      } catch (LifecycleException e) {
-        logger.error("Failed to stop logical node:{}", node);
-      } catch (InterruptedException e) {
-        logger
-            .error(
-                "Interrupted while stopping logical node:{} - Continuing shutdown anyway!",
-                node);
-      }
+      stopNode(node);
     }
 
     monitorService.shutdown();
@@ -159,4 +186,54 @@ public class DefaultLogicalNodeManager e
     return lifecycleState;
   }
 
+  public static class NodeStartCommand implements Runnable {
+
+    public Context context;
+    public LogicalNode node;
+    public NodeManager nodeManager;
+
+    @Override
+    public void run() {
+      if (node.getLifecycleState().equals(LifecycleState.START)) {
+        logger.warn("Ignoring an attempt to start a running node:{}", node);
+        return;
+      }
+
+      try {
+        node.start(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to start node:" + node + " Exception follows.", e);
+      } catch (InterruptedException e) {
+        logger.info("Interrupted while starting node:" + node
+            + " Almost certainly shutting down (or a serious bug)");
+      }
+    }
+
+  }
+
+  public static class NodeRemoveCommand implements Runnable {
+
+    public Context context;
+    public LogicalNode node;
+    public NodeManager nodeManager;
+
+    @Override
+    public void run() {
+      if (!node.getLifecycleState().equals(LifecycleState.START)) {
+        logger.warn("Ignoring an attempt to stop a non-running node:{}", node);
+        return;
+      }
+
+      try {
+        node.stop(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to stop node:" + node + " Exception follows.", e);
+      } catch (InterruptedException e) {
+        logger.info("Interrupted while starting node:" + node
+            + " Almost certainly shutting down (or a serious bug)");
+      }
+    }
+
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java Fri Aug 12 00:48:48 2011
@@ -51,10 +51,13 @@ public class NodeStatusMonitor implement
     }
 
     if (!node.getLifecycleState().equals(status.state)) {
-      logger.debug(
-          "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
-          new Object[] { node.getName(), status.state, status.lastSeen,
-              node.getLifecycleState() });
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
+            new Object[] { node.getName(), status.state, status.lastSeen,
+                node.getLifecycleState() });
+      }
+
       status.lastStateChange = now;
     }
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Aug 12 00:48:48 2011
@@ -1,5 +1,8 @@
 package org.apache.flume.node;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.flume.Context;
 import org.apache.flume.LogicalNode;
 import org.apache.flume.lifecycle.LifecycleController;
@@ -42,19 +45,56 @@ public class TestDefaultLogicalNodeManag
 
     Context context = new Context();
 
-    LogicalNode node = new LogicalNode();
+    nodeManager.start(context);
+    Assert.assertTrue("Node manager didn't reach START or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.START_OR_ERROR, 5000));
+
+    for (int i = 0; i < 3; i++) {
+      LogicalNode node = new LogicalNode();
+
+      node.setName("test-node-" + i);
+      node.setSource(new SequenceGeneratorSource());
+      node.setSink(new NullSink());
+
+      nodeManager.add(node);
+    }
+
+    Thread.sleep(5000);
+
+    nodeManager.stop(context);
+    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.STOP_OR_ERROR, 5000));
+  }
+
+  @Test
+  public void testNodeStartStops() throws LifecycleException,
+      InterruptedException {
+
+    Set<LogicalNode> testNodes = new HashSet<LogicalNode>();
+
+    for (int i = 0; i < 30; i++) {
+      LogicalNode node = new LogicalNode();
 
-    node.setName("test node");
-    node.setSource(new SequenceGeneratorSource());
-    node.setSink(new NullSink());
+      node.setName("test-node-" + i);
+      node.setSource(new SequenceGeneratorSource());
+      node.setSink(new NullSink());
 
-    nodeManager.add(node);
+      testNodes.add(node);
+    }
+
+    Context context = new Context();
 
     nodeManager.start(context);
     Assert.assertTrue("Node manager didn't reach START or ERROR",
         LifecycleController.waitForOneOf(nodeManager,
             LifecycleState.START_OR_ERROR, 5000));
 
+    for (LogicalNode node : testNodes) {
+      nodeManager.add(node);
+    }
+
     Thread.sleep(5000);
 
     nodeManager.stop(context);