You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:03 UTC

svn commit: r1000251 [2/2] - in /commons/sandbox/pipeline/trunk: ./ src/main/java/org/apache/commons/pipeline/ src/main/java/org/apache/commons/pipeline/config/ src/main/java/org/apache/commons/pipeline/driver/ src/main/java/org/apache/commons/pipeline...

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java Wed Sep 22 21:57:01 2010
@@ -19,7 +19,8 @@ package org.apache.commons.pipeline.test
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.commons.pipeline.*;
+
+import org.apache.commons.pipeline.Feeder;
 
 /**
  * This feeder simply adds the received objects to a list.

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java Wed Sep 22 21:57:01 2010
@@ -20,6 +20,7 @@ package org.apache.commons.pipeline.test
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.Stage;

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java Wed Sep 22 21:57:01 2010
@@ -18,6 +18,7 @@
 package org.apache.commons.pipeline.validation;
 
 import java.util.List;
+
 import org.apache.commons.pipeline.Pipeline;
 import org.apache.commons.pipeline.Stage;
 import org.apache.commons.pipeline.StageDriverFactory;

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java Wed Sep 22 21:57:01 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.commons.pipeline.Pipeline;
 import org.apache.commons.pipeline.Stage;
 import org.apache.commons.pipeline.StageDriverFactory;

Modified: commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml (original)
+++ commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml Wed Sep 22 21:57:01 2010
@@ -26,31 +26,29 @@ limitations under the License.
     </properties>
     <body>
         <p>
-            A tutorial on some of the Basics needed to use the Apache Commons Pipeline 
+            This tutorial covers some of the basics needed to use the Apache Commons Pipeline 
             workflow framework. The target audience for this document consists of developers
-            who will need to assemble existing stages or write their own stages. The
-            pipeline provides a Java class library intended to make it easy to use and reuse
-            stages as modular processing blocks.
+            who will need to assemble existing stages or write their own stages. 
         </p>
         <section name="Pipeline Structure">
             <p>
-            <b>Stages</b> in a pipeline represent the logical the steps needed
-            to process data. Each represents a single high level processing
-            concept such as finding files, reading a file format, computing a
-            product from the data, or writing data to a database. The primary
-            advantage of using the Pipeline framework and building the
-            processing steps into stages is the reusablility of the stages in 
-            other pipelines.
+            <b>Stages</b> in a pipeline represent logical units of work in 
+            a data processing workflow. Each stage implements a single high level 
+            processing concept such as finding files, reading a file format, computing a
+            product from the data, or writing data to a database. Commons-pipeline
+            is built from the ground up with parallel processing in mind, so stages
+            are intended to run concurrently.
             </p>
             <p>
                 <img src="images/BasicPipeline.png" alt="A basic pipeline"/>
             </p>
             <p>
-                A <b>Pipeline</b> is built up from stages which can pass data on
-                to subsequent stages. The arrows above that are labelled
-                <b>&quot;EMIT&quot;</b> show the data output of one stage being
-                passed to the next stage. At the code level, there is an
-                <code>emit()</code> method that sends data to the next stage.
+                A <b>Pipeline</b> is built up from stages that act as filters which
+                process incoming data and pass results on to subsequent stages. 
+                In the diagram above, the arrows labelled <b>&quot;EMIT&quot;</b> represent 
+                the data output of one stage being passed to the next. The base class for 
+                most Stage implementations, <code>BaseStage</code>, supplies a convenience method
+                <code>emit()</code> that is used to send data to the next stage in the pipeline.
                 The data flow starts at the left, where there is an arrow
                 labelled <b>&quot;FEED&quot;</b>. The FEED starts off the
                 pipeline and is usually set up by a configuration file,
@@ -125,10 +123,10 @@ limitations under the License.
                 <li> One object fed into a stage does not always 
                 produce one object out.
                 <ul>
-                    <li> Stages that do not pass on (emit)  any objecs are referred to as
-                    <b>terminal stages</b>. Avoid creating this type of stage, since they limit your
-                    possibilities when building pipelines. (This is easy to do, one line of code
-                    passes data to the next stage.) 
+                    <li> Stages that do not pass on (emit) any objects are
+                    referred to as <b>terminal stages</b>. These should be
+                    avoided, since they limit your possibilities when building
+                    pipelines from existing stage components. 
                     </li>
                     <li> Stages that send objects on to more than one subsequent
                     stage are called <b>branching stages</b>. 

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java Wed Sep 22 21:57:01 2010
@@ -20,8 +20,11 @@ package org.apache.commons.pipeline;
 import java.io.File;
 import java.io.InputStream;
 import java.util.ResourceBundle;
+
 import javax.xml.parsers.DocumentBuilderFactory;
-import junit.framework.*;
+
+import junit.framework.TestCase;
+
 import org.apache.log4j.xml.DOMConfigurator;
 import org.w3c.dom.Document;
 

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -18,7 +18,6 @@
 package org.apache.commons.pipeline.driver;
 
 import org.apache.commons.pipeline.AbstractLoggingTestCase;
-import static org.apache.commons.pipeline.StageDriver.State.*;
 import org.apache.commons.pipeline.testFramework.TestFeeder;
 import org.apache.commons.pipeline.testFramework.TestStage;
 import org.apache.commons.pipeline.testFramework.TestStageContext;

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java Wed Sep 22 21:57:01 2010
@@ -18,9 +18,10 @@
 package org.apache.commons.pipeline.driver;
 
 import junit.framework.TestCase;
+
 import org.apache.commons.pipeline.StageDriver;
-import org.apache.commons.pipeline.testFramework.FaultingTestStage;
 import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.testFramework.FaultingTestStage;
 import org.apache.commons.pipeline.testFramework.TestFeeder;
 import org.apache.commons.pipeline.testFramework.TestStage;
 import org.apache.commons.pipeline.testFramework.TestStageContext;

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import junit.framework.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver.State;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.*;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+public class BalancedPoolStageDriverTest extends AbstractStageDriverTest {
+    private Log log;
+    
+    public BalancedPoolStageDriverTest(String testName) {
+        super(testName);
+        this.log = LogFactory.getLog(BalancedPoolStageDriverTest.class);
+    }
+    
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+    
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+    
+    /**
+     * Test of getFeeder method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+     */
+    public void testGetFeeder() {
+        System.out.println("testGetFeeder");
+        BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+        Feeder feeder = instance.getFeeder();
+        assertNotNull(feeder);
+    }
+    
+    /**
+     * Test of finish method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+     */
+    public void testFinish() throws Exception {
+        System.out.println("testStart");
+        BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+        instance.start(); //must start the driver before we can tell it to finish
+        instance.finish();
+        
+        assertEquals(0, instance.getWorkerCount());
+        assertEquals(State.STOPPED, instance.getState());
+    }
+    
+    /**
+     * Test of increasePriority and decreasePriority methods, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+     */
+    public void testAlterPriority() throws Exception {
+        System.out.println("decreasePriority");
+        
+        BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+        assertSame("Driver should not be initialized in state " + instance.getState(), instance.getState(), State.STOPPED);
+        instance.start();
+        assertSame("Driver is not running after instance start.", instance.getState(), State.RUNNING);
+        
+        int threads = instance.getWorkerCount();
+        instance.increasePriority(2);
+        assertEquals( 2, instance.getWorkerCount() );
+        
+        threads = instance.getWorkerCount();
+        instance.decreasePriority(1);
+        assertEquals( 1, instance.getWorkerCount() );
+        
+        assertNotSame("Driver has unexpectedly stopped.", instance.getState(), State.STOPPED);
+        instance.finish();
+        assertSame("Driver failed to shut down correctly", instance.getState(), State.STOPPED);
+    }
+    
+    /**
+     * Integration test of combined feed/priority change for objects.
+     */
+    public void testSingleStage() throws Exception {
+        TestStage stage = new TestStage(0);
+        PrioritizableStageDriverTestUtils.testSingleStage(this, new BalancedPoolStageDriverFactory(), stage);
+    }
+    
+    /**
+     * Integration test of combined feed/priority change for objects.
+     */
+    public void testMultiStage() throws Exception {
+        log.debug("testMultiStage -------------------------------------------");
+        TestStage[] stages = {
+            new TestStage(0),
+            new CPUBoundTestStage(1, 50),
+            new IOBoundTestStage(2, 50, 250)
+        };
+        PrioritizableStageDriverTestUtils.testMultiStage(this, new BalancedPoolStageDriverFactory(), 30, stages);
+    }
+    
+    public void testCPUBound() throws Exception {
+        TestStage cpuStage = new CPUBoundTestStage( 1, 50 );
+        PrioritizableStageDriverTestUtils.testSingleStage(this, new BalancedPoolStageDriverFactory(), cpuStage );
+    }
+    
+    public void testIOBound() throws Exception {
+        TestStage ioStage = new IOBoundTestStage( 1, 50, 250 );
+        PrioritizableStageDriverTestUtils.testSingleStage( this, new BalancedPoolStageDriverFactory(), ioStage );
+    }
+    
+    /**
+     * Test of increasePriority method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+     */
+    public void testMultiThreadExecution() throws Exception {
+        System.out.println("multiThreadExecution");
+        
+        //start with multiple threads
+        int count = 5;
+        final Set threadNames = Collections.synchronizedSet(new HashSet());
+        TestStage threadNameTrackingStage = new TestStage( 1 ) {
+            public void process( Object obj ) throws StageException {
+                super.process( obj );
+                threadNames.add( Thread.currentThread().getName() );
+                //yield thread control
+                Thread.currentThread().yield();
+            }
+        };
+        
+        BalancedPoolStageDriver instance = new BalancedPoolStageDriver(threadNameTrackingStage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+        instance.increasePriority( count );
+        assertEquals( count, instance.getWorkerCount() );
+        
+        TestStageContext context = new TestStageContext();
+        TestFeeder terminalFeeder = new TestFeeder();
+        
+        context.registerDownstreamFeeder(threadNameTrackingStage, terminalFeeder);
+        threadNameTrackingStage.init(context);
+        
+        instance.start();
+        
+        //ensure that with 100 fed objects all the threads get exercised
+        int fedObjCount = 100;
+        for( int i = 0; i < fedObjCount; i++ ) {
+            instance.getFeeder().feed( i );
+        }
+        
+        instance.finish();
+        
+        assertEquals( count, threadNames.size() );
+    }
+    
+}

Copied: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java?p2=commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java Wed Sep 22 21:57:01 2010
@@ -15,35 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
-import org.apache.commons.pipeline.validation.ConsumedTypes;
-import org.apache.commons.pipeline.validation.ProducesConsumed;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestStage;
 
-/**
- * This stage will generate {@link StageException}s for every other object this
- * stage processes. By design, the even numbered objects will cause a <CODE>StageException</CODE>
- * to be thrown (counting the first object as 1).
- */
-@ConsumedTypes(Object.class)
-@ProducesConsumed
-public class FaultingTestStage extends TestStage {
-    private Log log = LogFactory.getLog(FaultingTestStage.class);
-    private int counter = 0;
+
+class CPUBoundTestStage extends TestStage {
+    private Log log = LogFactory.getLog(CPUBoundTestStage.class);
+    private long consume;
     
-    public FaultingTestStage(int index) {
-        super(index);
+    public CPUBoundTestStage(int id, long consume) {
+        super( id );
+        this.consume = consume;
     }
     
     public void process(Object obj) throws StageException {
-        if (++counter % 2 == 0) {
-            log.error("Planned fault in stage " + this + ".");
-            throw new StageException(this, "Planned fault in stage " + super.getIndex() + ".");
-        }
-        
-        super.process(obj);
+        super.process( obj );
+        long startTime = System.currentTimeMillis();
+        double val = PrioritizableStageDriverTestUtils.consumeNCubed( consume );
+        log.debug( "CPU stage took " + (System.currentTimeMillis() - startTime) + " ms to produce value " + val);
     }
-}
\ No newline at end of file
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.pipeline.Stage;
+
+public class EqualizingDriverControlStrategyTest extends TestCase {
+    
+    public EqualizingDriverControlStrategyTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    /**
+     * Test of handleEvents method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+     */
+    public void testHandleEvents() {
+        System.out.println("handleEvents");
+        
+        List<PrioritizableStageDriver> drivers = null;
+        List<StageProcessTimingEvent> events = null;
+        EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+        
+        instance.handleEvents(drivers, events);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of getAllowableDelta method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+     */
+    public void testGetAllowableDelta() {
+        System.out.println("getAllowableDelta");
+        
+        EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+        
+        long expResult = 0L;
+        long result = instance.getAllowableDelta();
+        assertEquals(expResult, result);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of setAllowableDelta method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+     */
+    public void testSetAllowableDelta() {
+        System.out.println("setAllowableDelta");
+        
+        long allowableDelta = 0L;
+        EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+        
+        instance.setAllowableDelta(allowableDelta);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+public class ExecutorStageDriverTest extends TestCase {
+    
+    public ExecutorStageDriverTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite(ExecutorStageDriverTest.class);
+        
+        return suite;
+    }
+
+    /**
+     * Test of getFeeder method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+     */
+    public void testGetFeeder() {
+        System.out.println("getFeeder");
+        
+        ExecutorStageDriver instance = null;
+        
+        Feeder expResult = null;
+        Feeder result = instance.getFeeder();
+        assertEquals(expResult, result);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of start method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+     */
+    public void testStart() throws Exception {
+        System.out.println("start");
+        
+        ExecutorStageDriver instance = null;
+        
+        instance.start();
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of finish method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+     */
+    public void testFinish() throws Exception {
+        System.out.println("finish");
+        
+        ExecutorStageDriver instance = null;
+        
+        instance.finish();
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of increasePriority method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+     */
+    public void testIncreasePriority() {
+        System.out.println("increasePriority");
+        
+        double amount = 0.0;
+        ExecutorStageDriver instance = null;
+        
+        instance.increasePriority(amount);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+
+    /**
+     * Test of decreasePriority method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+     */
+    public void testDecreasePriority() {
+        System.out.println("decreasePriority");
+        
+        double amount = 0.0;
+        ExecutorStageDriver instance = null;
+        
+        instance.decreasePriority(amount);
+        
+        // TODO review the generated test code and remove the default call to fail.
+        fail("The test case is a prototype.");
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Runs the shared CPU-bound and IO-bound controller scenarios from
+ * {@link PrioritizableStageDriverTestUtils} with an
+ * {@link ExperimentalDriverControlStrategy}.
+ */
+public class ExperimentalDriverControlStrategyTest extends TestCase {
+
+    public ExperimentalDriverControlStrategyTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    /** Returns a suite built from the test methods of this class. */
+    public static Test suite() {
+        return new TestSuite( ExperimentalDriverControlStrategyTest.class );
+    }
+
+    /** CPU-bound scenario driven by a freshly configured controller. */
+    public void testCPUBoundControl() throws Exception
+    {
+        System.out.println( "Experimental: testCPUBoundControl");
+        CountingDriverController cpuController = newController();
+        PrioritizableStageDriverTestUtils.testDriverControllerCPUBound(
+                this,
+                new BalancedPoolStageDriverFactory(),
+                cpuController );
+    }
+
+    /** IO-bound scenario driven by a freshly configured controller. */
+    public void testIOBoundControl() throws Exception
+    {
+        System.out.println( "Experimental: testIOBoundControl");
+        CountingDriverController ioController = newController();
+        PrioritizableStageDriverTestUtils.testDriverControllerIOBound(
+                this,
+                new BalancedPoolStageDriverFactory(),
+                ioController );
+    }
+
+    /**
+     * Creates the controller both tests use: minimum events to handle set to
+     * 50 and an {@code ExperimentalDriverControlStrategy} constructed with 5.
+     */
+    private static CountingDriverController newController() {
+        CountingDriverController configured = new CountingDriverController();
+        configured.setMinimumEventsToHandle( 50 );
+        configured.setDriverControlStrategy( new ExperimentalDriverControlStrategy( 5 ) );
+        return configured;
+    }
+}

Copied: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java?p2=commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java Wed Sep 22 21:57:01 2010
@@ -15,35 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
-import org.apache.commons.pipeline.validation.ConsumedTypes;
-import org.apache.commons.pipeline.validation.ProducesConsumed;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestStage;
 
-/**
- * This stage will generate {@link StageException}s for every other object this
- * stage processes. By design, the even numbered objects will cause a <CODE>StageException</CODE>
- * to be thrown (counting the first object as 1).
- */
-@ConsumedTypes(Object.class)
-@ProducesConsumed
-public class FaultingTestStage extends TestStage {
-    private Log log = LogFactory.getLog(FaultingTestStage.class);
-    private int counter = 0;
+
+/**
+ * Test stage that imitates an IO-bound task: after delegating to
+ * {@link TestStage#process(Object)} it sleeps for {@code sleeptime}
+ * milliseconds, runs {@code consumeNCubed(consume)} and sleeps again.
+ */
+class IOBoundTestStage extends TestStage {
+    private Log log = LogFactory.getLog(IOBoundTestStage.class);
+    private long consume;
+    private long sleeptime;
     
-    public FaultingTestStage(int index) {
-        super(index);
+    /**
+     * @param id        index passed on to {@link TestStage}
+     * @param consume   argument handed to {@code consumeNCubed} for each object
+     * @param sleeptime length in milliseconds of each of the two sleeps
+     */
+    public IOBoundTestStage(int id, long consume, long sleeptime) {
+        super( id );
+        this.consume = consume;
+        this.sleeptime = sleeptime;
     }
     
     public void process(Object obj) throws StageException {
-        if (++counter % 2 == 0) {
-            log.error("Planned fault in stage " + this + ".");
-            throw new StageException(this, "Planned fault in stage " + super.getIndex() + ".");
+        super.process( obj );
+        try {
+            long startTime = System.currentTimeMillis();
+            Thread.sleep( sleeptime );
+            // The result is deliberately discarded; the call only burns CPU between the two sleeps.
+            PrioritizableStageDriverTestUtils.consumeNCubed( consume );
+            Thread.sleep( sleeptime );
+            log.debug( "IO stage took " + (System.currentTimeMillis() - startTime) + " ms");
+        }  catch (InterruptedException ex) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            log.warn( "IO stage interrupted", ex );
         }
         
-        super.process(obj);
     }
-}
\ No newline at end of file
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+public class PrioritizableStageDriverTestUtils {
+    
+    /** Private so the class cannot be instantiated; every helper in it is static. */
+    private PrioritizableStageDriverTestUtils() {
+    }
+
+    /**
+     * Runs a single test stage under a driver obtained from the given factory.
+     * Five strings are fed while the driver priority is raised and lowered in
+     * between; afterwards the stage and the terminal feeder must each hold
+     * five objects, and every received value must be a String.
+     */
+    public static void testSingleStage(TestCase test, PrioritizableStageDriverFactory driverFactory, TestStage stage) throws Exception {
+        final int expectedCount = 5;
+        TestStageContext ctx = new TestStageContext();
+        TestFeeder sink = new TestFeeder();
+
+        ctx.registerDownstreamFeeder(stage, sink);
+        stage.init(ctx);
+
+        PrioritizableStageDriver driver = driverFactory.createStageDriver(stage, ctx);
+        driver.start();
+
+        driver.getFeeder().feed("Hello, world!");
+        driver.increasePriority(1);
+
+        driver.getFeeder().feed("How are you?");
+        driver.increasePriority(1);
+
+        driver.getFeeder().feed("Feeling blue?");
+        driver.decreasePriority(2);
+
+        driver.getFeeder().feed("There's naught to do,");
+        driver.decreasePriority(1);
+
+        driver.getFeeder().feed("but sing about shoes!");
+        driver.finish();
+
+        TestCase.assertEquals("Incorrect processed object count from stage.", expectedCount, stage.processedObjects.size());
+        TestCase.assertEquals("Incorrect final processed object count.", expectedCount, sink.receivedValues.size());
+        int idx = 0;
+        while (idx < expectedCount) {
+            TestCase.assertTrue("Received value " + idx + " is not a String!", sink.receivedValues.get(idx) instanceof String);
+            idx++;
+        }
+    }
+
+    /**
+     * Chains the supplied test stages, each under its own driver from the
+     * given factory, and feeds {@code objectsToFeed} integers into the first
+     * one. After every feed the priority of one randomly chosen driver is
+     * raised or lowered by 1 (fixed seed 0). Every stage and the terminal
+     * feeder must end up with {@code objectsToFeed} objects.
+     */
+    public static void testMultiStage(TestCase test, PrioritizableStageDriverFactory driverFactory, int objectsToFeed, TestStage... stages) throws Exception {
+        TestStageContext ctx = new TestStageContext();
+        TestFeeder sink = new TestFeeder();
+
+        List<PrioritizableStageDriver> drivers = new ArrayList<PrioritizableStageDriver>();
+        for (TestStage current : stages) {
+            drivers.add(driverFactory.createStageDriver(current, ctx));
+            current.init(ctx);
+        }
+
+        // Each stage feeds the driver of its successor; the last one feeds the sink.
+        final int lastIndex = drivers.size() - 1;
+        for (int pos = 0; pos <= lastIndex; pos++) {
+            if (pos == lastIndex) {
+                ctx.registerDownstreamFeeder(stages[pos], sink);
+            } else {
+                ctx.registerDownstreamFeeder(stages[pos], drivers.get(pos + 1).getFeeder());
+            }
+        }
+
+        Random rng = new Random(0);
+        for (StageDriver each : drivers) {
+            each.start();
+        }
+
+        int fed = 0;
+        while (fed < objectsToFeed) {
+            drivers.get(0).getFeeder().feed( fed );
+
+            //randomly permute driver priority
+            boolean raise = rng.nextBoolean();
+            PrioritizableStageDriver target = drivers.get(rng.nextInt(drivers.size()));
+            if (raise) {
+                target.increasePriority(1);
+            } else {
+                target.decreasePriority(1);
+            }
+            fed++;
+        }
+
+        for (StageDriver each : drivers) {
+            each.finish();
+        }
+
+        for (TestStage current : stages) {
+            TestCase.assertEquals("Incorrect processed object count from stage " + current, objectsToFeed, current.processedObjects.size());
+        }
+
+        TestCase.assertEquals("Incorrect final processed object count.", objectsToFeed, sink.receivedValues.size());
+    }
+
+    /**
+     * Checks that the controller lowers the priority of a driver running a
+     * CPU-bound stage. The driver starts at priority 2, receives 500 objects
+     * up front and then one more every 200 ms for 15 seconds; afterwards its
+     * priority must be below the initial value.
+     *
+     * @param test          calling test case (assertions are static, so it is not dereferenced)
+     * @param driverFactory factory used to create the driver under test
+     * @param controller    controller registered as listener and given the driver to manage
+     * @throws Exception if the stage, the driver or the sleep fails
+     */
+    public static void testDriverControllerCPUBound(TestCase test, PrioritizableStageDriverFactory driverFactory, AbstractDriverController controller ) throws Exception {
+        TestStage stage = new CPUBoundTestStage(0,300);
+        TestFeeder terminalFeeder = new TestFeeder();
+        TestStageContext context = new TestStageContext();
+        context.registerDownstreamFeeder(stage, terminalFeeder);
+        context.registerListener(controller);
+
+        stage.init(context);
+
+        //start the drivercontroller
+        if( controller instanceof PipelineLifecycleJob ) {
+            ((PipelineLifecycleJob) controller).onStart( null );
+        }
+
+        PrioritizableStageDriver instance = driverFactory.createStageDriver(stage, context);
+
+        controller.addManagedStageDriver( instance );
+        long initialPriority = 2;
+        instance.increasePriority( initialPriority );
+        TestCase.assertEquals( initialPriority, Math.round( instance.getPriority()));
+
+        instance.start();
+        try {
+            int numFeeds = 500;
+            for( int i = 0; i < numFeeds; i++ ) {
+                instance.getFeeder().feed("Hello, world!");
+            }
+
+            long startTime = System.currentTimeMillis();
+            long feedTime = 15000;
+            while( (System.currentTimeMillis()-startTime)<feedTime)
+            {
+                instance.getFeeder().feed("Hello, world!");
+                Thread.sleep( 200 );
+            }
+            TestCase.assertTrue( "Controller should have decreased priority below "+
+                    initialPriority+", instead was "+instance.getPriority(),
+                    instance.getPriority() < initialPriority );
+        } finally {
+            // Finish the driver even when feeding or the assertion fails, so a
+            // failed run does not leave a started driver behind for later tests.
+            instance.finish();
+        }
+    }
+
+    /**
+     * Checks that the controller raises the priority of a driver running an
+     * IO-bound stage. The driver starts at priority 1 and receives one object
+     * every 100 ms for 20 seconds; afterwards its priority must be above the
+     * initial value.
+     *
+     * @param test          calling test case (assertions are static, so it is not dereferenced)
+     * @param driverFactory factory used to create the driver under test
+     * @param controller    controller registered as listener and given the driver to manage
+     * @throws Exception if the stage, the driver or the sleep fails
+     */
+    public static void testDriverControllerIOBound(TestCase test, PrioritizableStageDriverFactory driverFactory, AbstractDriverController controller ) throws Exception {
+        TestStage stage = new IOBoundTestStage(0,0,150);
+        TestFeeder terminalFeeder = new TestFeeder();
+        TestStageContext context = new TestStageContext();
+        context.registerDownstreamFeeder(stage, terminalFeeder);
+        context.registerListener(controller);
+
+        stage.init(context);
+
+        //start the drivercontroller
+        if( controller instanceof PipelineLifecycleJob ) {
+            ((PipelineLifecycleJob) controller).onStart( null );
+        }
+
+        PrioritizableStageDriver instance = driverFactory.createStageDriver(stage, context);
+
+        controller.addManagedStageDriver( instance );
+        long initialPriority = 1;
+        instance.increasePriority( initialPriority );
+        TestCase.assertEquals( initialPriority, Math.round( instance.getPriority()));
+
+        instance.start();
+        try {
+            long startTime = System.currentTimeMillis();
+            long feedTime = 20000;
+            while( (System.currentTimeMillis()-startTime)<feedTime)
+            {
+                instance.getFeeder().feed("Hello, world!");
+                Thread.sleep( 100 );
+            }
+
+            TestCase.assertTrue( "Controller should have increased priority above "+
+                    initialPriority+", instead was "+instance.getPriority(),
+                    instance.getPriority() > initialPriority );
+        } finally {
+            // Finish the driver even when feeding or the assertion fails, so a
+            // failed run does not leave a started driver behind for later tests.
+            instance.finish();
+        }
+    }
+
+    /**
+     * Consume resources in O(n^2) fashion
+     *
+     * Return the aggregate value so the sqrt operation is less likely to be
+     * optimized by the JRE
+     *
+     * @param operations bound of both nested loops; zero or negative values do no work
+     * @return sum of {@code Math.sqrt(i + j)} over all {@code 0 <= i, j < operations}
+     */
+    public static double consumeNSquared( long operations ){
+        double aggregate = 0;
+        // long counters: an int counter compared against a long bound would
+        // overflow and never terminate for bounds above Integer.MAX_VALUE.
+        for( long i = 0; i < operations; i++ ) {
+            for( long j = 0; j < operations; j++ ) {
+                aggregate += Math.sqrt( i + j );
+            }
+        }
+        return aggregate;
+    }
+
+    /**
+     * Consume resources in O(n^3) fashion
+     *
+     * Return the aggregate value so the sqrt operation is less likely to be
+     * optimized by the JRE
+     *
+     * @param operations bound of all three nested loops; zero or negative values do no work
+     * @return sum of {@code Math.sqrt(i + j)} over all {@code 0 <= i, j, k < operations}
+     *         (the innermost index only repeats the term, it is not part of it)
+     */
+    public static double consumeNCubed( long operations ){
+        double aggregate = 0;
+        // long counters: an int counter compared against a long bound would
+        // overflow and never terminate for bounds above Integer.MAX_VALUE.
+        for( long i = 0; i < operations; i++ ) {
+            for( long j = 0; j < operations; j++ ) {
+                for( long k = 0; k < operations; k++ ) {
+                    aggregate += Math.sqrt( i + j );
+                }
+            }
+        }
+        return aggregate;
+    }
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.TestCase;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import org.apache.commons.pipeline.validation.ValidationException;
+
+public class ToyBalancedPipelineTest extends TestCase {
+    /**
+     * Builds a four-stage "toy factory" pipeline whose drivers all come from
+     * one {@link BalancedPoolStageDriverFactory}, feeds 100 cars through it
+     * while a monitor thread prints the driver priorities, and checks that all
+     * 100 cars reach the terminal feeder carrying the message set by the last
+     * stage.
+     */
+    public void testToyFactory() throws ValidationException, StageException {
+        Pipeline pipeline = new Pipeline();
+        CountingDriverController controller = new CountingDriverController();
+        controller.setMinimumEventsToHandle(20);
+        EqualizingDriverControlStrategy controlStrategy = new EqualizingDriverControlStrategy();
+        controller.setDriverControlStrategy(controlStrategy);
+        controlStrategy.setAllowableDelta(100);
+        pipeline.addLifecycleJob(controller);
+        TestFeeder terminalFeeder = new TestFeeder();
+        pipeline.setTerminalFeeder(terminalFeeder);
+
+        BalancedPoolStageDriverFactory driverFactory = new BalancedPoolStageDriverFactory();
+        driverFactory.setInitialPriority(1);
+        driverFactory.setQueueFactory(new BlockingQueueFactory.LinkedBlockingQueueFactory());
+
+        TestStage moldBody = new MoldBodyStage(1);
+        TestStage bodyPaint = new BodyPaintStage(2);
+        TestStage addWheels = new AddWheelsStage(3);
+        TestStage addAI = new AddAIStage(4);
+
+        pipeline.addStage(moldBody, driverFactory);
+        pipeline.addStage(bodyPaint, driverFactory);
+        pipeline.addStage(addWheels, driverFactory);
+        pipeline.addStage(addAI, driverFactory);
+
+        for (StageDriver driver : pipeline.getStageDrivers()) {
+            if (driver instanceof PrioritizableStageDriver) controller.addManagedStageDriver((PrioritizableStageDriver) driver);
+        }
+
+        DriverMonitor m = new DriverMonitor(pipeline);
+        m.start();
+        try {
+            pipeline.start();
+
+            for (int i = 0; i < 100; i++) {
+                pipeline.getSourceFeeder().feed(new Car(i, "I'm a hunk of metal!"));
+            }
+
+            pipeline.finish();
+        } finally {
+            // Stop the monitor even when the pipeline run throws; otherwise its
+            // loop would keep printing after the test has already failed.
+            m.finish();
+        }
+
+        assertEquals("Incorrect number of objects received.", 100, terminalFeeder.receivedValues.size());
+        for (Object obj : terminalFeeder.receivedValues) {
+            assertTrue("Object is not a car!", obj instanceof Car);
+            assertEquals("I AM ALIVE!", ((Car) obj).message);
+        }
+    }
+    
+    /**
+     * Background thread that prints the priority of every prioritizable stage
+     * driver of a pipeline about once per second, until {@link #finish()} is
+     * called or the thread is interrupted.
+     */
+    private static class DriverMonitor extends Thread {
+        private volatile boolean done = false;
+        private final Pipeline pipeline;
+
+        public DriverMonitor(Pipeline pipeline) {
+            this.pipeline = pipeline;
+            // Daemon thread: a test that dies before finish() must not keep the JVM alive.
+            setDaemon(true);
+        }
+
+        public void run() {
+            while (!done) {
+                StringBuilder b = new StringBuilder();
+                for (StageDriver driver : pipeline.getStageDrivers()) {
+                    // Only prioritizable drivers expose a priority; skip anything else
+                    // instead of dying on a ClassCastException.
+                    if (!(driver instanceof PrioritizableStageDriver)) {
+                        continue;
+                    }
+                    PrioritizableStageDriver d = (PrioritizableStageDriver) driver;
+                    b.append(d.getStage()).append(": ").append(d.getPriority()).append(";  ");
+                }
+                System.out.println("Driver priorities: " + b.toString());
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Treat interruption as a request to stop; keep the flag set for the caller.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        /** Asks the monitor loop to stop; it exits after its current sleep at the latest. */
+        public void finish() {
+            this.done = true;
+        }
+    }
+    
+    /** Work item sent through the toy pipeline: a numeric id plus a message the stages overwrite. */
+    private static class Car {
+        public int id;
+        public String message;
+
+        public Car(int id, String message) {
+            this.message = message;
+            this.id = id;
+        }
+
+        /** Renders the car as {@code Car <id>: <message>}. */
+        public String toString() {
+            StringBuilder text = new StringBuilder("Car ");
+            text.append(this.id);
+            text.append(": ");
+            text.append(this.message);
+            return text.toString();
+        }
+    }
+    
+    
+    /** First stage: accepts only raw cars, relabels them as car bodies and passes them on. */
+    private static class MoldBodyStage extends CPUBoundTestStage {
+        public MoldBodyStage(int id) {
+            super(id, 50);
+        }
+
+        public void process(Object obj) throws StageException {
+            Car incoming = (Car) obj;
+            // Anything that is not in the expected state is a pipeline ordering error.
+            if (!"I'm a hunk of metal!".equals(incoming.message)) {
+                throw new StageException(this, "Whoa! " + obj);
+            }
+            incoming.message = "Now I'm a car body!";
+            super.process(incoming);
+        }
+    }
+    
+    /** Second stage: accepts only car bodies, marks them as painted and passes them on. */
+    private static class BodyPaintStage extends IOBoundTestStage {
+        public BodyPaintStage(int id) {
+            super(id, 10, 250);
+        }
+
+        public void process(Object obj) throws StageException {
+            Car incoming = (Car) obj;
+            // Anything that is not in the expected state is a pipeline ordering error.
+            if (!"Now I'm a car body!".equals(incoming.message)) {
+                throw new StageException(this, "Whoa! " + obj);
+            }
+            incoming.message = "I've been painted!";
+            super.process(incoming);
+        }
+    }
+    
+    /** Third stage: accepts only painted cars, marks them as wheeled and passes them on. */
+    private static class AddWheelsStage extends TestStage {
+        public AddWheelsStage(int id) {
+            super(id);
+        }
+
+        public void process(Object obj) throws StageException {
+            Car incoming = (Car) obj;
+            // Anything that is not in the expected state is a pipeline ordering error.
+            if (!"I've been painted!".equals(incoming.message)) {
+                throw new StageException(this, "Whoa! " + obj);
+            }
+            incoming.message = "Got my wheels!";
+            super.process(incoming);
+        }
+    }
+    
+    /** Final stage: accepts only wheeled cars, sets the final message and passes them on. */
+    private static class AddAIStage extends CPUBoundTestStage {
+        public AddAIStage(int id) {
+            super(id, 500);
+        }
+
+        public void process(Object obj) throws StageException {
+            Car incoming = (Car) obj;
+            // Anything that is not in the expected state is a pipeline ordering error.
+            if (!"Got my wheels!".equals(incoming.message)) {
+                throw new StageException(this, "Whoa! " + obj);
+            }
+            incoming.message = "I AM ALIVE!";
+            super.process(incoming);
+        }
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * Runs the shared CPU-bound and IO-bound controller scenarios from
+ * {@link PrioritizableStageDriverTestUtils} with a
+ * {@link WallClockThresholdDriverControlStrategy}.
+ */
+public class WallClockThresholdDriverControlStrategyTest extends TestCase {
+
+    public WallClockThresholdDriverControlStrategyTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    /** Returns a suite built from the test methods of this class. */
+    public static Test suite() {
+        return new TestSuite(WallClockThresholdDriverControlStrategyTest.class);
+    }
+
+    /** CPU-bound scenario driven by a freshly configured controller. */
+    public void testCPUBoundControl() throws Exception
+    {
+        System.out.println( "WallClock: testCPUBoundControl");
+        CountingDriverController cpuController = newController();
+        PrioritizableStageDriverTestUtils.testDriverControllerCPUBound(
+                this,
+                new BalancedPoolStageDriverFactory(),
+                cpuController );
+    }
+
+    /** IO-bound scenario driven by a freshly configured controller. */
+    public void testIOBoundControl() throws Exception
+    {
+        System.out.println( "WallClock: testIOBoundControl");
+        CountingDriverController ioController = newController();
+        PrioritizableStageDriverTestUtils.testDriverControllerIOBound(
+                this,
+                new BalancedPoolStageDriverFactory(),
+                ioController );
+    }
+
+    /**
+     * Creates the controller both tests use: minimum events to handle set to
+     * 10 and a default-constructed {@code WallClockThresholdDriverControlStrategy}.
+     */
+    private static CountingDriverController newController() {
+        CountingDriverController configured = new CountingDriverController();
+        configured.setMinimumEventsToHandle( 10 );
+        configured.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );
+        return configured;
+    }
+}

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java Wed Sep 22 21:57:01 2010
@@ -19,9 +19,9 @@ package org.apache.commons.pipeline.stag
 
 import java.util.ArrayList;
 import java.util.List;
-import junit.framework.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
 
 /**
  * Test cases for AddToCollectionStage

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -17,9 +17,8 @@
 
 package org.apache.commons.pipeline.stage;
 
-import junit.framework.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+import junit.framework.Test;
+import junit.framework.TestSuite;
 
 /**
  * Test cases for DynamicLookupStaticMethodStage.

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java Wed Sep 22 21:57:01 2010
@@ -21,7 +21,11 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.net.URL;
-import junit.framework.*;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
 import org.apache.commons.pipeline.testFramework.TestFeeder;
 import org.apache.commons.pipeline.testFramework.TestStageContext;
 

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,7 +18,9 @@
 package org.apache.commons.pipeline.stage;
 
 import java.io.InputStream;
-import junit.framework.*;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
 
 /**
  * Test cases for InputStreamLineBreakStage.

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,9 +18,10 @@
 
 package org.apache.commons.pipeline.stage;
 
-import junit.framework.*;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
 
 /**
  *

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,10 +18,9 @@
 package org.apache.commons.pipeline.stage;
 
 import java.lang.reflect.Method;
-import java.util.ArrayList;
-import junit.framework.*;
-import org.apache.commons.pipeline.Pipeline;
-import org.apache.commons.pipeline.driver.SynchronousStageDriver;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
 
 
 /**

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Wed Sep 22 21:57:01 2010
@@ -49,6 +49,8 @@ public class KeyWaitBufferStageTest exte
      * data waiting for notify() to be called with an appropriate event.
      */
     public void testProcessAndNotify() throws Exception {
+        System.out.println("notify");
+        
         String obj = "Hello, World!";
         KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();
         EventObject ev = new KeyAvailableEvent<Integer>(this, keyFactory.generateKey(obj));

Added: commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml (added)
+++ commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml Wed Sep 22 21:57:01 2010
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+-->
+
+<pipeline>
+    <listener className="org.apache.commons.pipeline.listener.ObjectProcessedEventCounter"/>
+    <!-- Driver factory "f1": every stage declared below selects it through driverFactoryId. -->
+    <driverFactory className="org.apache.commons.pipeline.driver.control.BalancedPoolStageDriverFactory" id="f1">
+        <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
+                  capacity="10" fair="false"/>
+    </driverFactory>
+    <!-- Lifecycle job declared without any nested properties. -->
+    <lifecycleJob className="org.apache.commons.pipeline.driver.control.CountingDriverController">
+        
+    </lifecycleJob>
+    <!-- Environment: a java.util.Date object under key "testDate" and a string value under key "testEnvVar". -->
+    <env>
+        <object className="java.util.Date" key="testDate"/>
+        <value key="testEnvVar">Hello, World!</value>
+    </env>
+    <!-- Stages follow in declaration order; presumably this is also the processing order (TODO confirm against the config loader). -->
+    <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="f1"
+           filePattern=".*\.java" />
+    
+    <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="f1" />
+    
+    <stage className="org.apache.commons.pipeline.stage.RaiseEventStage" driverFactoryId="f1" />
+    
+    <stage className="org.apache.commons.pipeline.stage.RaiseKeyAvailableEventStage" driverFactoryId="f1">
+        <property propName="keyFactory" className="org.apache.commons.pipeline.util.KeyFactory$HashKeyFactory"/>      
+    </stage> 
+    
+    <stage className="org.apache.commons.pipeline.stage.KeyWaitBufferStage" driverFactoryId="f1">
+        <property propName="keyFactory" className="org.apache.commons.pipeline.util.KeyFactory$HashKeyFactory"/>      
+        <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
+                  capacity="10" fair="false"/>
+    </stage>
+    
+    <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="f1" />
+    <!-- Initial feed: two relative source directory paths; presumably consumed by the first stage (FileFinderStage), TODO confirm. -->
+    <feed>
+        <value>src/main/java</value>
+        <value>src/test/java</value>    
+    </feed>
+</pipeline>