You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:49 UTC
svn commit: r1156916 - in
/incubator/flume/branches/flume-728/flume-ng-node/src:
main/java/org/apache/flume/node/nodemanager/ test/java/org/apache/flume/node/
Author: esammer
Date: Fri Aug 12 00:48:48 2011
New Revision: 1156916
URL: http://svn.apache.org/viewvc?rev=1156916&view=rev
Log:
- DefaultLogicalNodeManager now accepts async commands to start / stop nodes. - Updated unit tests to push it.
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:48 2011
@@ -14,9 +14,11 @@ import org.apache.flume.SourceFactory;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.node.NodeConfiguration;
+import org.apache.flume.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
@@ -78,6 +80,54 @@ public class DefaultLogicalNodeManager e
}
@Override
+ public boolean add(LogicalNode node) {
+ Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
+ "You can not add nodes to a manager that hasn't been started");
+
+ if (super.add(node)) {
+ startNode(node);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean remove(LogicalNode node) {
+ Preconditions.checkState(getLifecycleState().equals(LifecycleState.START),
+ "You can not remove nodes from a manager that hasn't been started");
+
+ if (super.remove(node)) {
+ stopNode(node);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ public void startNode(LogicalNode node) {
+ NodeStartCommand task = new NodeStartCommand();
+
+ task.context = new Context();
+ task.node = node;
+ task.nodeManager = this;
+
+ commandProcessorService.submit(task);
+ }
+
+ public void stopNode(LogicalNode node) {
+ NodeRemoveCommand task = new NodeRemoveCommand();
+
+ task.context = new Context();
+ task.node = node;
+ task.nodeManager = this;
+
+ commandProcessorService.submit(task);
+ }
+
+ @Override
public void start(Context context) throws LifecycleException,
InterruptedException {
@@ -97,18 +147,6 @@ public class DefaultLogicalNodeManager e
monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
- for (LogicalNode node : getNodes()) {
- try {
- node.start(context);
- } catch (LifecycleException e) {
- logger.error("Failed to start logical node:{}", node);
- } catch (InterruptedException e) {
- logger.error("Interrupted while starting logical node:{}", node);
- lifecycleState = LifecycleState.ERROR;
- throw e;
- }
- }
-
logger.debug("Node manager started");
lifecycleState = LifecycleState.START;
@@ -121,18 +159,7 @@ public class DefaultLogicalNodeManager e
logger.info("Node manager stopping");
for (LogicalNode node : getNodes()) {
- try {
- if (node.getLifecycleState().equals(LifecycleState.START)) {
- node.stop(context);
- }
- } catch (LifecycleException e) {
- logger.error("Failed to stop logical node:{}", node);
- } catch (InterruptedException e) {
- logger
- .error(
- "Interrupted while stopping logical node:{} - Continuing shutdown anyway!",
- node);
- }
+ stopNode(node);
}
monitorService.shutdown();
@@ -159,4 +186,54 @@ public class DefaultLogicalNodeManager e
return lifecycleState;
}
+ public static class NodeStartCommand implements Runnable {
+
+ public Context context;
+ public LogicalNode node;
+ public NodeManager nodeManager;
+
+ @Override
+ public void run() {
+ if (node.getLifecycleState().equals(LifecycleState.START)) {
+ logger.warn("Ignoring an attempt to start a running node:{}", node);
+ return;
+ }
+
+ try {
+ node.start(context);
+ } catch (LifecycleException e) {
+ logger.error("Failed to start node:" + node + " Exception follows.", e);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted while starting node:" + node
+ + " Almost certainly shutting down (or a serious bug)");
+ }
+ }
+
+ }
+
+ public static class NodeRemoveCommand implements Runnable {
+
+ public Context context;
+ public LogicalNode node;
+ public NodeManager nodeManager;
+
+ @Override
+ public void run() {
+ if (!node.getLifecycleState().equals(LifecycleState.START)) {
+ logger.warn("Ignoring an attempt to stop a non-running node:{}", node);
+ return;
+ }
+
+ try {
+ node.stop(context);
+ } catch (LifecycleException e) {
+ logger.error("Failed to stop node:" + node + " Exception follows.", e);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted while starting node:" + node
+ + " Almost certainly shutting down (or a serious bug)");
+ }
+ }
+
+ }
+
}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java Fri Aug 12 00:48:48 2011
@@ -51,10 +51,13 @@ public class NodeStatusMonitor implement
}
if (!node.getLifecycleState().equals(status.state)) {
- logger.debug(
- "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
- new Object[] { node.getName(), status.state, status.lastSeen,
- node.getLifecycleState() });
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
+ new Object[] { node.getName(), status.state, status.lastSeen,
+ node.getLifecycleState() });
+ }
+
status.lastStateChange = now;
}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1156916&r1=1156915&r2=1156916&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Aug 12 00:48:48 2011
@@ -1,5 +1,8 @@
package org.apache.flume.node;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.flume.Context;
import org.apache.flume.LogicalNode;
import org.apache.flume.lifecycle.LifecycleController;
@@ -42,19 +45,56 @@ public class TestDefaultLogicalNodeManag
Context context = new Context();
- LogicalNode node = new LogicalNode();
+ nodeManager.start(context);
+ Assert.assertTrue("Node manager didn't reach START or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ for (int i = 0; i < 3; i++) {
+ LogicalNode node = new LogicalNode();
+
+ node.setName("test-node-" + i);
+ node.setSource(new SequenceGeneratorSource());
+ node.setSink(new NullSink());
+
+ nodeManager.add(node);
+ }
+
+ Thread.sleep(5000);
+
+ nodeManager.stop(context);
+ Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ }
+
+ @Test
+ public void testNodeStartStops() throws LifecycleException,
+ InterruptedException {
+
+ Set<LogicalNode> testNodes = new HashSet<LogicalNode>();
+
+ for (int i = 0; i < 30; i++) {
+ LogicalNode node = new LogicalNode();
- node.setName("test node");
- node.setSource(new SequenceGeneratorSource());
- node.setSink(new NullSink());
+ node.setName("test-node-" + i);
+ node.setSource(new SequenceGeneratorSource());
+ node.setSink(new NullSink());
- nodeManager.add(node);
+ testNodes.add(node);
+ }
+
+ Context context = new Context();
nodeManager.start(context);
Assert.assertTrue("Node manager didn't reach START or ERROR",
LifecycleController.waitForOneOf(nodeManager,
LifecycleState.START_OR_ERROR, 5000));
+ for (LogicalNode node : testNodes) {
+ nodeManager.add(node);
+ }
+
Thread.sleep(5000);
nodeManager.stop(context);