You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:48:25 UTC

svn commit: r1156908 - in /incubator/flume/branches/flume-728/flume-ng-node/src: main/java/org/apache/flume/node/nodemanager/ test/java/org/apache/flume/node/

Author: esammer
Date: Fri Aug 12 00:48:25 2011
New Revision: 1156908

URL: http://svn.apache.org/viewvc?rev=1156908&view=rev
Log:
- Added a DefaultLogicalNodeManager which is NodeConfigurationAware and monitored by the newly added NodeStatusMonitor.

Added:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,142 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventSink;
+import org.apache.flume.EventSource;
+import org.apache.flume.LogicalNode;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SourceFactory;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
+    implements NodeConfigurationAware {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(DefaultLogicalNodeManager.class);
+
+  private SourceFactory sourceFactory;
+  private SinkFactory sinkFactory;
+
+  private ScheduledExecutorService monitorService;
+  private LifecycleState lifecycleState;
+
+  public DefaultLogicalNodeManager() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public void onNodeConfigurationChanged(NodeConfiguration nodeConfiguration) {
+    logger.info("Node configuration change:{}", nodeConfiguration);
+
+    /*
+     * FIXME: Decide if nodeConfiguration is worth applying. We can't trust the
+     * caller to know our config.
+     */
+
+    EventSource source = null;
+    EventSink sink = null;
+
+    try {
+      source = sourceFactory.create(nodeConfiguration.getSourceDefinition());
+    } catch (InstantiationException e) {
+      logger
+          .error(
+              "Failed to apply configuration:{} because of source failure:{} - retaining old configuration",
+              nodeConfiguration, e.getMessage());
+      return;
+    }
+
+    try {
+      sink = sinkFactory.create(nodeConfiguration.getSinkDefinition());
+    } catch (InstantiationException e) {
+      logger
+          .error(
+              "Failed to apply configuration:{} because of sink failure:{} - retaining old configuration",
+              nodeConfiguration, e.getMessage());
+      return;
+    }
+
+    LogicalNode newLogicalNode = new LogicalNode();
+
+    newLogicalNode.setName(nodeConfiguration.getName());
+    newLogicalNode.setSource(source);
+    newLogicalNode.setSink(sink);
+
+    add(newLogicalNode);
+  }
+
+  @Override
+  public void start(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Node manager starting");
+
+    NodeStatusMonitor statusMonitor = new NodeStatusMonitor();
+
+    statusMonitor.setNodeManager(this);
+
+    monitorService = Executors.newScheduledThreadPool(1);
+    monitorService.scheduleAtFixedRate(statusMonitor, 0, 3, TimeUnit.SECONDS);
+
+    for (LogicalNode node : getNodes()) {
+      try {
+        node.start(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to start logical node:{}", node);
+      } catch (InterruptedException e) {
+        logger.error("Interrupted while starting logical node:{}", node);
+        lifecycleState = LifecycleState.ERROR;
+        throw e;
+      }
+    }
+
+    logger.debug("Node manager started");
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
+    logger.info("Node manager stopping");
+
+    for (LogicalNode node : getNodes()) {
+      try {
+        node.stop(context);
+      } catch (LifecycleException e) {
+        logger.error("Failed to stop logical node:{}", node);
+      } catch (InterruptedException e) {
+        logger
+            .error(
+                "Interrupted while stopping logical node:{} - Continuing shutdown anyway!",
+                node);
+      }
+    }
+
+    monitorService.shutdown();
+
+    while (!monitorService.isTerminated()) {
+      logger.debug("Waiting for node status monitor to shutdown");
+      monitorService.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
+    logger.debug("Node manager stopped");
+
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeStatusMonitor.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,87 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeStatusMonitor implements Runnable {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(NodeStatusMonitor.class);
+
+  private Map<String, Status> stati;
+
+  private NodeManager nodeManager;
+
+  public NodeStatusMonitor() {
+    stati = new HashMap<String, Status>();
+  }
+
+  @Override
+  public void run() {
+    updateStatus();
+  }
+
+  public void updateStatus() {
+    Set<LogicalNode> nodes = nodeManager.getNodes();
+
+    logger.debug("Checking stati of all nodes");
+
+    for (LogicalNode node : nodes) {
+      updateStatus(node);
+    }
+  }
+
+  public void updateStatus(LogicalNode node) {
+    logger.debug("Checking stati of node:{}", node);
+
+    Long now = System.currentTimeMillis();
+    Status status = stati.get(node.getName());
+
+    if (status == null) {
+      status = new Status();
+
+      status.firstSeen = now;
+    }
+
+    if (!node.getLifecycleState().equals(status.state)) {
+      logger.debug(
+          "Detected state change: node:{} - {} (lastSeen:{}) -> {}",
+          new Object[] { node.getName(), status.state, status.lastSeen,
+              node.getLifecycleState() });
+      status.lastStateChange = now;
+    }
+
+    status.lastSeen = now;
+    status.state = node.getLifecycleState();
+
+    stati.put(node.getName(), status);
+  }
+
+  @Override
+  public String toString() {
+    return "{ stati:" + stati + " nodeManager:" + nodeManager + " }";
+  }
+
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  public void setNodeManager(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+  }
+
+  public static class Status {
+    public Long firstSeen;
+    public Long lastSeen;
+    public LifecycleState state;
+    public Long lastStateChange;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1156908&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Aug 12 00:48:25 2011
@@ -0,0 +1,66 @@
+package org.apache.flume.node;
+
+import org.apache.flume.Context;
+import org.apache.flume.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDefaultLogicalNodeManager {
+
+  private NodeManager nodeManager;
+
+  @Before
+  public void setUp() {
+    nodeManager = new DefaultLogicalNodeManager();
+  }
+
+  @Test
+  public void testLifecycle() throws LifecycleException, InterruptedException {
+    Context context = new Context();
+
+    nodeManager.start(context);
+    Assert.assertTrue("Node manager didn't reach START or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.START_OR_ERROR, 5000));
+
+    nodeManager.stop(context);
+    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.STOP_OR_ERROR, 5000));
+  }
+
+  @Test
+  public void testLifecycleWithNodes() throws LifecycleException,
+      InterruptedException {
+
+    Context context = new Context();
+
+    LogicalNode node = new LogicalNode();
+
+    node.setName("test node");
+    node.setSource(new SequenceGeneratorSource());
+    node.setSink(new NullSink());
+
+    nodeManager.add(node);
+
+    nodeManager.start(context);
+    Assert.assertTrue("Node manager didn't reach START or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.START_OR_ERROR, 5000));
+
+    Thread.sleep(5000);
+
+    nodeManager.stop(context);
+    Assert.assertTrue("Node manager didn't reach STOP or ERROR",
+        LifecycleController.waitForOneOf(nodeManager,
+            LifecycleState.STOP_OR_ERROR, 5000));
+  }
+
+}