You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:25 UTC
svn commit: r1156908 - in
/incubator/flume/branches/flume-728/flume-ng-node/src:
main/java/org/apache/flume/node/nodemanager/ test/java/org/apache/flume/node/
Author: esammer
Date: Fri Aug 12 00:48:25 2011
New Revision: 1156908
URL: http://svn.apache.org/viewvc?rev=1156908&view=rev
Log:
- Added a DefaultLogicalNodeManager, which is NodeConfigurationAware and whose nodes are monitored by the newly added NodeStatusMonitor.
Added:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,213 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventSink;
+import org.apache.flume.EventSource;
+import org.apache.flume.LogicalNode;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Node manager that builds logical nodes from {@link NodeConfiguration}
+ * changes and runs a {@link NodeStatusMonitor} on a single scheduler thread
+ * between {@link #start(Context)} and {@link #stop(Context)}.
+ * <p>
+ * The source and sink factories must be injected through their setters before
+ * a configuration change can be applied.
+ */
+public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
+    implements NodeConfigurationAware {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(DefaultLogicalNodeManager.class);
+
+  private SourceFactory sourceFactory;
+  private SinkFactory sinkFactory;
+
+  private ScheduledExecutorService monitorService;
+  private LifecycleState lifecycleState;
+
+  public DefaultLogicalNodeManager() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  /**
+   * Builds a new logical node from the given configuration and adds it to this
+   * manager. If the factories are missing, or the source or sink cannot be
+   * created, the failure is logged and nothing is added.
+   *
+   * @param nodeConfiguration the configuration to apply
+   */
+  @Override
+  public void onNodeConfigurationChanged(NodeConfiguration nodeConfiguration) {
+    logger.info("Node configuration change:{}", nodeConfiguration);
+
+    /*
+     * FIXME: Decide if nodeConfiguration is worth applying. We can't trust the
+     * caller to know our config.
+     */
+
+    // Without this guard an unconfigured manager fails with a bare NPE below.
+    if (sourceFactory == null || sinkFactory == null) {
+      logger
+          .error(
+              "Failed to apply configuration:{} because the source or sink factory is not set - retaining old configuration",
+              nodeConfiguration);
+      return;
+    }
+
+    EventSource source = null;
+    EventSink sink = null;
+
+    try {
+      source = sourceFactory.create(nodeConfiguration.getSourceDefinition());
+    } catch (InstantiationException e) {
+      logger
+          .error(
+              "Failed to apply configuration:{} because of source failure:{} - retaining old configuration",
+              nodeConfiguration, e.getMessage());
+      return;
+    }
+
+    try {
+      sink = sinkFactory.create(nodeConfiguration.getSinkDefinition());
+    } catch (InstantiationException e) {
+      logger
+          .error(
+              "Failed to apply configuration:{} because of sink failure:{} - retaining old configuration",
+              nodeConfiguration, e.getMessage());
+      return;
+    }
+
+    LogicalNode newLogicalNode = new LogicalNode();
+
+    newLogicalNode.setName(nodeConfiguration.getName());
+    newLogicalNode.setSource(source);
+    newLogicalNode.setSink(sink);
+
+    add(newLogicalNode);
+  }
+
+  /**
+   * Starts the node status monitor and then every node currently managed.
+   * A node that fails with a {@link LifecycleException} is logged and skipped.
+   *
+   * @param context the context handed to each node's start method
+   * @throws InterruptedException if interrupted while starting a node; the
+   *           state becomes ERROR and the monitor is shut down
+   */
+  @Override
+  public void start(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Node manager starting");
+
+    // A repeated start() must not orphan the previous scheduler thread.
+    if (monitorService == null || monitorService.isShutdown()) {
+      NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
+
+      statusMonitor.setNodeManager(this);
+
+      monitorService = Executors.newScheduledThreadPool(1);
+      monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
+    }
+
+    for (LogicalNode node : getNodes()) {
+      try {
+        node.start(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to start logical node:" + node, e);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted while starting logical node:{}", node);
+        lifecycleState = LifecycleState.ERROR;
+        // stop() may never follow a failed start, so release the thread here.
+        monitorService.shutdownNow();
+        throw e;
+      }
+    }
+
+    logger.debug("Node manager started");
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  /**
+   * Stops every managed node and then shuts the status monitor down, waiting
+   * for it to terminate. Node failures do not abort the shutdown.
+   *
+   * @param context the context handed to each node's stop method
+   * @throws InterruptedException if interrupted while waiting for the monitor
+   */
+  @Override
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Node manager stopping");
+
+    boolean interrupted = false;
+
+    for (LogicalNode node : getNodes()) {
+      try {
+        node.stop(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to stop logical node:" + node, e);
+      } catch (InterruptedException e) {
+        logger
+            .error(
+                "Interrupted while stopping logical node:{} - Continuing shutdown anyway!",
+                node);
+        interrupted = true;
+      }
+    }
+
+    // monitorService is null when stop() is called without a prior start().
+    if (monitorService != null) {
+      monitorService.shutdown();
+
+      while (!monitorService.isTerminated()) {
+        logger.debug("Waiting for node status monitor to shutdown");
+        monitorService.awaitTermination(1, TimeUnit.SECONDS);
+      }
+    }
+
+    logger.debug("Node manager stopped");
+
+    lifecycleState = LifecycleState.STOP;
+
+    // Shutdown carried on despite the interrupt; hand it back to the caller.
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  public SourceFactory getSourceFactory() {
+    return sourceFactory;
+  }
+
+  public void setSourceFactory(SourceFactory sourceFactory) {
+    this.sourceFactory = sourceFactory;
+  }
+
+  public SinkFactory getSinkFactory() {
+    return sinkFactory;
+  }
+
+  public void setSinkFactory(SinkFactory sinkFactory) {
+    this.sinkFactory = sinkFactory;
+  }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,87 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeStatusMonitor implements Runnable {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(NodeStatusMonitor.class);
+
+ private Map<String, Status> stati;
+
+ private NodeManager nodeManager;
+
+ public NodeStatusMonitor() {
+ stati = new HashMap<String, Status>();
+ }
+
+ @Override
+ public void run() {
+ updateStatus();
+ }
+
+ public void updateStatus() {
+ Set<LogicalNode> nodes = nodeManager.getNodes();
+
+ logger.debug("Checking stati of all nodes");
+
+ for (LogicalNode node : nodes) {
+ updateStatus(node);
+ }
+ }
+
+ public void updateStatus(LogicalNode node) {
+ logger.debug("Checking stati of node:{}", node);
+
+ Long now = System.currentTimeMillis();
+ Status status = stati.get(node.getName());
+
+ if (status == null) {
+ status = new Status();
+
+ status.firstSeen = now;
+ }
+
+ if (!node.getLifecycleState().equals(status.state)) {
+ logger.debug(
+ "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
+ new Object[] { node.getName(), status.state, status.lastSeen,
+ node.getLifecycleState() });
+ status.lastStateChange = now;
+ }
+
+ status.lastSeen = now;
+ status.state = node.getLifecycleState();
+
+ stati.put(node.getName(), status);
+ }
+
+ @Override
+ public String toString() {
+ return "{ stati:" + stati + " nodeManager:" + nodeManager + " }";
+ }
+
+ public NodeManager getNodeManager() {
+ return nodeManager;
+ }
+
+ public void setNodeManager(NodeManager nodeManager) {
+ this.nodeManager = nodeManager;
+ }
+
+ public static class Status {
+ public Long firstSeen;
+ public Long lastSeen;
+ public LifecycleState state;
+ public Long lastStateChange;
+ }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,66 @@
+package org.apache.flume.node;
+
+import org.apache.flume.Context;
+import org.apache.flume.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDefaultLogicalNodeManager {
+
+ private NodeManager nodeManager;
+
+ @Before
+ public void setUp() {
+ nodeManager = new DefaultLogicalNodeManager();
+ }
+
+ @Test
+ public void testLifecycle() throws LifecycleException, InterruptedException {
+ Context context = new Context();
+
+ nodeManager.start(context);
+ Assert.assertTrue("Node manager didn't reach START or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ nodeManager.stop(context);
+ Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ }
+
+ @Test
+ public void testLifecycleWithNodes() throws LifecycleException,
+ InterruptedException {
+
+ Context context = new Context();
+
+ LogicalNode node = new LogicalNode();
+
+ node.setName("test node");
+ node.setSource(new SequenceGeneratorSource());
+ node.setSink(new NullSink());
+
+ nodeManager.add(node);
+
+ nodeManager.start(context);
+ Assert.assertTrue("Node manager didn't reach START or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ Thread.sleep(5000);
+
+ nodeManager.stop(context);
+ Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+ LifecycleController.waitForOneOf(nodeManager,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ }
+
+}