You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:18 UTC

svn commit: r1156866 - in /incubator/flume/branches/flume-728/flume-ng-node: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/flume/ src/main/java/org/apache/flume/node/ src/test/ src/test/java/ src...

Author: esammer
Date: Fri Aug 12 00:46:18 2011
New Revision: 1156866

URL: http://svn.apache.org/viewvc?rev=1156866&view=rev
Log:
- Initial addition of a refactored flume node.

Added:
    incubator/flume/branches/flume-728/flume-ng-node/
    incubator/flume/branches/flume-728/flume-ng-node/pom.xml
    incubator/flume/branches/flume-728/flume-ng-node/src/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/
    incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties

Added: incubator/flume/branches/flume-728/flume-ng-node/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/pom.xml?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/pom.xml Fri Aug 12 00:46:18 2011
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume</artifactId>
+    <groupId>com.cloudera</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-ng-node</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG Node</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.3.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,49 @@
+package org.apache.flume.node;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flume.core.LogicalNode;
+
+import com.google.common.base.Preconditions;
+
+abstract public class AbstractLogicalNodeManager implements NodeManager {
+
+  private Set<LogicalNode> nodes;
+
+  public AbstractLogicalNodeManager() {
+    nodes = new HashSet<LogicalNode>();
+  }
+
+  @Override
+  public boolean add(LogicalNode node) {
+    Preconditions.checkNotNull(node);
+
+    return nodes.add(node);
+  }
+
+  @Override
+  public boolean remove(LogicalNode node) {
+    Preconditions.checkNotNull(node);
+
+    return nodes.remove(node);
+  }
+
+  @Override
+  public Set<LogicalNode> getNodes() {
+    return new HashSet<LogicalNode>(nodes);
+  }
+
+  @Override
+  public void setNodes(Set<LogicalNode> nodes) {
+    Preconditions.checkNotNull(nodes);
+
+    this.nodes = new HashSet<LogicalNode>(nodes);
+  }
+
+  @Override
+  public String toString() {
+    return "{ nodes:" + nodes + " }";
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,37 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+
+public class FileBasedLogicalNodeManager extends AbstractLogicalNodeManager {
+
+  private LifecycleState lifecycleState;
+
+  public FileBasedLogicalNodeManager() {
+    super();
+
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public void start(Context context) throws LifecycleException {
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  @Override
+  public void transitionTo(LifecycleState state) {
+    // Not sure if this make sense.
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,88 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class FlumeNode implements LifecycleAware {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlumeNode.class);
+
+  private String name;
+  private LifecycleState lifecycleState;
+  private NodeManager nodeManager;
+  private NodeConfigurationClient configurationClient;
+
+  @Override
+  public void start(Context context) throws LifecycleException {
+    Preconditions.checkState(name != null, "Node name can not be null");
+    Preconditions.checkState(nodeManager != null,
+        "Node manager can not be null");
+
+    logger.info("Flume node starting - {}", name);
+
+    nodeManager.start(context);
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    logger.info("Flume node stopping - {}", name);
+
+    nodeManager.stop(context);
+
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name:" + name + " nodeManager:" + nodeManager + " }";
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  public void setNodeManager(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  @Override
+  public void transitionTo(LifecycleState state) {
+    switch (state) {
+    case START:
+      Preconditions.checkState(lifecycleState.equals(LifecycleState.IDLE),
+          "Unable to transition from " + lifecycleState + " to " + state);
+      break;
+    case STOP:
+      Preconditions.checkState(lifecycleState.equals(LifecycleState.START),
+          "Unable to transition from " + lifecycleState + " to " + state);
+      break;
+    case ERROR:
+      break;
+    default:
+      throw new IllegalStateException("Unable to transition from "
+          + lifecycleState + " to " + state);
+    }
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,5 @@
+package org.apache.flume.node;
+
+public interface NodeConfigurationClient {
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,18 @@
+package org.apache.flume.node;
+
+import java.util.Set;
+
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleAware;
+
+public interface NodeManager extends LifecycleAware {
+
+  public boolean add(LogicalNode node);
+
+  public boolean remove(LogicalNode node);
+
+  public Set<LogicalNode> getNodes();
+
+  public void setNodes(Set<LogicalNode> nodes);
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,94 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFlumeNode {
+
+  private FlumeNode node;
+
+  @Before
+  public void setUp() {
+    node = new FlumeNode();
+
+    node.setName("test-node");
+    node.setNodeManager(new EmptyLogicalNodeManager());
+  }
+
+  @Test
+  public void testLifecycle() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+
+    node.start(context);
+    boolean reached = LifecycleController.waitForOneOf(node,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+    node.stop(context);
+    reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+  }
+
+  @Test
+  public void testAddNodes() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+
+    node.start(context);
+    boolean reached = LifecycleController.waitForOneOf(node,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+    LogicalNode n1 = new LogicalNode();
+
+    node.getNodeManager().add(n1);
+
+    node.stop(context);
+    reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+  }
+
+  public static class EmptyLogicalNodeManager extends
+      AbstractLogicalNodeManager {
+
+    private LifecycleState lifecycleState;
+
+    @Override
+    public void start(Context context) throws LifecycleException {
+      lifecycleState = LifecycleState.START;
+    }
+
+    @Override
+    public void stop(Context context) throws LifecycleException {
+      lifecycleState = LifecycleState.STOP;
+    }
+
+    @Override
+    public LifecycleState getLifecycleState() {
+      return lifecycleState;
+    }
+
+    @Override
+    public void transitionTo(LifecycleState state) {
+    }
+
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties Fri Aug 12 00:46:18 2011
@@ -0,0 +1,7 @@
+log4j.rootCategory = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%l - %p] %m%n
+
+log4j.logger.org.apache.flume = DEBUG