You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:03 UTC

svn commit: r1000251 [1/2] - in /commons/sandbox/pipeline/trunk: ./ src/main/java/org/apache/commons/pipeline/ src/main/java/org/apache/commons/pipeline/config/ src/main/java/org/apache/commons/pipeline/driver/ src/main/java/org/apache/commons/pipeline...

Author: nuttycom
Date: Wed Sep 22 21:57:01 2010
New Revision: 1000251

URL: http://svn.apache.org/viewvc?rev=1000251&view=rev
Log:
Modifications for automated driver control.

Added:
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java
      - copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java
      - copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java
      - copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java
      - copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java
      - copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
    commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml
Modified:
    commons/sandbox/pipeline/trunk/NOTICE.txt
    commons/sandbox/pipeline/trunk/pom.xml
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java
    commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java

Modified: commons/sandbox/pipeline/trunk/NOTICE.txt
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/NOTICE.txt?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/NOTICE.txt (original)
+++ commons/sandbox/pipeline/trunk/NOTICE.txt Wed Sep 22 21:57:01 2010
@@ -1,4 +1,4 @@
-Apache Commons Pipeline
+Apache Jakarta Commons Pipeline
 Copyright 2004-2006 The Apache Software Foundation
 
 This product includes software developed by

Modified: commons/sandbox/pipeline/trunk/pom.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/pom.xml?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/pom.xml (original)
+++ commons/sandbox/pipeline/trunk/pom.xml Wed Sep 22 21:57:01 2010
@@ -82,6 +82,15 @@ limitations under the License.
           <locales>en</locales>
         </configuration>
       </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+            <excludes>
+                <exclude>**/driver/control/**</exclude>
+                <exclude>**/Abstract*Test.java</exclude>
+            </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
   <repositories>

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java Wed Sep 22 21:57:01 2010
@@ -24,6 +24,8 @@ package org.apache.commons.pipeline;
  * for subsequent stages. Each {@link StageDriver} implementation will
  * ordinarily provide a custom Feeder implementation that integrates receiving
  * objects with its internal stage processing workflow.
+ *
+ *
  */
 public interface Feeder {
     /**

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java Wed Sep 22 21:57:01 2010
@@ -24,10 +24,11 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.commons.digester.Digester;
 import org.apache.commons.digester.RuleSet;
-import org.apache.commons.pipeline.PipelineCreationException;
 import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.PipelineCreationException;
 import org.xml.sax.SAXException;
 
 

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java Wed Sep 22 21:57:01 2010
@@ -142,6 +142,14 @@ public class ThreadPoolStageDriver exten
             startSignal.countDown();
             
             log.debug("Worker threads for stage " + stage + " started.");
+            
+            //the following appears to be superfluous, since the state was already set to running.
+//            //wait to ensure that the stage starts up correctly
+//            try {
+//                while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
+//            } catch (InterruptedException e) {
+//                throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
+//            }
         } else {
             throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
         }
@@ -290,7 +298,7 @@ public class ThreadPoolStageDriver exten
                 recordFatalError(e);
                 setState(ERROR);
             } catch (InterruptedException e) {
-                log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
+                log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier",e);
                 recordFatalError(e);
                 setState(ERROR);
             } finally {

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+import java.util.List;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+import org.apache.commons.pipeline.StageEventListener;
+
+/**
+ *
+ */
+public abstract class AbstractDriverController implements StageEventListener {
+    
+     /**
+     * This list contains the PrioritizableStageDriver instances that are being
+     * managed by the controller.
+     */
+    protected List<PrioritizableStageDriver> drivers = new ArrayList<PrioritizableStageDriver>();
+    
+    protected List<StageProcessTimingEvent> events = new ArrayList<StageProcessTimingEvent>();    
+        
+    protected DriverControlStrategy driverControl;
+	
+   /** Creates a new instance of AbstractDriverController */
+    public AbstractDriverController() {
+    }
+
+    
+    public void addManagedStageDriver(PrioritizableStageDriver driver) {
+        this.drivers.add(driver);
+    }
+
+    
+    public synchronized void notify(EventObject ev) {
+        if (ev instanceof StageProcessTimingEvent) {
+            events.add((StageProcessTimingEvent) ev);
+            notifyAll();
+        }
+    }
+
+    
+    public void setDriverControlStrategy(DriverControlStrategy driverControl) {
+    	this.driverControl = driverControl;
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ *
+ */
+public abstract class AbstractPrioritizableStageDriver extends AbstractStageDriver implements PrioritizableStageDriver {
+
+    /** Creates a new instance of AbstractPrioritizableStageDriver */
+    public AbstractPrioritizableStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
+        super(stage, context, faultTolerance);
+    }
+    
+    protected void process(Object obj) throws StageException {
+        long start = System.currentTimeMillis();
+        this.stage.process(obj);
+        context.raise(new StageProcessTimingEvent(this.stage, System.currentTimeMillis() - start));
+    }
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.*;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+
+public class BalancedPoolStageDriver extends AbstractPrioritizableStageDriver {
+    public enum Runnability {RUNNABLE, STOPPABLE, NOT_RUNNABLE};
+    
+    private Log log = LogFactory.getLog(BalancedPoolStageDriver.class);
+    
+    //signal telling threads to start polling queue
+    private final CountDownLatch startSignal;
+    
+    //counter for worker identity assignment
+    private int nextWorkerId = 0;
+    
+    //queue of worker instances - each is associated with a running thread
+    private final Queue<BalancedWorker> workers = new ConcurrentLinkedQueue<BalancedWorker>();
+    
+    //number of threads to start initially
+    private int initialThreads;
+    
+    //wait timeout to ensure deadlock cannot occur on thread termination
+    private long timeout;
+    
+    //units for timeout wait
+    private TimeUnit timeoutTimeUnit;
+    
+    //feeder instance for the stage
+    private final SwitchingFeeder feeder;
+    
+    /**
+     * This Feeder implementation will switch between synchronous and multithreaded
+     * processing depending upon how many worker threads are available.
+     */
+    private class SwitchingFeeder implements Feeder {
+        //queue to hold data to be processed.
+        private final BlockingQueue queue;
+        
+        public SwitchingFeeder(BlockingQueue queue) {
+            this.queue = queue;
+        }
+        
+        public void feed(Object obj) {
+            synchronized(BalancedPoolStageDriver.this) {
+                if (    !isInState(RUNNING, STOP_REQUESTED)
+                        || workers.size() > 1
+                        || (workers.size() == 1 && workers.peek().runnability == Runnability.RUNNABLE)) {
+                    try {
+                        if (log.isDebugEnabled()) log.debug(stage + ": Queueing object: " + obj);
+                        this.queue.put(obj);
+                    } catch (InterruptedException e) {
+                        throw new Error("Assertion failure: thread interrupted while attempting to enqueue data object.", e);
+                    }
+                    
+                    return; //short circuit out of here
+                }
+            }
+            
+            try {
+                if (log.isDebugEnabled()) log.debug(stage + ":Processing object directly: " + obj);
+                BalancedPoolStageDriver.this.process(obj);
+            } catch (StageException e) {
+                recordProcessingException(obj, e);
+                if (faultTolerance == FaultTolerance.NONE) throw fatalError(e);
+            }
+        }
+    };
+    
+    /**
+     * This StageDriver implementation runs stage processing in a pool of threads
+     *
+     */
+    public BalancedPoolStageDriver(Stage stage, StageContext context, BlockingQueueFactory queueFactory,
+            int initialThreads, FaultTolerance faultTolerance,
+            long timeout, TimeUnit timeoutTimeUnit) {
+        super(stage, context, faultTolerance);
+        
+        this.feeder = new SwitchingFeeder(queueFactory.createQueue());
+        this.startSignal = new CountDownLatch(1);
+        this.initialThreads = initialThreads;
+        this.timeout = timeout;
+        this.timeoutTimeUnit = timeoutTimeUnit;
+    }
+    
+    /**
+     * Accessor method for this stage's Feeder.
+     * @return the Feeder for this stage.
+     */
+    public Feeder getFeeder() {
+        return this.feeder;
+    }
+    
+    public void start() throws StageException {
+        if (this.currentState == STOPPED) {
+            setState(STARTED);
+            
+            if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
+            stage.preprocess();
+            if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
+            
+            log.debug("Starting worker threads for stage " + stage + ".");
+            this.addWorkers(initialThreads);
+            
+            // let threads know they can start
+            testAndSetState(STARTED, RUNNING);
+            startSignal.countDown();
+            
+            log.debug("Worker threads for stage " + stage + " started.");
+        } else {
+            throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
+        }
+        
+    }
+    
+    public void finish() throws StageException {
+        if (this.currentState == STOPPED) {
+            throw new IllegalStateException("The driver is not currently running.");
+        }
+        
+        try {
+            //it may be the case that finish() is called when the driver is still in the process
+            //of starting up, so it is necessary to wait to enter the running state before
+            //a stop can be requested
+            while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait(this.timeout);
+            
+            //ask the worker threads to shut down
+            testAndSetState(RUNNING, STOP_REQUESTED);
+            
+            if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
+            while (!workers.isEmpty()) {
+                BalancedWorker worker = workers.remove();
+                worker.awaitCompletion();
+            }
+            if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
+            
+            //transition into finished state (not used internally?)
+            testAndSetState(STOP_REQUESTED, FINISHED);
+            
+            //do not run postprocessing if the driver is in an error state
+            if (this.currentState != ERROR) {
+                if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
+                this.stage.postprocess();
+                if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
+            }
+        } catch (StageException e) {
+            log.error("An error occurred during postprocessing of stage " + stage , e);
+            recordFatalError(e);
+            setState(ERROR);
+        } catch (InterruptedException e) {
+            throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
+        } finally {
+            if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
+            stage.release();
+            if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
+        }
+        
+        testAndSetState(FINISHED, STOPPED);
+    }
+    
+    /**
+     * This method obtains a lock to set the current state of processing
+     * to error, records the error and returns a RuntimeException encapsulating
+     * the specified throwable.
+     */
+    private RuntimeException fatalError(Throwable t) {
+        try {
+            setState(ERROR);
+            this.recordFatalError(t);
+            stage.release();
+            this.notifyAll();
+        } catch (Exception e) {
+            this.recordFatalError(e);
+        }
+        
+        return new RuntimeException("Fatal error halted processing of stage: " + stage);
+    }
+    
+    /**
+     *
+     */
+    private synchronized void addWorkers(int count) {
+        while (count-- > 0) {
+            BalancedWorker worker = new BalancedWorker(nextWorkerId, this.feeder.queue);
+            Thread workerThread = new Thread(worker);
+            workers.add(worker);
+            nextWorkerId++;
+            workerThread.start();
+        }
+    }
+    
+    /**
+     *
+     */
+    private synchronized void removeWorkers(int count) throws InterruptedException {
+        while (count-- > 0 && !workers.isEmpty()) {
+            if (workers.size() > 1) {
+            BalancedWorker worker = workers.remove();
+                worker.deactivate(false);
+            worker.awaitCompletion();
+            } else {
+                BalancedWorker worker = workers.peek();
+                worker.deactivate(true);
+            }
+        }
+    }
+    
+    /**
+     * Increases the priority of the managed stage by increasing the number of
+     * threads in which the stage is running.
+     */
+    public void increasePriority(double amount) {
+        this.addWorkers((int) amount);
+    }
+    
+    /**
+     * Decreases the priority of the managed stage by decreasing the number of
+     * threads in which the stage is running.
+     */
+    public void decreasePriority(double amount) {
+        try {
+            this.removeWorkers((int) amount);
+        } catch (InterruptedException e) {
+            throw new Error("Assertion failure: interrupted while awaiting worker thread stop for pool size reduction.", e);
+        }
+    }
+    
+    public double getPriority()
+    {
+        return getWorkerCount();
+    }
+    
+    /**
+     * Return the number of worker threads currently allocated.
+     */
+    public synchronized int getWorkerCount() {
+        return this.workers.size();
+    }
+    
+    /**
+     * The worker thread
+     */
+    private class BalancedWorker implements Runnable {
+        private volatile Runnability runnability = Runnability.RUNNABLE;
+        private final int workerId;
+        private final BlockingQueue queue;
+        private final CountDownLatch doneSignal;
+        
+        public BalancedWorker(int workerId, BlockingQueue queue) {
+            this.workerId  = workerId;
+            this.queue = queue;
+            this.doneSignal = new CountDownLatch(1);
+        }
+        
+        public void run() {
+            try {
+                BalancedPoolStageDriver.this.startSignal.await();
+                
+                running: while (runnability != Runnability.NOT_RUNNABLE && currentState != ERROR) {
+                    try {
+                        Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
+                        if (obj == null) {
+                            if (currentState == STOP_REQUESTED || runnability == Runnability.STOPPABLE) break running;
+                            //else continue running;
+                        } else {
+                            try {
+                                if (log.isDebugEnabled()) log.debug(stage + ": processing asynchronously: " + obj);
+                                BalancedPoolStageDriver.this.process(obj);
+                            } catch (StageException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == NONE) throw e;
+                            } catch (RuntimeException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Worker thread " + this.workerId + " unexpectedly interrupted while waiting on data for stage " + stage, e);
+                    }
+                }
+            } catch (StageException e) {
+                log.error("An error occurred in the stage " + stage + " (workerID: " + this.workerId + ")", e);
+                recordFatalError(e);
+                setState(ERROR);
+            } catch (InterruptedException e) {
+                log.error("Stage " + stage + " (workerId: " + workerId + ") interrupted while waiting for barrier", e);
+                recordFatalError(e);
+                setState(ERROR);
+            } finally {
+                doneSignal.countDown();
+            }
+        }
+        
+        public void deactivate(boolean waitForQueue) {
+            if (waitForQueue) {
+                this.runnability = Runnability.STOPPABLE;
+            } else {
+                this.runnability = Runnability.NOT_RUNNABLE;
+            }
+        }
+        
+        public void awaitCompletion() throws InterruptedException {
+            this.doneSignal.await();
+        }
+    }
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+
+/**
+ *
+ */
+public class BalancedPoolStageDriverFactory implements PrioritizableStageDriverFactory<BalancedPoolStageDriver> {
+    
+    /** Creates a new instance of BalancedPoolStageDriverFactory */
+    public BalancedPoolStageDriverFactory() {
+    }
+
+    public BalancedPoolStageDriver createStageDriver(Stage stage, StageContext context) {
+        return new BalancedPoolStageDriver(stage, context, queueFactory, initialThreads, faultTolerance, timeout, timeoutTimeUnit);
+    }
+
+    /**
+     * Holds value of property initialThreads.
+     */
+    private int initialThreads = 0;
+
+    /**
+     * Getter for property initialThreads.
+     * @return Value of property initialThreads.
+     */
+    public int getInitialThreads() {
+        return this.initialThreads;
+    }
+
+    /**
+     * Setter for property initialThreads.
+     * @param initialThreads New value of property initialThreads.
+     */
+    public void setInitialThreads(int initialThreads) {
+        this.initialThreads = initialThreads;
+    }
+
+    /**
+     * Holds value of property queueFactory.
+     */
+    private BlockingQueueFactory queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
+
+    /**
+     * Getter for property queueFactory.
+     * @return Value of property queueFactory.
+     */
+    public BlockingQueueFactory getQueueFactory() {
+        return this.queueFactory;
+    }
+
+    /**
+     * Setter for property queueFactory.
+     * @param queueFactory New value of property queueFactory.
+     */
+    public void setQueueFactory(BlockingQueueFactory queueFactory) {
+        this.queueFactory = queueFactory;
+    }
+
+    /**
+     * Holds value of property faultTolerance.
+     */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+    /**
+     * Getter for property faultTolerance.
+     * @return Value of property faultTolerance.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+
+    /**
+     * Setter for property faultTolerance.
+     * @param faultTolerance New value of property faultTolerance.
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+
+    /**
+     * Holds value of property timeout.
+     */
+    private long timeout = 500;
+
+    /**
+     * Getter for property timeout.
+     * @return Value of property timeout.
+     */
+    public long getTimeout() {
+        return this.timeout;
+    }
+
+    /**
+     * Setter for property timeout.
+     * @param timeout New value of property timeout.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * Holds value of property timeoutTimeUnit.
+     */
+    private TimeUnit timeoutTimeUnit = TimeUnit.MILLISECONDS;
+
+    /**
+     * Getter for property timeoutTimeUnit.
+     * @return Value of property timeoutTimeUnit.
+     */
+    public TimeUnit getTimeoutTimeUnit() {
+        return this.timeoutTimeUnit;
+    }
+
+    /**
+     * Setter for property timeoutTimeUnit.
+     * @param timeoutTimeUnit New value of property timeoutTimeUnit.
+     */
+    public void setTimeoutTimeUnit(TimeUnit timeoutTimeUnit) {
+        this.timeoutTimeUnit = timeoutTimeUnit;
+    }
+    
+    /**
+     * Sets the initial priority of the driver instance in an implementation-specific
+     * manner.
+     * @param priority An arbitrary priority value. In this implementation, this
+     * corresponds directly to the number of threads initially assigned to the
+     * managed stage.
+     */
+    public void setInitialPriority(double priority) {
+        this.setInitialThreads((int) priority);
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+import java.util.List;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+
+/**
+ *
+ */
+public class CountingDriverController extends AbstractDriverController implements PipelineLifecycleJob {
+    
+    // flag used to signal that controller thread should stop
+    private volatile boolean running;
+    
+    /** Creates a new instance of AbstractPriorityController */
+    public CountingDriverController() { }
+    
+    public void onStart(Pipeline pipeline) {
+        if (pipeline != null) pipeline.registerListener(CountingDriverController.this);
+        running = true;
+        
+        new Thread() {
+            public void run() {
+                while (running) {
+                    List<StageProcessTimingEvent> eventsToHandle;
+                    synchronized(CountingDriverController.this) {
+                        while (events.size() < minimumEventsToHandle && running) {
+                            try {
+                                CountingDriverController.this.wait();
+                            } catch (InterruptedException e) {
+                                throw new Error("Assertion failure: interrupted while waiting for events", e);
+                            }
+                        }
+                        
+                        eventsToHandle = events;
+                        events = new ArrayList<StageProcessTimingEvent>();
+                    }
+                    
+                    driverControl.handleEvents(drivers, eventsToHandle);
+                }
+            }
+        }.start();
+    }
+    
+    /**
+     * Holds value of property minimumEventsToHandle.
+     */
+    private int minimumEventsToHandle;
+    
+    /**
+     * Getter for property minimumEventsToHandle.
+     * @return Value of property minimumEventsToHandle.
+     */
+    public int getMinimumEventsToHandle() {
+        return this.minimumEventsToHandle;
+    }
+    
+    /**
+     * Setter for property minimumEventsToHandle.
+     * @param minimumEventsToHandle New value of property minimumEventsToHandle.
+     */
+    public void setMinimumEventsToHandle(int minimumEventsToHandle) {
+        this.minimumEventsToHandle = minimumEventsToHandle;
+    }
+    
+    public void onFinish(Pipeline pipeline) {
+        this.running = false;
+    }
+}

Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
 
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.commons.pipeline.*;
 
-/**
- * This feeder simply adds the received objects to a list.
- */
-public class TestFeeder implements Feeder {
-    public List<Object> receivedValues = new ArrayList<Object>();
-
-    public void feed(Object obj) {
-        this.receivedValues.add(obj);
-    }
-}
\ No newline at end of file
+/**
+ * Strategy invoked with a batch of stage timing events and the drivers that
+ * may be adjusted in response to them.
+ */
+public interface DriverControlStrategy {
+    /**
+     * Handles a batch of timing events.
+     * @param drivers The prioritizable drivers available for adjustment.
+     * @param events The timing events collected since the previous call.
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events);
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that measures stage execution times
+ * and increases thread counts that are taking longer than other stages on average
+ *
+ * @author mirror
+ */
+public class EqualizingDriverControlStrategy implements DriverControlStrategy {
+    
+    private static class Tuple {
+        private int count = 0;
+        private long duration = 0;
+        
+        Tuple() { }
+        
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+    }
+    
+    /** Creates a new instance of EqualizingDriverControlStrategy */
+    public EqualizingDriverControlStrategy() {
+    }
+    
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        if (events.isEmpty()) return;
+        
+        Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+        long total = 0;
+        for (StageProcessTimingEvent ev : events) {
+            Tuple tuple = timings.get((Stage) ev.getSource());
+            if (tuple == null) {
+                tuple = new Tuple();
+                timings.put((Stage) ev.getSource(), tuple);
+            }
+            
+            tuple.add(ev.getLatency());
+            total += ev.getLatency();
+        }
+        
+        //System.out.println("Events handled: " + events.size());
+        System.out.print("Stage latencies: ");
+        for (Map.Entry<Stage,Tuple> entry : timings.entrySet()) {
+            System.out.print(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
+        }
+        System.out.println();
+        //System.out.println("Total latency: " + total);
+        
+        double mean = total / events.size();
+        //System.out.println("Mean latency: " + mean);
+        
+        for (PrioritizableStageDriver driver : drivers) {
+            Tuple tuple = timings.get(driver.getStage());
+            if (tuple != null) {
+            long averageDuration = tuple.duration / tuple.count;
+            if (averageDuration > mean + allowableDelta) {
+                    System.out.println("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+                driver.increasePriority(1);
+            } else if (averageDuration < mean - allowableDelta) {
+                driver.decreasePriority(1);
+                    System.out.println("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+                }
+            }
+        }
+    }
+
+    /**
+     * Holds value of property allowableDelta.
+     */
+    private long allowableDelta;
+
+    /**
+     * Getter for property allowableDelta.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Setter for property allowableDelta.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+    
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * This {@link StageDriver} implementation uses a pool of threads
+ * to process objects from an input queue.
+ */
+public class ExecutorStageDriver extends AbstractPrioritizableStageDriver {
+    private final Log log = LogFactory.getLog(ExecutorStageDriver.class);
+    
+    //executor service for parallel processing
+    private final ThreadPoolExecutor threadPoolExecutor;
+    
+    //executor service for synchronous processing
+    private final Executor directExecutor = new Executor(){
+        public void execute(Runnable r) { r.run(); }
+    };
+    
+    //signal that indicates it's okay to process objects
+    private final CountDownLatch startSignal;
+    
+    //reference to the executor that is currently in use by the feeder
+    private volatile Executor executor;
+    
+    //maximum number of threads in the pool
+    private int maxThreads;
+    
+    //average number of threads in the pool
+    private int coreThreads;
+    
+    //feeder used to feed data to this stage's queue
+    private final Feeder feeder = new Feeder() {
+        public void feed(final Object obj) {
+            if (isInState(ERROR)) throw new IllegalStateException("Stage " + stage + " is in state ERROR and is hence unable to process data.");
+            
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        startSignal.await();
+                        ExecutorStageDriver.this.process(obj);
+                    } catch (InterruptedException e) {
+                        throw new Error("Assertion failure: interrupted while awaiting start signal.", e);
+                    } catch (StageException e) {
+                        recordProcessingException(obj, e);
+                        if (faultTolerance == NONE) setState(ERROR);
+                    } catch (RuntimeException e) {
+                        recordProcessingException(obj, e);
+                        if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
+                    }
+                }
+            });
+        }
+    };
+    
+    /**
+     * Creates a new ExecutorStageDriver.
+     *
+     * @param stage The stage that the driver will run
+     * @param context the context in which to run the stage
+     * @param queue The object queue to use for storing objects prior to processing. The
+     * default is {@link LinkedBlockingQueue}
+     * @param timeout The amount of time, in milliseconds, that the worker thread
+     * will wait before checking the processing state if no objects are available
+     * in the thread's queue.
+     * @param faultTolerance Flag determining the behavior of the driver when
+     * an error is encountered in execution of {@link Stage#process(Object)}.
+     * If this is set to false, any exception thrown during {@link Stage#process(Object)}
+     * will cause the worker thread to halt without executing {@link Stage#postprocess()}
+     * ({@link Stage#release()} will be called.)
+     * @param numThreads Number of threads that will be simultaneously reading from queue
+     */
+    public ExecutorStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance, int coreThreads, int maxThreads) {
+        super(stage, context, faultTolerance);
+        this.threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+        this.threadPoolExecutor.setCorePoolSize(coreThreads);
+        this.threadPoolExecutor.setMaximumPoolSize(maxThreads);
+        this.threadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            public void rejectedExecution(Runnable command, ThreadPoolExecutor exec) {
+                ExecutorStageDriver.this.directExecutor.execute(command);
+            }
+        });
+        
+        this.executor = (maxThreads == 0) ? this.directExecutor : this.threadPoolExecutor;
+        this.startSignal = new CountDownLatch(1);
+    }
+    
+    /**
+     * Return the Feeder used to feed data to the queue of objects to be processed.
+     * @return The feeder for objects processed by this driver's stage.
+     */
+    public Feeder getFeeder() {
+        return this.feeder;
+    }
+    
+    /**
+     * Start the processing of the stage. Creates threads to poll items
+     * from queue.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
+     */
+    public synchronized void start() throws StageException {
+        if (this.currentState == STOPPED) {
+            setState(STARTED);
+            if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
+            this.stage.preprocess();
+            if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
+            
+            // let threads know they can start
+            testAndSetState(STARTED, RUNNING);
+            this.startSignal.countDown();
+        }
+    }
+    
+    /**
+     * Causes processing to shut down gracefully. Waits until all worker threads
+     * have completed.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
+     */
+    public synchronized void finish() throws StageException {
+        try {
+            testAndSetState(RUNNING, STOP_REQUESTED);
+            this.threadPoolExecutor.shutdown();
+            testAndSetState(STOP_REQUESTED, STOPPED);
+            
+            while (!this.threadPoolExecutor.isShutdown() && this.currentState != ERROR) this.wait();
+            
+            if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
+            this.stage.postprocess();
+            if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Unexpectedly interrupted while awaiting thread pool shutdown.", e);
+        } finally {
+            if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
+            stage.release();
+            if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
+            
+            testAndSetState(STOPPED, FINISHED);
+        }
+    }
+    
+    public synchronized void increasePriority(double amount) {
+        this.maxThreads += amount;
+        this.coreThreads += (int) (amount / 1.5);
+        this.threadPoolExecutor.setCorePoolSize(coreThreads);
+        this.threadPoolExecutor.setMaximumPoolSize(maxThreads);
+        if (this.executor == this.directExecutor || maxThreads > 0) {
+            this.executor = this.threadPoolExecutor;
+        }
+    }
+    
+    public synchronized void decreasePriority(double amount) {
+        this.maxThreads = (amount / 1.5 > maxThreads) ? 0 : maxThreads - (int) (amount / 1.5);
+        this.coreThreads = (amount > coreThreads) ? 0 : coreThreads - (int) amount;
+        this.threadPoolExecutor.setCorePoolSize(coreThreads);
+        this.threadPoolExecutor.setMaximumPoolSize(maxThreads);
+        if (maxThreads == 0) {
+            this.executor = this.directExecutor;
+        }
+    }
+    
+    public double getPriority()
+    {
+        return maxThreads;
+    }
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * Factory that creates {@link ExecutorStageDriver} instances configured from
+ * the bean properties held by this factory.
+ */
+public class ExecutorStageDriverFactory implements PrioritizableStageDriverFactory<ExecutorStageDriver> {
+
+    /** Creates a new instance of ExecutorStageDriverFactory */
+    public ExecutorStageDriverFactory() {
+    }
+
+    /**
+     * Builds a driver for the given stage using the current property values.
+     * @param stage The stage the new driver will run.
+     * @param context The context in which the stage is run.
+     * @return A newly constructed driver.
+     */
+    public ExecutorStageDriver createStageDriver(Stage stage, StageContext context) {
+        return new ExecutorStageDriver(stage, context, faultTolerance, coreThreads, maxThreads);
+    }
+
+    /**
+     * Holds value of property faultTolerance. Defaults to NONE, matching
+     * BalancedPoolStageDriverFactory, so that a driver is never created with
+     * a null fault tolerance when the property is left unset.
+     */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+    /**
+     * Getter for property faultTolerance.
+     * @return Value of property faultTolerance.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+
+    /**
+     * Setter for property faultTolerance.
+     * @param faultTolerance New value of property faultTolerance.
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+
+    /**
+     * Holds value of property coreThreads.
+     */
+    private int coreThreads;
+
+    /**
+     * Getter for property coreThreads.
+     * @return Value of property coreThreads.
+     */
+    public int getCoreThreads() {
+        return this.coreThreads;
+    }
+
+    /**
+     * Setter for property coreThreads.
+     * @param coreThreads New value of property coreThreads.
+     */
+    public void setCoreThreads(int coreThreads) {
+        this.coreThreads = coreThreads;
+    }
+
+    /**
+     * Holds value of property maxThreads.
+     */
+    private int maxThreads;
+
+    /**
+     * Getter for property maxThreads.
+     * @return Value of property maxThreads.
+     */
+    public int getMaxThreads() {
+        return this.maxThreads;
+    }
+
+    /**
+     * Setter for property maxThreads.
+     * @param maxThreads New value of property maxThreads.
+     */
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    /**
+     * Sets the initial priority of created drivers. In this implementation
+     * the priority is truncated to an int and used as both the core and the
+     * maximum thread count.
+     * @param priority An arbitrary priority value.
+     */
+    public void setInitialPriority(double priority) {
+        this.setCoreThreads((int) priority);
+        this.setMaxThreads((int) priority);
+    }
+
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that every so often experimentally
+ * increases and decreases priorities to see if performance is improved.  If
+ * a performance improvement is found, additional experiments are done in the
+ * same direction.
+ *
+ * @author braeckel
+ */
+public class ExperimentalDriverControlStrategy implements DriverControlStrategy {
+    /**
+     * The minimum time difference (in percent) between different analyses of a stage for
+     * modifications to take place.  In other words, if a stage is stable with
+     * its current priority, no changes are made
+     */
+    private int minDifferencePercent = 3;
+
+    /** The adjustments this strategy can make to a driver: one step down, one step up, or nothing. */
+    private enum Action {
+        Decrease { void execute( PrioritizableStageDriver driver ){driver.decreasePriority( 1 ); } },
+        Increase { void execute( PrioritizableStageDriver driver ){driver.increasePriority( 1 ); } },
+        None { void execute( PrioritizableStageDriver driver ){ /*do nothing*/ } };
+        /** Applies this adjustment to the given driver. */
+        abstract void execute(PrioritizableStageDriver driver);
+    }
+
+    /** Running latency total for one stage, plus the action chosen for its driver. */
+    private static class Tuple {
+        private int count = 0;      //number of latencies added
+        private long duration = 0;  //sum of the latencies added
+        private Action lastAction = Action.None; //action taken when these timings were handled
+        Tuple() { }
+        /** Adds one latency sample to the running total. */
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+    }
+
+    private Map<Stage, Tuple> lastTimings = new HashMap<Stage,Tuple>();
+
+    /** Creates a new instance of ExperimentalDriverControlStrategy with the default minimum difference of 3 percent. */
+    public ExperimentalDriverControlStrategy() {
+    }
+    /** Creates an instance using the given minimum difference, which must lie between 0 and 100 inclusive. */
+    public ExperimentalDriverControlStrategy( int minDifferencePercent ){
+        boolean inRange = minDifferencePercent >= 0 && minDifferencePercent <= 100;
+        if( !inRange ){
+          throw new IllegalArgumentException( "Minimum difference percent must be between 0 and 100" );
+        }
+        this.minDifferencePercent = minDifferencePercent;
+    }
+
+    /**
+     * Adjusts driver priorities by comparing each stage's average latency in
+     * this batch of events with its average from the previous batch.
+     * <p>
+     * A stage seen for the first time has its priority increased.  Afterwards,
+     * a change of at least minDifferencePercent repeats the last action when
+     * latency improved and reverses it when latency got worse; with no last
+     * action a random one is tried.  Smaller changes leave the priority alone.
+     * Drivers whose stage produced no events in this batch are skipped.
+     *
+     * @param drivers the drivers whose priorities may be adjusted
+     * @param events the timing events to analyze, grouped here by source stage
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+        for (StageProcessTimingEvent ev : events) {
+            Tuple tuple = timings.get(ev.getSource());
+            if (tuple == null) {
+                tuple = new Tuple();
+                timings.put((Stage) ev.getSource(), tuple);
+            }
+
+            tuple.add(ev.getLatency());
+        }
+
+        for (PrioritizableStageDriver driver : drivers) {
+            Tuple mostRecentTiming = timings.get(driver.getStage());
+            if( mostRecentTiming == null )
+            {
+                //no events for this stage in this batch: there is nothing to
+                //compare, so keep the previous timing and leave the priority alone
+                continue;
+            }
+
+            Tuple previousTiming = lastTimings.get( driver.getStage() );
+            //divide as doubles so that fractional averages are not truncated
+            double avgMostRecentDuration = (double) mostRecentTiming.duration / mostRecentTiming.count;
+
+            if( previousTiming == null )
+            {
+                //first time around, try increasing priority
+                mostRecentTiming.lastAction = Action.Increase;
+                driver.increasePriority( 1 );
+            }
+            else
+            {
+                double avgPreviousTiming = (double) previousTiming.duration / previousTiming.count;
+                //a positive difference means the stage got faster
+                double timingDifference = avgPreviousTiming - avgMostRecentDuration;
+                System.out.println( "Performance went from "+avgPreviousTiming + " to "+avgMostRecentDuration +"("+timingDifference+")");
+                //if the timing difference was significant enough to work with...
+                double minDifference = avgPreviousTiming * (minDifferencePercent / 100.0);
+                if( Math.abs( timingDifference ) >= minDifference )
+                {
+                    mostRecentTiming.lastAction = chooseAction( previousTiming.lastAction, timingDifference > 0 );
+                    mostRecentTiming.lastAction.execute( driver );
+                }
+                else{
+                    mostRecentTiming.lastAction = Action.None;
+                }
+            }
+
+            System.out.println( "Action="+mostRecentTiming.lastAction+", current priority="+driver.getPriority() );
+            //take our most recent timings and roll them into the previous timings
+            lastTimings.put( driver.getStage(), mostRecentTiming );
+        }
+    }
+
+    /**
+     * Chooses the follow-up to the action taken in the previous round.
+     *
+     * @param lastAction the action taken in the previous round
+     * @param improved true if the average latency dropped since then
+     * @return lastAction again if it helped, its opposite if it did not, or
+     *         a random action if there was no previous action
+     */
+    private Action chooseAction( Action lastAction, boolean improved ){
+        if( lastAction == Action.None ){
+            System.out.println( "Significant performance change without a previous action: RANDOM action");
+            return getRandomAction();
+        }
+        if( improved ){
+            return lastAction;
+        }
+        return lastAction == Action.Increase ? Action.Decrease : Action.Increase;
+    }
+
+    /** Returns one of the three actions, chosen uniformly at random. */
+    private Action getRandomAction()
+    {
+        //nextInt(3) never goes negative; negating nextInt() did for
+        //Integer.MIN_VALUE, which then fell through to the default case
+        switch( new Random().nextInt( 3 ) ){
+            case 0: return Action.None;
+            case 1: return Action.Increase;
+            case 2: return Action.Decrease;
+            default: throw new IllegalStateException();
+        }
+    }
+
+    /**
+     * Holds value of property allowableDelta; handleEvents never reads it.
+     */
+    private long allowableDelta;
+
+    /**
+     * Returns the stored allowableDelta value.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Stores a new allowableDelta value; nothing else in this class uses it.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+
+}

Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.commons.pipeline.testFramework;
 
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.commons.pipeline.driver.control;
+
 import org.apache.commons.pipeline.*;
 
 /**
- * This feeder simply adds the received objects to a list.
+ * A stage driver whose priority can be increased, decreased and queried.
  */
-public class TestFeeder implements Feeder {
-    public List<Object> receivedValues = new ArrayList<Object>();
-
-    public void feed(Object obj) {
-        this.receivedValues.add(obj);
-    }
-}
\ No newline at end of file
+public interface PrioritizableStageDriver extends StageDriver {
+    void increasePriority(double amount);
+    void decreasePriority(double amount);
+    double getPriority();
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+
+/**
+ * <p>This interface represents a factory that is used by a {@link org.apache.commons.pipeline.Pipeline} to create
+ * a driver for a {@link Stage} when that stage is added to the pipeline. The factory
+ * pattern is used here to ensure that each stage is run by a unique driver
+ * instance.</p>
+ *
+ * <p>In order to guarantee that PrioritizableStageDriverFactory instances can be used
+ * effectively in configuration frameworks, each PrioritizableStageDriverFactory implementation
+ * <em>must</em> provide a no-argument constructor.</p>
+ */
+public interface PrioritizableStageDriverFactory<T extends PrioritizableStageDriver> extends StageDriverFactory<T> {
+    /**
+     * Creates a driver that will run the given stage in the given context.
+     * @param stage the stage the new driver will run
+     * @param context the context the stage will run in
+     * @return the newly created driver
+     */
+    T createStageDriver(Stage stage, StageContext context);
+
+    /**
+     * Sets the initial priority of the driver to be created.
+     * @param priority the priority that created drivers start with
+     */
+    void setInitialPriority(double priority);
+}

Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.commons.pipeline.testFramework;
 
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.pipeline.*;
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.EventObject;
+import org.apache.commons.pipeline.Stage;
 
 /**
- * This feeder simply adds the received objects to a list.
+ * An event that reports a latency value for the stage that is its source.
  */
-public class TestFeeder implements Feeder {
-    public List<Object> receivedValues = new ArrayList<Object>();
-
-    public void feed(Object obj) {
-        this.receivedValues.add(obj);
+public class StageProcessTimingEvent extends EventObject {
+    private long latency;
+    
+    /** Creates a new instance of StageProcessTimingEvent */
+    public StageProcessTimingEvent(Stage source, long latency) {
+        super(source);
+        this.latency = latency;
     }
-}
\ No newline at end of file
+
+    public long getLatency() {
+        return latency;
+    }    
+}

Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that gauges performance by the
+ * average latency reported for each stage.  Stages whose average latency
+ * reaches the configured threshold have their priority increased.
+ */
+public class WallClockThresholdDriverControlStrategy implements DriverControlStrategy {
+    /**
+     * The threshold at which a stage is considered to be functioning correctly.
+     * If the time taken to process a stage is on average greater than this 
+     * number, more priority is added to the stage
+     */
+    private long thresholdMs = 500;
+    
+    /** Running total of the latencies reported for one stage. */
+    private static class Timing {
+        private int count = 0;      //number of latencies added
+        private long duration = 0;  //sum of the latencies added
+        Timing() { }
+        /** Adds one latency sample to the running total. */
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+    }
+    
+    /** Creates a new instance of WallClockThresholdDriverControlStrategy with the default threshold of 500. */
+    public WallClockThresholdDriverControlStrategy() {
+    }
+    /** Creates an instance that uses the given threshold in place of the default; the value is not validated. */
+    public WallClockThresholdDriverControlStrategy( int thresholdMs ){
+        this.thresholdMs = thresholdMs;
+    }
+    
+    /**
+     * Increases by 1 the priority of every driver whose stage averaged a latency
+     * of at least thresholdMs over the given events.
+     * @param drivers the drivers whose priorities may be increased
+     * @param events the timing events to average, grouped by their source stage
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        Map<Stage, Timing> timings = new HashMap<Stage,Timing>();
+        for (StageProcessTimingEvent ev : events) {
+            Timing timing = timings.get((Stage) ev.getSource());
+            if (timing == null) {
+                timing = new Timing();
+                timings.put((Stage) ev.getSource(), timing);
+            }
+            timing.add(ev.getLatency());
+        }
+
+        for (PrioritizableStageDriver driver : drivers) {
+            Timing timing = timings.get(driver.getStage());
+            //skip stages without events; the unused mean computed here used to divide by zero
+            if( timing != null && timing.duration / timing.count >= thresholdMs ){
+                driver.increasePriority( 1 );
+            }
+        }
+    }
+
+    /**
+     * Holds value of property allowableDelta; handleEvents never reads it.
+     */
+    private long allowableDelta;
+
+    /**
+     * Returns the stored allowableDelta value.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Stores a new allowableDelta value; nothing else in this class uses it.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+    
+}

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java Wed Sep 22 21:57:01 2010
@@ -19,6 +19,7 @@ package org.apache.commons.pipeline.stag
 
 import java.util.Collection;
 import java.util.Collections;
+
 import org.apache.commons.pipeline.validation.ConsumedTypes;
 import org.apache.commons.pipeline.validation.ProducesConsumed;
 

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java Wed Sep 22 21:57:01 2010
@@ -19,6 +19,7 @@ package org.apache.commons.pipeline.stag
 
 import java.io.File;
 import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.StageException;

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java Wed Sep 22 21:57:01 2010
@@ -24,8 +24,9 @@ import java.io.OutputStream;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashSet;
-import java.util.regex.Pattern;
 import java.util.Set;
+import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.net.ftp.FTPClient;

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java Wed Sep 22 21:57:01 2010
@@ -21,7 +21,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import org.apache.commons.pipeline.stage.BaseStage;
+
 import org.apache.commons.pipeline.StageException;
 
 /**

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java Wed Sep 22 21:57:01 2010
@@ -25,6 +25,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+
 import org.apache.commons.pipeline.StageContext;
 import org.apache.commons.pipeline.StageEventListener;
 import org.apache.commons.pipeline.StageException;

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java Wed Sep 22 21:57:01 2010
@@ -17,11 +17,9 @@
 
 package org.apache.commons.pipeline.stage;
 
-import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.stage.BaseStage;
-import java.util.Queue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.StageException;
 
 
 /**

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java Wed Sep 22 21:57:01 2010
@@ -23,10 +23,9 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.stage.BaseStage;
 import org.apache.commons.pipeline.StageException;
 
 /**

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java Wed Sep 22 21:57:01 2010
@@ -19,7 +19,7 @@ package org.apache.commons.pipeline.test
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
+import org.apache.commons.pipeline.StageException;
 import org.apache.commons.pipeline.validation.ConsumedTypes;
 import org.apache.commons.pipeline.validation.ProducesConsumed;