You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:18 UTC
svn commit: r1156866 - in /incubator/flume/branches/flume-728/flume-ng-node:
./ src/ src/main/ src/main/java/ src/main/java/org/
src/main/java/org/apache/ src/main/java/org/apache/flume/
src/main/java/org/apache/flume/node/ src/test/ src/test/java/ src...
Author: esammer
Date: Fri Aug 12 00:46:18 2011
New Revision: 1156866
URL: http://svn.apache.org/viewvc?rev=1156866&view=rev
Log:
- Initial addition of a refactored flume node.
Added:
incubator/flume/branches/flume-728/flume-ng-node/
incubator/flume/branches/flume-728/flume-ng-node/pom.xml
incubator/flume/branches/flume-728/flume-ng-node/src/
incubator/flume/branches/flume-728/flume-ng-node/src/main/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/
incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties
Added: incubator/flume/branches/flume-728/flume-ng-node/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/pom.xml?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/pom.xml Fri Aug 12 00:46:18 2011
@@ -0,0 +1,54 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flume</artifactId>
+ <groupId>com.cloudera</groupId>
+ <version>0.9.5-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-node</artifactId>
+ <version>0.9.5-SNAPSHOT</version>
+ <name>Flume NG Node</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.3.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/AbstractLogicalNodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,49 @@
+package org.apache.flume.node;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flume.core.LogicalNode;
+
+import com.google.common.base.Preconditions;
+
+/** Skeleton {@link NodeManager} that keeps its logical nodes in a {@link HashSet}. */
+public abstract class AbstractLogicalNodeManager implements NodeManager {
+
+  // Replaced wholesale by setNodes(); callers only ever receive copies.
+  private Set<LogicalNode> nodes;
+
+  /** Starts with an empty node set. */
+  public AbstractLogicalNodeManager() {
+    this.nodes = new HashSet<LogicalNode>();
+  }
+
+  /** Adds {@code node} (must not be null); returns what the backing set's add reports. */
+  @Override
+  public boolean add(LogicalNode node) {
+    return nodes.add(Preconditions.checkNotNull(node));
+  }
+
+  /** Removes {@code node} (must not be null); returns what the backing set's remove reports. */
+  @Override
+  public boolean remove(LogicalNode node) {
+    return nodes.remove(Preconditions.checkNotNull(node));
+  }
+
+  /** Returns a fresh copy of the node set; changes to it do not affect this manager. */
+  @Override
+  public Set<LogicalNode> getNodes() {
+    return new HashSet<LogicalNode>(this.nodes);
+  }
+
+  /** Replaces the node set with a copy of {@code nodes}, which must not be null. */
+  @Override
+  public void setNodes(Set<LogicalNode> nodes) {
+    this.nodes = new HashSet<LogicalNode>(Preconditions.checkNotNull(nodes));
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("{ nodes:").append(nodes).append(" }").toString();
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FileBasedLogicalNodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,37 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+
+/** {@link AbstractLogicalNodeManager} whose lifecycle methods only record the current state. */
+public class FileBasedLogicalNodeManager extends AbstractLogicalNodeManager {
+  private LifecycleState lifecycleState = LifecycleState.IDLE;
+
+  /** Creates a manager in the {@code IDLE} state. */
+  public FileBasedLogicalNodeManager() {
+    super();
+  }
+
+  /** Records the {@code START} state; {@code context} is not used. */
+  @Override
+  public void start(Context context) throws LifecycleException {
+    this.lifecycleState = LifecycleState.START;
+  }
+
+  /** Records the {@code STOP} state; {@code context} is not used. */
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    this.lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return this.lifecycleState;
+  }
+
+  @Override
+  public void transitionTo(LifecycleState state) {
+    // Left empty: it is not yet clear whether this makes sense for this manager.
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,88 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/** A named {@link LifecycleAware} that delegates start and stop to its {@link NodeManager}. */
+public class FlumeNode implements LifecycleAware {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlumeNode.class);
+
+  private String name;
+  // IDLE rather than null, so the state is usable before start() is called.
+  private LifecycleState lifecycleState = LifecycleState.IDLE;
+  private NodeManager nodeManager;
+  private NodeConfigurationClient configurationClient;
+
+  /** Starts the node manager, then records START; name and manager must already be set. */
+  @Override
+  public void start(Context context) throws LifecycleException {
+    Preconditions.checkState(name != null, "Node name can not be null");
+    Preconditions.checkState(nodeManager != null, "Node manager can not be null");
+    logger.info("Flume node starting - {}", name);
+    nodeManager.start(context);
+    lifecycleState = LifecycleState.START;
+  }
+
+  /** Stops the node manager, then records STOP; the manager must already be set. */
+  @Override
+  public void stop(Context context) throws LifecycleException {
+    // Mirror start(): report a missing manager as IllegalStateException, not an NPE.
+    Preconditions.checkState(nodeManager != null, "Node manager can not be null");
+    logger.info("Flume node stopping - {}", name);
+    nodeManager.stop(context);
+    lifecycleState = LifecycleState.STOP;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name:" + name + " nodeManager:" + nodeManager + " }";
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  public void setNodeManager(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  /** Validates a move to {@code state} (IllegalStateException if illegal); changes no state. */
+  @Override
+  public void transitionTo(LifecycleState state) {
+    switch (state) {
+    case START:
+      Preconditions.checkState(lifecycleState == LifecycleState.IDLE,
+          "Unable to transition from " + lifecycleState + " to " + state);
+      break;
+    case STOP:
+      Preconditions.checkState(lifecycleState == LifecycleState.START,
+          "Unable to transition from " + lifecycleState + " to " + state);
+      break;
+    case ERROR:
+      break;
+    default:
+      throw new IllegalStateException("Unable to transition from "
+          + lifecycleState + " to " + state);
+    }
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeConfigurationClient.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,5 @@
+package org.apache.flume.node;
+
+public interface NodeConfigurationClient { // no members are declared in this revision
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,18 @@
+package org.apache.flume.node;
+
+import java.util.Set;
+
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleAware;
+
+/** A {@link LifecycleAware} component that manages a set of {@link LogicalNode}s. */
+public interface NodeManager extends LifecycleAware {
+  /** Adds {@code node} to the managed set. */
+  boolean add(LogicalNode node);
+  /** Removes {@code node} from the managed set. */
+  boolean remove(LogicalNode node);
+  /** Returns the managed nodes. */
+  Set<LogicalNode> getNodes();
+  /** Replaces the managed nodes with {@code nodes}. */
+  void setNodes(Set<LogicalNode> nodes);
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java Fri Aug 12 00:46:18 2011
@@ -0,0 +1,94 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Lifecycle tests for {@link FlumeNode}, run against a manager that only records state. */
+public class TestFlumeNode {
+
+  private FlumeNode node;
+
+  @Before
+  public void setUp() {
+    node = new FlumeNode();
+    node.setName("test-node");
+    node.setNodeManager(new EmptyLogicalNodeManager());
+  }
+
+  /** Starting and then stopping the node should be observed as START, then STOP. */
+  @Test
+  public void testLifecycle() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+    node.start(context);
+    boolean reached = LifecycleController.waitForOneOf(node,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+    node.stop(context);
+    reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+  }
+
+  /** A logical node can be added to the manager while the Flume node is started. */
+  @Test
+  public void testAddNodes() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+    node.start(context);
+    boolean reached = LifecycleController.waitForOneOf(node,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
+
+    LogicalNode n1 = new LogicalNode();
+    // setUp() installs a fresh, empty manager, so this add must report a change.
+    Assert.assertTrue("Node was added", node.getNodeManager().add(n1));
+
+    node.stop(context);
+    reached = LifecycleController.waitForOneOf(node, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue("Matched a known state", reached);
+    Assert.assertEquals(LifecycleState.STOP, node.getLifecycleState());
+  }
+
+  /** Node manager stub whose lifecycle methods only record the requested state. */
+  public static class EmptyLogicalNodeManager extends
+      AbstractLogicalNodeManager {
+
+    private LifecycleState lifecycleState = LifecycleState.IDLE;
+
+    @Override
+    public void start(Context context) throws LifecycleException {
+      lifecycleState = LifecycleState.START;
+    }
+
+    @Override
+    public void stop(Context context) throws LifecycleException {
+      lifecycleState = LifecycleState.STOP;
+    }
+
+    @Override
+    public LifecycleState getLifecycleState() {
+      return lifecycleState;
+    }
+
+    @Override
+    public void transitionTo(LifecycleState state) {
+      // Transitions are neither validated nor recorded by this stub.
+    }
+  }
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties?rev=1156866&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/log4j.properties Fri Aug 12 00:46:18 2011
@@ -0,0 +1,7 @@
+log4j.rootCategory = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%l - %p] %m%n
+
+log4j.logger.org.apache.flume = DEBUG