You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:28 UTC
svn commit: r1156891 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/test/java/org/apache/flume/core/
flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/
flume-ng-node/src/test/java/org/apache/flume/node/
Author: esammer
Date: Fri Aug 12 00:47:28 2011
New Revision: 1156891
URL: http://svn.apache.org/viewvc?rev=1156891&view=rev
Log:
- WIP: Started moving everything to use the channel driver thread directly - an experiment. - Checkpoint to try a different approach based on internal conversations.
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java
- copied, changed from r1156890, incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
Removed:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
Copied: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java (from r1156890, incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java)
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java?p2=incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java&p1=incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java&r1=1156890&r2=1156891&rev=1156891&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java Fri Aug 12 00:47:28 2011
@@ -1,5 +1,6 @@
package org.apache.flume.core;
+import org.apache.flume.core.ChannelDriver.ChannelDriverThread;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
@@ -11,16 +12,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TestChannelDriver {
+public class TestChannelDriverThread {
private static final Logger logger = LoggerFactory
- .getLogger(TestChannelDriver.class);
+ .getLogger(TestChannelDriverThread.class);
- private ChannelDriver driver;
+ private ChannelDriverThread driver;
@Before
public void setUp() {
- driver = new ChannelDriver("test-channel-driver");
+ driver = new ChannelDriverThread("test-channel-driver");
}
@Test
@@ -97,7 +98,7 @@ public class TestChannelDriver {
Context context = new Context();
- driver.start(context);
+ driver.start();
LifecycleController.waitForOneOf(driver, new LifecycleState[] {
LifecycleState.START, LifecycleState.ERROR }, 5000);
@@ -220,10 +221,10 @@ public class TestChannelDriver {
Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
Assert.assertEquals(Long.valueOf(1), sourceCounters.get("close"));
- Assert.assertTrue(sourceCounters.get("next") > 0);
+ Assert.assertEquals(Long.valueOf(0), sourceCounters.get("next"));
Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
- Assert.assertTrue(sinkCounters.get("append") > 0);
+ Assert.assertEquals(Long.valueOf(0), sinkCounters.get("append"));
Assert.assertEquals(
"Source next() events didn't match sink append() events",
sourceCounters.get("next"), sinkCounters.get("append"));
Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java?rev=1156891&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java Fri Aug 12 00:47:28 2011
@@ -0,0 +1,156 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Logical node manager that starts every added node on a background thread
+ * pool.
+ *
+ * <p>The pool is created by {@link #start(Context)} and shut down by
+ * {@link #stop(Context)}; nodes can only be added in between.
+ */
+public class MemoryBasedLogicalNodeManager extends AbstractLogicalNodeManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(MemoryBasedLogicalNodeManager.class);
+
+  /** Number of threads available for starting nodes in the background. */
+  private static final int SPAWN_THREADS = 5;
+
+  private LifecycleState lifecycleState;
+  private ExecutorService executorService;
+
+  /** Creates a manager in the IDLE state; no thread pool exists yet. */
+  public MemoryBasedLogicalNodeManager() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  /**
+   * Registers {@code node} with the parent manager and schedules its start on
+   * the background pool.
+   *
+   * @param node the logical node to register and start
+   * @return {@code true} if the parent accepted the node and its start was
+   *         scheduled, {@code false} if the parent rejected it
+   * @throws IllegalStateException if the manager is not currently started
+   */
+  @Override
+  public boolean add(final LogicalNode node) {
+    // Fail before registering: a missing pool used to surface as a
+    // NullPointerException after the parent had already accepted the node.
+    if (executorService == null || executorService.isShutdown()) {
+      throw new IllegalStateException(
+          "Node manager must be started before nodes can be added");
+    }
+
+    if (!super.add(node)) {
+      return false;
+    }
+
+    final Context context = new Context();
+
+    executorService.submit(new Callable<Exception>() {
+
+      @Override
+      public Exception call() {
+        try {
+          node.start(context);
+
+          boolean reached = LifecycleController.waitForOneOf(node,
+              new LifecycleState[] { LifecycleState.START,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            logger.error(
+                "Unable to start logical node:{} This *will* cause failures.",
+                node);
+          }
+
+          return null;
+        } catch (Exception e) {
+          // The Future returned by submit() is discarded, so report the
+          // failure here instead of letting it vanish with the task.
+          if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+          logger.error("Failed to start logical node:" + node, e);
+          return e;
+        }
+      }
+    });
+
+    return true;
+  }
+
+  /**
+   * Creates the background pool and marks the manager as started.
+   *
+   * @param context lifecycle context; not read by this implementation
+   */
+  @Override
+  public void start(Context context) throws LifecycleException,
+      InterruptedException {
+
+    executorService = Executors.newFixedThreadPool(SPAWN_THREADS);
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  /**
+   * Shuts the background pool down and waits for outstanding jobs.
+   *
+   * <p>If the wait is interrupted, outstanding jobs are cancelled, the state
+   * becomes ERROR and the caller's interrupt status is restored.
+   *
+   * @param context lifecycle context; not read by this implementation
+   */
+  @Override
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
+    // stop() without a preceding start() leaves no pool to shut down.
+    if (executorService != null) {
+      executorService.shutdown();
+
+      while (!executorService.isTerminated()) {
+        logger.debug("Waiting for spawn / despawn service to stop");
+        try {
+          executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          logger
+              .warn("Interrupted while waiting for spawn / despawn service to stop. Interrupting outstanding jobs may cause problems! Report this to the developers!");
+
+          executorService.shutdownNow();
+          lifecycleState = LifecycleState.ERROR;
+
+          // Restore the interrupt for the caller and stop waiting: with the
+          // flag set, another awaitTermination() would throw immediately.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+
+    if (!lifecycleState.equals(LifecycleState.ERROR)) {
+      lifecycleState = LifecycleState.STOP;
+    }
+  }
+
+  /** Returns the state last set by the constructor, start or stop. */
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+}
Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1156891&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Aug 12 00:47:28 2011
@@ -0,0 +1,161 @@
+package org.apache.flume.node;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lifecycle tests for {@link AbstractLogicalNodeManager}, exercised through a
+ * minimal in-test subclass that starts and stops its nodes one after another.
+ */
+public class TestAbstractLogicalNodeManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAbstractLogicalNodeManager.class);
+
+  private AbstractLogicalNodeManager nodeManager;
+
+  @Before
+  public void setUp() {
+    nodeManager = new AbstractLogicalNodeManager() {
+
+      private LifecycleState lifecycleState = LifecycleState.IDLE;
+
+      @Override
+      public void stop(Context context) throws LifecycleException,
+          InterruptedException {
+
+        boolean nodeFailed = false;
+
+        for (LogicalNode node : getNodes()) {
+          node.stop(context);
+
+          boolean reached = LifecycleController.waitForOneOf(node,
+              new LifecycleState[] { LifecycleState.STOP,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            logger.error(
+                "Unable to stop logical node:{} This *will* cause failures.",
+                node);
+          }
+
+          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
+            nodeFailed = true;
+          }
+        }
+
+        // Report ERROR if any node failed; it used to be overwritten
+        // unconditionally, which hid node failures from the tests.
+        lifecycleState = nodeFailed ? LifecycleState.ERROR
+            : LifecycleState.STOP;
+      }
+
+      @Override
+      public void start(Context context) throws LifecycleException,
+          InterruptedException {
+
+        boolean nodeFailed = false;
+
+        for (LogicalNode node : getNodes()) {
+          node.start(context);
+
+          boolean reached = LifecycleController.waitForOneOf(node,
+              new LifecycleState[] { LifecycleState.START,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            logger.error(
+                "Unable to start logical node:{} This *will* cause failures.",
+                node);
+          }
+
+          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
+            nodeFailed = true;
+          }
+        }
+
+        // Same rule as stop(): a failed node must not be reported as START.
+        lifecycleState = nodeFailed ? LifecycleState.ERROR
+            : LifecycleState.START;
+      }
+
+      @Override
+      public LifecycleState getLifecycleState() {
+        return lifecycleState;
+      }
+    };
+  }
+
+  /** A manager without nodes must still reach START and then STOP. */
+  @Test
+  public void testEmptyLifecycle() throws LifecycleException,
+      InterruptedException {
+
+    assertStartStopCycle(new Context());
+  }
+
+  /** A manager with one node must reach START and then STOP. */
+  @Test
+  public void testLifecycle() throws LifecycleException, InterruptedException {
+
+    nodeManager.add(createTestNode());
+
+    assertStartStopCycle(new Context());
+  }
+
+  /** Ten back-to-back start/stop cycles, each with a fresh context. */
+  @Test
+  public void testRapidLifecycleFlapping() throws LifecycleException,
+      InterruptedException {
+
+    nodeManager.add(createTestNode());
+
+    for (int i = 0; i < 10; i++) {
+      assertStartStopCycle(new Context());
+    }
+  }
+
+  /** Builds the node shared by the tests: sequence generator into null sink. */
+  private static LogicalNode createTestNode() {
+    LogicalNode node = new LogicalNode();
+    node.setName("test");
+    node.setSource(new SequenceGeneratorSource());
+    node.setSink(new NullSink());
+
+    return node;
+  }
+
+  /**
+   * Starts and then stops the manager, asserting after each step that the
+   * expected state was reached.
+   */
+  private void assertStartStopCycle(Context context)
+      throws LifecycleException, InterruptedException {
+
+    nodeManager.start(context);
+    boolean reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
+
+    nodeManager.stop(context);
+    reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.STOP, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
+  }
+
+}