You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:47:28 UTC

svn commit: r1156891 - in /incubator/flume/branches/flume-728: flume-ng-core/src/test/java/org/apache/flume/core/ flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/ flume-ng-node/src/test/java/org/apache/flume/node/

Author: esammer
Date: Fri Aug 12 00:47:28 2011
New Revision: 1156891

URL: http://svn.apache.org/viewvc?rev=1156891&view=rev
Log:
- WIP: Started moving everything to use channel driver thread directly - an experiment. - Check point to try a different approached based on internal conversations.

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java
      - copied, changed from r1156890, incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
Removed:
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java

Copied: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java (from r1156890, incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java)
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java?p2=incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java&p1=incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java&r1=1156890&r2=1156891&rev=1156891&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriverThread.java Fri Aug 12 00:47:28 2011
@@ -1,5 +1,6 @@
 package org.apache.flume.core;
 
+import org.apache.flume.core.ChannelDriver.ChannelDriverThread;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -11,16 +12,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestChannelDriver {
+public class TestChannelDriverThread {
 
   private static final Logger logger = LoggerFactory
-      .getLogger(TestChannelDriver.class);
+      .getLogger(TestChannelDriverThread.class);
 
-  private ChannelDriver driver;
+  private ChannelDriverThread driver;
 
   @Before
   public void setUp() {
-    driver = new ChannelDriver("test-channel-driver");
+    driver = new ChannelDriverThread("test-channel-driver");
   }
 
   @Test
@@ -97,7 +98,7 @@ public class TestChannelDriver {
 
     Context context = new Context();
 
-    driver.start(context);
+    driver.start();
 
     LifecycleController.waitForOneOf(driver, new LifecycleState[] {
         LifecycleState.START, LifecycleState.ERROR }, 5000);
@@ -220,10 +221,10 @@ public class TestChannelDriver {
 
     Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
     Assert.assertEquals(Long.valueOf(1), sourceCounters.get("close"));
-    Assert.assertTrue(sourceCounters.get("next") > 0);
+    Assert.assertEquals(Long.valueOf(0), sourceCounters.get("next"));
     Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
     Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
-    Assert.assertTrue(sinkCounters.get("append") > 0);
+    Assert.assertEquals(Long.valueOf(0), sinkCounters.get("append"));
     Assert.assertEquals(
         "Source next() events didn't match sink append() events",
         sourceCounters.get("next"), sinkCounters.get("append"));

Added: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java?rev=1156891&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/MemoryBasedLogicalNodeManager.java Fri Aug 12 00:47:28 2011
@@ -0,0 +1,95 @@
+package org.apache.flume.node.nodemanager;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBasedLogicalNodeManager extends AbstractLogicalNodeManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(MemoryBasedLogicalNodeManager.class);
+
+  private LifecycleState lifecycleState;
+  private ExecutorService executorService;
+
+  public MemoryBasedLogicalNodeManager() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  public boolean add(final LogicalNode node) {
+    if (super.add(node)) {
+      final Context context = new Context();
+
+      executorService.submit(new Callable<Exception>() {
+
+        @Override
+        public Exception call() throws Exception {
+          node.start(context);
+
+          boolean reached = LifecycleController
+              .waitForOneOf(node, new LifecycleState[] { LifecycleState.START,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            
+          }
+
+          return null;
+        }
+      });
+
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public void start(Context context) throws LifecycleException,
+      InterruptedException {
+
+    executorService = Executors.newFixedThreadPool(5);
+
+    lifecycleState = LifecycleState.START;
+  }
+
+  @Override
+  public void stop(Context context) throws LifecycleException,
+      InterruptedException {
+
+    executorService.shutdown();
+
+    while (!executorService.isTerminated()) {
+      logger.debug("Waiting for spawn / despawn service to stop");
+      try {
+        executorService.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        logger
+            .warn("Interrupted while waiting for spawn / despawn service to stop. Interrupting outstanding jobs may cause problems! Report this to the developers!");
+
+        executorService.shutdownNow();
+        lifecycleState = LifecycleState.ERROR;
+      }
+    }
+
+    if (!lifecycleState.equals(LifecycleState.ERROR)) {
+      lifecycleState = LifecycleState.STOP;
+    }
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1156891&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Aug 12 00:47:28 2011
@@ -0,0 +1,166 @@
+package org.apache.flume.node;
+
+import junit.framework.Assert;
+
+import org.apache.flume.core.Context;
+import org.apache.flume.core.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestAbstractLogicalNodeManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestAbstractLogicalNodeManager.class);
+
+  private AbstractLogicalNodeManager nodeManager;
+
+  @Before
+  public void setUp() {
+    nodeManager = new AbstractLogicalNodeManager() {
+
+      private LifecycleState lifecycleState = LifecycleState.IDLE;
+
+      @Override
+      public void stop(Context context) throws LifecycleException,
+          InterruptedException {
+
+        for (LogicalNode node : getNodes()) {
+          node.stop(context);
+
+          boolean reached = LifecycleController
+              .waitForOneOf(node, new LifecycleState[] { LifecycleState.STOP,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            logger.error(
+                "Unable to stop logical node:{} This *will* cause failures.",
+                node);
+          }
+
+          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
+            lifecycleState = LifecycleState.ERROR;
+          }
+        }
+
+        lifecycleState = LifecycleState.STOP;
+      }
+
+      @Override
+      public void start(Context context) throws LifecycleException,
+          InterruptedException {
+
+        for (LogicalNode node : getNodes()) {
+          node.start(context);
+
+          boolean reached = LifecycleController
+              .waitForOneOf(node, new LifecycleState[] { LifecycleState.START,
+                  LifecycleState.ERROR });
+
+          if (!reached) {
+            logger.error(
+                "Unable to stop logical node:{} This *will* cause failures.",
+                node);
+          }
+
+          if (node.getLifecycleState().equals(LifecycleState.ERROR)) {
+            lifecycleState = LifecycleState.ERROR;
+          }
+        }
+
+        lifecycleState = LifecycleState.START;
+      }
+
+      @Override
+      public LifecycleState getLifecycleState() {
+        return lifecycleState;
+      }
+    };
+  }
+
+  @Test
+  public void testEmptyLifecycle() throws LifecycleException,
+      InterruptedException {
+    Context context = new Context();
+
+    nodeManager.start(context);
+    boolean reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
+
+    nodeManager.stop(context);
+    reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.STOP, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
+  }
+
+  @Test
+  public void testLifecycle() throws LifecycleException, InterruptedException {
+
+    Context context = new Context();
+
+    LogicalNode node = new LogicalNode();
+    node.setName("test");
+    node.setSource(new SequenceGeneratorSource());
+    node.setSink(new NullSink());
+
+    nodeManager.add(node);
+
+    nodeManager.start(context);
+    boolean reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
+
+    nodeManager.stop(context);
+    reached = LifecycleController.waitForOneOf(nodeManager,
+        new LifecycleState[] { LifecycleState.STOP, LifecycleState.ERROR });
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
+  }
+
+  @Test
+  public void testRapidLifecycleFlapping() throws LifecycleException,
+      InterruptedException {
+
+    LogicalNode node = new LogicalNode();
+    node.setName("test");
+    node.setSource(new SequenceGeneratorSource());
+    node.setSink(new NullSink());
+
+    nodeManager.add(node);
+
+    for (int i = 0; i < 10; i++) {
+      Context context = new Context();
+
+      nodeManager.start(context);
+      boolean reached = LifecycleController.waitForOneOf(nodeManager,
+          new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR });
+
+      Assert.assertTrue(reached);
+      Assert
+          .assertEquals(LifecycleState.START, nodeManager.getLifecycleState());
+
+      nodeManager.stop(context);
+      reached = LifecycleController.waitForOneOf(nodeManager,
+          new LifecycleState[] { LifecycleState.STOP, LifecycleState.ERROR });
+
+      Assert.assertTrue(reached);
+      Assert.assertEquals(LifecycleState.STOP, nodeManager.getLifecycleState());
+    }
+  }
+
+}