You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:03 UTC
svn commit: r1000251 [1/2] - in /commons/sandbox/pipeline/trunk: ./
src/main/java/org/apache/commons/pipeline/
src/main/java/org/apache/commons/pipeline/config/
src/main/java/org/apache/commons/pipeline/driver/
src/main/java/org/apache/commons/pipeline...
Author: nuttycom
Date: Wed Sep 22 21:57:01 2010
New Revision: 1000251
URL: http://svn.apache.org/viewvc?rev=1000251&view=rev
Log:
Modifications for automated driver control.
Added:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java
- copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java
- copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java
- copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java
- copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java
- copied, changed from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml
Modified:
commons/sandbox/pipeline/trunk/NOTICE.txt
commons/sandbox/pipeline/trunk/pom.xml
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java
commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
Modified: commons/sandbox/pipeline/trunk/NOTICE.txt
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/NOTICE.txt?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/NOTICE.txt (original)
+++ commons/sandbox/pipeline/trunk/NOTICE.txt Wed Sep 22 21:57:01 2010
@@ -1,4 +1,4 @@
-Apache Commons Pipeline
+Apache Jakarta Commons Pipeline
Copyright 2004-2006 The Apache Software Foundation
This product includes software developed by
Modified: commons/sandbox/pipeline/trunk/pom.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/pom.xml?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/pom.xml (original)
+++ commons/sandbox/pipeline/trunk/pom.xml Wed Sep 22 21:57:01 2010
@@ -82,6 +82,15 @@ limitations under the License.
<locales>en</locales>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/driver/control/**</exclude>
+ <exclude>**/Abstract*Test.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
<repositories>
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/Feeder.java Wed Sep 22 21:57:01 2010
@@ -24,6 +24,8 @@ package org.apache.commons.pipeline;
* for subsequent stages. Each {@link StageDriver} implementation will
* ordinarily provide a custom Feeder implementation that integrates receiving
* objects with its internal stage processing workflow.
+ *
+ *
*/
public interface Feeder {
/**
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java Wed Sep 22 21:57:01 2010
@@ -24,10 +24,11 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+
import org.apache.commons.digester.Digester;
import org.apache.commons.digester.RuleSet;
-import org.apache.commons.pipeline.PipelineCreationException;
import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.PipelineCreationException;
import org.xml.sax.SAXException;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/ThreadPoolStageDriver.java Wed Sep 22 21:57:01 2010
@@ -142,6 +142,14 @@ public class ThreadPoolStageDriver exten
startSignal.countDown();
log.debug("Worker threads for stage " + stage + " started.");
+
+ //the following appears to be superfluous, since the state was already set to running.
+// //wait to ensure that the stage starts up correctly
+// try {
+// while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) this.wait();
+// } catch (InterruptedException e) {
+// throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
+// }
} else {
throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
}
@@ -290,7 +298,7 @@ public class ThreadPoolStageDriver exten
recordFatalError(e);
setState(ERROR);
} catch (InterruptedException e) {
- log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier", e);
+ log.error("Stage " + stage + " (threadID: " + threadID + ") interrupted while waiting for barrier",e);
recordFatalError(e);
setState(ERROR);
} finally {
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractDriverController.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+import java.util.List;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+import org.apache.commons.pipeline.StageEventListener;
+
+/**
+ *
+ */
+public abstract class AbstractDriverController implements StageEventListener {
+
+ /**
+ * This list contains the PriorityStageDriver instances that are being
+ * managed by the controller.
+ */
+ protected List<PrioritizableStageDriver> drivers = new ArrayList<PrioritizableStageDriver>();
+
+ protected List<StageProcessTimingEvent> events = new ArrayList<StageProcessTimingEvent>();
+
+ protected DriverControlStrategy driverControl;
+
+ /** Creates a new instance of AbstractDriverController */
+ public AbstractDriverController() {
+ }
+
+
+ public void addManagedStageDriver(PrioritizableStageDriver driver) {
+ this.drivers.add(driver);
+ }
+
+
+ public synchronized void notify(EventObject ev) {
+ if (ev instanceof StageProcessTimingEvent) {
+ events.add((StageProcessTimingEvent) ev);
+ notifyAll();
+ }
+ }
+
+
+ public void setDriverControlStrategy(DriverControlStrategy driverControl) {
+ this.driverControl = driverControl;
+ }
+
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/AbstractPrioritizableStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.AbstractStageDriver;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * Base class for prioritizable stage drivers. Its {@link #process(Object)}
+ * method measures the wall-clock time of each call into the stage and raises
+ * the measurement on the stage context as a {@link StageProcessTimingEvent}.
+ */
+public abstract class AbstractPrioritizableStageDriver extends AbstractStageDriver implements PrioritizableStageDriver {
+
+    /**
+     * Creates a new instance of AbstractPrioritizableStageDriver; all
+     * arguments are handed straight to the {@link AbstractStageDriver}
+     * constructor.
+     */
+    public AbstractPrioritizableStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance) {
+        super(stage, context, faultTolerance);
+    }
+
+    /**
+     * Runs the stage on a single object and then raises a timing event
+     * carrying the elapsed time in milliseconds. If the stage throws, the
+     * exception propagates and no event is raised.
+     *
+     * @param obj the object to hand to the stage
+     * @throws StageException if the stage's process method throws it
+     */
+    protected void process(Object obj) throws StageException {
+        final long startedAt = System.currentTimeMillis();
+        stage.process(obj);
+        final long elapsedMillis = System.currentTimeMillis() - startedAt;
+        context.raise(new StageProcessTimingEvent(stage, elapsedMillis));
+    }
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.*;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+
+/**
+ * A prioritizable stage driver that runs its stage in a pool of worker
+ * threads whose size can be changed while the driver is running, through
+ * {@link #increasePriority(double)} and {@link #decreasePriority(double)}.
+ * Objects fed to the driver are either queued for the workers or, when no
+ * runnable worker is available, processed directly in the feeding thread.
+ */
+public class BalancedPoolStageDriver extends AbstractPrioritizableStageDriver {
+    /**
+     * Per-worker run flag: RUNNABLE workers keep polling, STOPPABLE workers
+     * exit once a poll comes back empty, NOT_RUNNABLE workers exit at the
+     * next loop check.
+     */
+    public enum Runnability {RUNNABLE, STOPPABLE, NOT_RUNNABLE}
+
+    private final Log log = LogFactory.getLog(BalancedPoolStageDriver.class);
+
+    //signal telling threads to start polling queue
+    private final CountDownLatch startSignal;
+
+    //counter for worker identity assignment
+    private int nextWorkerId = 0;
+
+    //queue of worker instances - each is associated with a running thread
+    private final Queue<BalancedWorker> workers = new ConcurrentLinkedQueue<BalancedWorker>();
+
+    //number of threads to start initially
+    private final int initialThreads;
+
+    //wait timeout to ensure deadlock cannot occur on thread termination
+    private final long timeout;
+
+    //units for timeout wait; never null (see constructor)
+    private final TimeUnit timeoutTimeUnit;
+
+    //feeder instance for the stage
+    private final SwitchingFeeder feeder;
+
+    /**
+     * This Feeder implementation will switch between synchronous and multithreaded
+     * processing depending upon how many worker threads are available.
+     */
+    private class SwitchingFeeder implements Feeder {
+        //queue to hold data to be processed.
+        private final BlockingQueue queue;
+
+        public SwitchingFeeder(BlockingQueue queue) {
+            this.queue = queue;
+        }
+
+        /**
+         * Queues the object when the driver is not in the RUNNING or
+         * STOP_REQUESTED state, when more than one worker is registered, or
+         * when the single registered worker is still RUNNABLE. Otherwise the
+         * object is processed immediately in the calling thread.
+         */
+        public void feed(Object obj) {
+            synchronized(BalancedPoolStageDriver.this) {
+                if ( !isInState(RUNNING, STOP_REQUESTED)
+                || workers.size() > 1
+                        || (workers.size() == 1 && workers.peek().runnability == Runnability.RUNNABLE)) {
+                    try {
+                        if (log.isDebugEnabled()) log.debug(stage + ": Queueing object: " + obj);
+                        this.queue.put(obj);
+                    } catch (InterruptedException e) {
+                        throw new Error("Assertion failure: thread interrupted while attempting to enqueue data object.", e);
+                    }
+
+                    return; //short circuit out of here
+                }
+            }
+
+            try {
+                if (log.isDebugEnabled()) log.debug(stage + ":Processing object directly: " + obj);
+                BalancedPoolStageDriver.this.process(obj);
+            } catch (StageException e) {
+                recordProcessingException(obj, e);
+                if (faultTolerance == FaultTolerance.NONE) throw fatalError(e);
+            }
+        }
+    }
+
+    /**
+     * This StageDriver implementation runs stage processing in a pool of threads
+     *
+     * @param stage the stage to run
+     * @param context the context the stage runs in
+     * @param queueFactory creates the queue that buffers objects for the workers
+     * @param initialThreads number of worker threads created by {@link #start()}
+     * @param faultTolerance how processing failures are handled
+     * @param timeout how long a worker waits on an empty queue before it
+     *        re-checks whether it should stop, and how long {@link #finish()}
+     *        waits between state checks
+     * @param timeoutTimeUnit unit of <code>timeout</code>; null is treated as
+     *        milliseconds
+     */
+    public BalancedPoolStageDriver(Stage stage, StageContext context, BlockingQueueFactory queueFactory,
+            int initialThreads, FaultTolerance faultTolerance,
+            long timeout, TimeUnit timeoutTimeUnit) {
+        super(stage, context, faultTolerance);
+
+        this.feeder = new SwitchingFeeder(queueFactory.createQueue());
+        this.startSignal = new CountDownLatch(1);
+        this.initialThreads = initialThreads;
+        this.timeout = timeout;
+        //milliseconds was the unit effectively in force before the unit was honoured
+        this.timeoutTimeUnit = (timeoutTimeUnit != null) ? timeoutTimeUnit : TimeUnit.MILLISECONDS;
+    }
+
+    /**
+     * Accessor method for this stage's Feeder.
+     * @return the Feeder for this stage.
+     */
+    public Feeder getFeeder() {
+        return this.feeder;
+    }
+
+    /**
+     * Preprocesses the stage, creates the initial worker threads and releases
+     * them to begin polling the queue.
+     *
+     * @throws StageException if stage preprocessing fails
+     * @throws IllegalStateException if the driver is not in the STOPPED state
+     */
+    public void start() throws StageException {
+        if (this.currentState == STOPPED) {
+            setState(STARTED);
+
+            if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
+            stage.preprocess();
+            if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
+
+            log.debug("Starting worker threads for stage " + stage + ".");
+            this.addWorkers(initialThreads);
+
+            // let threads know they can start
+            testAndSetState(STARTED, RUNNING);
+            startSignal.countDown();
+
+            log.debug("Worker threads for stage " + stage + " started.");
+        } else {
+            throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
+        }
+
+    }
+
+    /**
+     * Requests shutdown, waits for every worker to complete, runs stage
+     * postprocessing unless the driver is in the ERROR state, and finally
+     * releases the stage.
+     *
+     * @throws StageException if the calling thread is interrupted while waiting
+     * @throws IllegalStateException if the driver is in the STOPPED state
+     */
+    public void finish() throws StageException {
+        if (this.currentState == STOPPED) {
+            throw new IllegalStateException("The driver is not currently running.");
+        }
+
+        try {
+            //it may be the case that finish() is called when the driver is still in the process
+            //of starting up, so it is necessary to wait to enter the running state before
+            //a stop can be requested.
+            //wait() may only be called while owning the monitor; the monitor is held
+            //for this loop only, so that workers are never blocked on it while this
+            //method awaits their completion below.
+            //NOTE(review): a timeout of 0 waits until notified - presumably setState()
+            //notifies waiters; confirm in AbstractStageDriver.
+            synchronized (this) {
+                final long waitMillis = this.timeoutTimeUnit.toMillis(this.timeout);
+                while ( !(this.currentState == RUNNING || this.currentState == ERROR) ) {
+                    this.wait(waitMillis);
+                }
+            }
+
+            //ask the worker threads to shut down
+            testAndSetState(RUNNING, STOP_REQUESTED);
+
+            if (log.isDebugEnabled()) log.debug("Waiting for worker threads to stop for stage " + stage + ".");
+            while (!workers.isEmpty()) {
+                BalancedWorker worker = workers.remove();
+                worker.awaitCompletion();
+            }
+            if (log.isDebugEnabled()) log.debug("Worker threads for stage " + stage + " halted");
+
+            //transition into finished state (not used internally?)
+            testAndSetState(STOP_REQUESTED, FINISHED);
+
+            //do not run postprocessing if the driver is in an error state
+            if (this.currentState != ERROR) {
+                if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
+                this.stage.postprocess();
+                if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
+            }
+        } catch (StageException e) {
+            log.error("An error occurred during postprocessing of stage " + stage , e);
+            recordFatalError(e);
+            setState(ERROR);
+        } catch (InterruptedException e) {
+            throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
+        } finally {
+            if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
+            stage.release();
+            if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
+        }
+
+        testAndSetState(FINISHED, STOPPED);
+    }
+
+    /**
+     * Sets the current state of processing to error, records the error,
+     * releases the stage, wakes any thread waiting on this driver and returns
+     * a RuntimeException encapsulating the specified throwable.
+     *
+     * @param t the failure that halted processing; becomes the cause of the
+     *        returned exception
+     * @return the exception for the caller to throw
+     */
+    private RuntimeException fatalError(Throwable t) {
+        try {
+            setState(ERROR);
+            this.recordFatalError(t);
+            stage.release();
+            //notifyAll() requires ownership of the monitor; without it an
+            //IllegalMonitorStateException was recorded as a second fatal error
+            synchronized (this) {
+                this.notifyAll();
+            }
+        } catch (Exception e) {
+            this.recordFatalError(e);
+        }
+
+        return new RuntimeException("Fatal error halted processing of stage: " + stage, t);
+    }
+
+    /**
+     * Creates and starts <code>count</code> new worker threads; nothing
+     * happens for a count of zero or less.
+     */
+    private synchronized void addWorkers(int count) {
+        while (count-- > 0) {
+            BalancedWorker worker = new BalancedWorker(nextWorkerId, this.feeder.queue);
+            Thread workerThread = new Thread(worker);
+            workers.add(worker);
+            nextWorkerId++;
+            workerThread.start();
+        }
+    }
+
+    /**
+     * Retires up to <code>count</code> workers. While more than one worker is
+     * registered, a worker is removed, told to stop and awaited. The last
+     * remaining worker stays registered and is only marked STOPPABLE, so it
+     * exits once a poll of the queue comes back empty.
+     *
+     * NOTE(review): a worker is awaited while this driver's monitor is held.
+     */
+    private synchronized void removeWorkers(int count) throws InterruptedException {
+        while (count-- > 0 && !workers.isEmpty()) {
+            if (workers.size() > 1) {
+                BalancedWorker worker = workers.remove();
+                worker.deactivate(false);
+                worker.awaitCompletion();
+            } else {
+                BalancedWorker worker = workers.peek();
+                worker.deactivate(true);
+            }
+        }
+    }
+
+    /**
+     * Increases the priority of the managed stage by increasing the number of
+     * threads in which the stage is running.
+     *
+     * @param amount number of threads to add; truncated to an int
+     */
+    public void increasePriority(double amount) {
+        this.addWorkers((int) amount);
+    }
+
+    /**
+     * Decreases the priority of the managed stage by decreasing the number of
+     * threads in which the stage is running.
+     *
+     * @param amount number of threads to retire; truncated to an int
+     */
+    public void decreasePriority(double amount) {
+        try {
+            this.removeWorkers((int) amount);
+        } catch (InterruptedException e) {
+            throw new Error("Assertion failure: interrupted while awaiting worker thread stop for pool size reduction.", e);
+        }
+    }
+
+    /**
+     * Returns the current priority, which is the worker count.
+     */
+    public double getPriority() {
+        return getWorkerCount();
+    }
+
+    /**
+     * Return the number of worker threads currently allocated.
+     */
+    public synchronized int getWorkerCount() {
+        return this.workers.size();
+    }
+
+    /**
+     * The worker thread
+     */
+    private class BalancedWorker implements Runnable {
+        private volatile Runnability runnability = Runnability.RUNNABLE;
+        private final int workerId;
+        private final BlockingQueue queue;
+        private final CountDownLatch doneSignal;
+
+        public BalancedWorker(int workerId, BlockingQueue queue) {
+            this.workerId = workerId;
+            this.queue = queue;
+            this.doneSignal = new CountDownLatch(1);
+        }
+
+        /**
+         * Waits for the start signal, then polls the queue and processes
+         * objects until this worker is deactivated, the driver enters the
+         * ERROR state, or a poll times out while a stop is pending.
+         *
+         * NOTE(review): a RuntimeException leaving the loop ends this thread
+         * without recording a fatal error or setting the ERROR state.
+         */
+        public void run() {
+            try {
+                BalancedPoolStageDriver.this.startSignal.await();
+
+                running: while (runnability != Runnability.NOT_RUNNABLE && currentState != ERROR) {
+                    try {
+                        //honour the configured unit rather than assuming milliseconds
+                        Object obj = queue.poll(timeout, timeoutTimeUnit);
+                        if (obj == null) {
+                            if (currentState == STOP_REQUESTED || runnability == Runnability.STOPPABLE) break running;
+                            //else continue running;
+                        } else {
+                            try {
+                                if (log.isDebugEnabled()) log.debug(stage + ": processing asynchronously: " + obj);
+                                BalancedPoolStageDriver.this.process(obj);
+                            } catch (StageException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == NONE) throw e;
+                            } catch (RuntimeException e) {
+                                recordProcessingException(obj, e);
+                                if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Worker thread " + this.workerId + " unexpectedly interrupted while waiting on data for stage " + stage, e);
+                    }
+                }
+            } catch (StageException e) {
+                log.error("An error occurred in the stage " + stage + " (workerID: " + this.workerId + ")", e);
+                recordFatalError(e);
+                setState(ERROR);
+            } catch (InterruptedException e) {
+                log.error("Stage " + stage + " (workerId: " + workerId + ") interrupted while waiting for barrier", e);
+                recordFatalError(e);
+                setState(ERROR);
+            } finally {
+                //always signal completion so that awaitCompletion() cannot hang
+                doneSignal.countDown();
+            }
+        }
+
+        /**
+         * Asks this worker to stop.
+         *
+         * @param waitForQueue if true the worker keeps processing until a poll
+         *        of the queue comes back empty; if false it stops at its next
+         *        loop check
+         */
+        public void deactivate(boolean waitForQueue) {
+            if (waitForQueue) {
+                this.runnability = Runnability.STOPPABLE;
+            } else {
+                this.runnability = Runnability.NOT_RUNNABLE;
+            }
+        }
+
+        /**
+         * Blocks until this worker's run() method has ended.
+         */
+        public void awaitCompletion() throws InterruptedException {
+            this.doneSignal.await();
+        }
+    }
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+
+/**
+ * Factory that builds {@link BalancedPoolStageDriver} instances from bean-style
+ * configuration properties. Each driver is created with the property values
+ * that are current at the time {@link #createStageDriver} is invoked.
+ */
+public class BalancedPoolStageDriverFactory implements PrioritizableStageDriverFactory<BalancedPoolStageDriver> {
+
+    /** Thread count handed to each new driver; 0 unless configured. */
+    private int initialThreads = 0;
+
+    /** Queue factory handed to each new driver; a linked blocking queue factory by default. */
+    private BlockingQueueFactory queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
+
+    /** Fault tolerance level handed to each new driver; {@code NONE} by default. */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+    /** Timeout handed to each new driver, expressed in {@link #timeoutTimeUnit}; 500 by default. */
+    private long timeout = 500;
+
+    /** Unit in which {@link #timeout} is expressed; milliseconds by default. */
+    private TimeUnit timeoutTimeUnit = TimeUnit.MILLISECONDS;
+
+    /** Creates a factory with the default property values. */
+    public BalancedPoolStageDriverFactory() {
+    }
+
+    /**
+     * Builds a driver for the given stage using the current property values.
+     * @param stage the stage the driver will run
+     * @param context the context in which the stage runs
+     * @return a newly constructed driver
+     */
+    public BalancedPoolStageDriver createStageDriver(Stage stage, StageContext context) {
+        return new BalancedPoolStageDriver(stage, context, queueFactory, initialThreads, faultTolerance, timeout, timeoutTimeUnit);
+    }
+
+    /**
+     * @return the configured initial thread count
+     */
+    public int getInitialThreads() {
+        return initialThreads;
+    }
+
+    /**
+     * @param initialThreads the initial thread count to use for new drivers
+     */
+    public void setInitialThreads(int initialThreads) {
+        this.initialThreads = initialThreads;
+    }
+
+    /**
+     * @return the configured queue factory
+     */
+    public BlockingQueueFactory getQueueFactory() {
+        return queueFactory;
+    }
+
+    /**
+     * @param queueFactory the queue factory to use for new drivers
+     */
+    public void setQueueFactory(BlockingQueueFactory queueFactory) {
+        this.queueFactory = queueFactory;
+    }
+
+    /**
+     * @return the configured fault tolerance level
+     */
+    public FaultTolerance getFaultTolerance() {
+        return faultTolerance;
+    }
+
+    /**
+     * @param faultTolerance the fault tolerance level to use for new drivers
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+
+    /**
+     * @return the configured timeout value
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout the timeout value to use for new drivers
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return the unit of the configured timeout
+     */
+    public TimeUnit getTimeoutTimeUnit() {
+        return timeoutTimeUnit;
+    }
+
+    /**
+     * @param timeoutTimeUnit the unit of the timeout to use for new drivers
+     */
+    public void setTimeoutTimeUnit(TimeUnit timeoutTimeUnit) {
+        this.timeoutTimeUnit = timeoutTimeUnit;
+    }
+
+    /**
+     * Sets the initial priority of the driver instance in an implementation-specific
+     * manner.
+     * @param priority An arbitrary priority value. In this implementation the value
+     * is truncated to an int and stored as the initial thread count.
+     */
+    public void setInitialPriority(double priority) {
+        setInitialThreads((int) priority);
+    }
+
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/CountingDriverController.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+import java.util.List;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+
+/**
+ * Driver controller that collects stage timing events into batches and hands
+ * each batch to the configured control strategy on a dedicated controller
+ * thread. A batch is dispatched once at least
+ * {@link #getMinimumEventsToHandle() minimumEventsToHandle} events are pending.
+ * <p>
+ * The controller thread waits on this object's monitor; it relies on whoever
+ * appends to the inherited event list notifying that monitor (presumably the
+ * superclass does so -- confirm against AbstractDriverController).
+ */
+public class CountingDriverController extends AbstractDriverController implements PipelineLifecycleJob {
+
+    // flag used to signal that controller thread should stop
+    private volatile boolean running;
+
+    /**
+     * Holds value of property minimumEventsToHandle.
+     */
+    private int minimumEventsToHandle;
+
+    /** Creates a new instance of CountingDriverController */
+    public CountingDriverController() { }
+
+    /**
+     * Registers this controller as a listener on the pipeline (when one is
+     * given) and starts the controller thread.
+     * @param pipeline the pipeline being started; may be null
+     */
+    public void onStart(Pipeline pipeline) {
+        if (pipeline != null) pipeline.registerListener(CountingDriverController.this);
+        running = true;
+
+        new Thread() {
+            public void run() {
+                while (running) {
+                    List<StageProcessTimingEvent> eventsToHandle;
+                    synchronized (CountingDriverController.this) {
+                        // Always require at least one pending event: with a threshold of
+                        // zero (the default) the loop would otherwise never block and
+                        // would spin, dispatching empty batches.
+                        while (running && events.size() < Math.max(1, minimumEventsToHandle)) {
+                            try {
+                                CountingDriverController.this.wait();
+                            } catch (InterruptedException e) {
+                                // Nobody is expected to interrupt this thread; treat it as a
+                                // request to stop and keep the interrupt status visible.
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+
+                        // swap the pending list out so events can keep arriving
+                        // while the strategy works on this batch
+                        eventsToHandle = events;
+                        events = new ArrayList<StageProcessTimingEvent>();
+                    }
+
+                    // an empty batch can only occur when shutting down; nothing to do then
+                    if (!eventsToHandle.isEmpty()) {
+                        driverControl.handleEvents(drivers, eventsToHandle);
+                    }
+                }
+            }
+        }.start();
+    }
+
+    /**
+     * Getter for property minimumEventsToHandle.
+     * @return Value of property minimumEventsToHandle.
+     */
+    public int getMinimumEventsToHandle() {
+        return this.minimumEventsToHandle;
+    }
+
+    /**
+     * Setter for property minimumEventsToHandle.
+     * @param minimumEventsToHandle New value of property minimumEventsToHandle.
+     * Values below 1 are treated as 1 by the controller thread.
+     */
+    public void setMinimumEventsToHandle(int minimumEventsToHandle) {
+        this.minimumEventsToHandle = minimumEventsToHandle;
+    }
+
+    /**
+     * Signals the controller thread to stop and wakes it up if it is currently
+     * waiting for events, so that it can observe the flag and terminate.
+     * @param pipeline the pipeline that finished (not used)
+     */
+    public void onFinish(Pipeline pipeline) {
+        synchronized (this) {
+            this.running = false;
+            // without this wake-up the controller thread could stay parked in wait() forever
+            this.notifyAll();
+        }
+    }
+}
Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/DriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,10 @@
* limitations under the License.
*/
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
-import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.pipeline.*;
-/**
- * This feeder simply adds the received objects to a list.
- */
-public class TestFeeder implements Feeder {
- public List<Object> receivedValues = new ArrayList<Object>();
-
- public void feed(Object obj) {
- this.receivedValues.add(obj);
- }
-}
\ No newline at end of file
+/**
+ * Strategy that inspects a batch of stage timing events and may adjust the
+ * priorities of the supplied drivers in response.
+ */
+public interface DriverControlStrategy {
+    /**
+     * Processes one batch of timing events.
+     * @param drivers the drivers whose priorities may be adjusted
+     * @param events the timing events collected since the previous call
+     */
+    void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events);
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that measures stage execution times
+ * and increases the priority of stages that are taking longer than the average
+ * event latency, while decreasing the priority of stages that are faster.
+ * A stage is only adjusted when its average latency differs from the overall
+ * mean by more than {@link #getAllowableDelta() allowableDelta}.
+ *
+ * @author mirror
+ */
+public class EqualizingDriverControlStrategy implements DriverControlStrategy {
+
+    /** Accumulates the number of events and the summed latency for one stage. */
+    private static class Tuple {
+        private int count = 0;
+        private long duration = 0;
+
+        Tuple() { }
+
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+
+        /** Exact (non-truncated) mean latency; only valid once add() has been called. */
+        double average() {
+            return (double) duration / count;
+        }
+    }
+
+    /**
+     * Holds value of property allowableDelta.
+     */
+    private long allowableDelta;
+
+    /** Creates a new instance of EqualizingDriverControlStrategy */
+    public EqualizingDriverControlStrategy() {
+    }
+
+    /**
+     * Groups the events by source stage, computes the mean latency over all
+     * events, and raises or lowers by 1 the priority of every driver whose
+     * stage average lies outside {@code mean +/- allowableDelta}. Drivers whose
+     * stage produced no events in this batch are left untouched.
+     *
+     * @param drivers the drivers that may be adjusted
+     * @param events the timing events to analyse; nothing happens if empty
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        if (events.isEmpty()) return;
+
+        Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>();
+        long total = 0;
+        for (StageProcessTimingEvent ev : events) {
+            Stage source = (Stage) ev.getSource();
+            Tuple tuple = timings.get(source);
+            if (tuple == null) {
+                tuple = new Tuple();
+                timings.put(source, tuple);
+            }
+
+            long latency = ev.getLatency();
+            tuple.add(latency);
+            total += latency;
+        }
+
+        System.out.print("Stage latencies: ");
+        for (Map.Entry<Stage, Tuple> entry : timings.entrySet()) {
+            System.out.print(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
+        }
+        System.out.println();
+
+        // Widen before dividing: "total / events.size()" alone is integer division
+        // and would silently drop the fractional part of the mean.
+        double mean = (double) total / events.size();
+
+        for (PrioritizableStageDriver driver : drivers) {
+            Tuple tuple = timings.get(driver.getStage());
+            if (tuple == null) continue;
+
+            // truncated value is kept for the log output only
+            long averageDuration = tuple.duration / tuple.count;
+            // exact value is used for the comparison so it is on the same footing as the mean
+            double exactAverage = tuple.average();
+            if (exactAverage > mean + allowableDelta) {
+                System.out.println("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+                driver.increasePriority(1);
+            } else if (exactAverage < mean - allowableDelta) {
+                driver.decreasePriority(1);
+                System.out.println("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+            }
+        }
+    }
+
+    /**
+     * Getter for property allowableDelta.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Setter for property allowableDelta.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriver.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * This {@link StageDriver} implementation hands every fed object to an
+ * executor as an individual task. While the maximum thread count is greater
+ * than zero a thread pool is used; when it is zero objects are processed
+ * synchronously on the feeding thread. The driver's priority is its maximum
+ * thread count.
+ */
+public class ExecutorStageDriver extends AbstractPrioritizableStageDriver {
+    private final Log log = LogFactory.getLog(ExecutorStageDriver.class);
+
+    //interval, in milliseconds, at which finish() re-checks pool termination
+    private static final long TERMINATION_POLL_MILLIS = 100L;
+
+    //executor service for parallel processing
+    private final ThreadPoolExecutor threadPoolExecutor;
+
+    //executor for synchronous processing on the calling thread
+    private final Executor directExecutor = new Executor() {
+        public void execute(Runnable r) { r.run(); }
+    };
+
+    //signal that indicates it's okay to process objects
+    private final CountDownLatch startSignal;
+
+    //reference to the executor that is currently in use by the feeder
+    private volatile Executor executor;
+
+    //maximum number of threads in the pool; 0 means "process synchronously"
+    private int maxThreads;
+
+    //core number of threads in the pool
+    private int coreThreads;
+
+    //feeder that submits each object to the current executor as its own task
+    private final Feeder feeder = new Feeder() {
+        public void feed(final Object obj) {
+            if (isInState(ERROR)) throw new IllegalStateException("Stage " + stage + " is in state ERROR and is hence unable to process data.");
+
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        // tasks submitted before start() block here until preprocessing is done
+                        startSignal.await();
+                        ExecutorStageDriver.this.process(obj);
+                    } catch (InterruptedException e) {
+                        throw new Error("Assertion failure: interrupted while awaiting start signal.", e);
+                    } catch (StageException e) {
+                        recordProcessingException(obj, e);
+                        if (faultTolerance == NONE) setState(ERROR);
+                    } catch (RuntimeException e) {
+                        recordProcessingException(obj, e);
+                        if (faultTolerance == CHECKED || faultTolerance == NONE) throw e;
+                    }
+                }
+            });
+        }
+    };
+
+    /**
+     * Creates a new ExecutorStageDriver.
+     *
+     * @param stage The stage that the driver will run
+     * @param context the context in which to run the stage
+     * @param faultTolerance Determines the behavior of the driver when
+     * an error is encountered in execution of {@link Stage#process(Object)}.
+     * @param coreThreads Core size of the thread pool; must not be negative
+     * or greater than maxThreads
+     * @param maxThreads Maximum size of the thread pool; 0 selects synchronous
+     * processing on the feeding thread
+     * @throws IllegalArgumentException if a thread count is negative or
+     * coreThreads exceeds maxThreads
+     */
+    public ExecutorStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance, int coreThreads, int maxThreads) {
+        super(stage, context, faultTolerance);
+        if (coreThreads < 0 || maxThreads < 0 || coreThreads > maxThreads) {
+            throw new IllegalArgumentException("Invalid thread counts: coreThreads=" + coreThreads + ", maxThreads=" + maxThreads);
+        }
+
+        // remember the configured sizes; priority changes are relative to them
+        this.coreThreads = coreThreads;
+        this.maxThreads = maxThreads;
+
+        this.threadPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+        // when the pool is saturated (or shut down) run the task on the feeding thread
+        this.threadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            public void rejectedExecution(Runnable command, ThreadPoolExecutor exec) {
+                ExecutorStageDriver.this.directExecutor.execute(command);
+            }
+        });
+
+        this.startSignal = new CountDownLatch(1);
+        applyPoolSizes();
+    }
+
+    /**
+     * Pushes the current coreThreads/maxThreads values into the thread pool
+     * and selects the executor used by the feeder. Callers must hold this
+     * object's monitor (or be the constructor).
+     */
+    private void applyPoolSizes() {
+        if (maxThreads < 0) maxThreads = 0;
+        if (coreThreads < 0) coreThreads = 0;
+        if (coreThreads > maxThreads) coreThreads = maxThreads;
+
+        if (maxThreads > 0) {
+            // ThreadPoolExecutor rejects sizes that would make core > max, so the
+            // order of the two setters depends on whether the pool grows or shrinks.
+            if (maxThreads >= threadPoolExecutor.getMaximumPoolSize()) {
+                threadPoolExecutor.setMaximumPoolSize(maxThreads);
+                threadPoolExecutor.setCorePoolSize(coreThreads);
+            } else {
+                threadPoolExecutor.setCorePoolSize(coreThreads);
+                threadPoolExecutor.setMaximumPoolSize(maxThreads);
+            }
+            this.executor = this.threadPoolExecutor;
+        } else {
+            // A maximum pool size of 0 is illegal for ThreadPoolExecutor; leave the
+            // pool's maximum alone and bypass the pool entirely instead.
+            threadPoolExecutor.setCorePoolSize(0);
+            this.executor = this.directExecutor;
+        }
+    }
+
+    /**
+     * Return the Feeder used to submit objects for processing.
+     * @return The feeder for objects processed by this driver's stage.
+     */
+    public Feeder getFeeder() {
+        return this.feeder;
+    }
+
+    /**
+     * Start the processing of the stage: runs preprocessing and then releases
+     * any tasks that are waiting on the start signal. Does nothing unless the
+     * driver is in state STOPPED.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state during startup
+     */
+    public synchronized void start() throws StageException {
+        if (this.currentState == STOPPED) {
+            setState(STARTED);
+            if (log.isDebugEnabled()) log.debug("Preprocessing stage " + stage + "...");
+            this.stage.preprocess();
+            if (log.isDebugEnabled()) log.debug("Preprocessing for stage " + stage + " complete.");
+
+            // let threads know they can start
+            testAndSetState(STARTED, RUNNING);
+            this.startSignal.countDown();
+        }
+    }
+
+    /**
+     * Causes processing to shut down gracefully. Waits until all submitted
+     * tasks have completed (or the driver enters state ERROR), then runs
+     * postprocessing and releases the stage.
+     * @throws org.apache.commons.pipeline.StageException Thrown if the driver is in an illegal state for shutdown.
+     */
+    public synchronized void finish() throws StageException {
+        try {
+            testAndSetState(RUNNING, STOP_REQUESTED);
+            this.threadPoolExecutor.shutdown();
+            testAndSetState(STOP_REQUESTED, STOPPED);
+
+            // isShutdown() is true as soon as shutdown() returns; isTerminated() is what
+            // reports that all tasks have finished. Nothing notifies this monitor on
+            // termination, so poll with a timed wait, which also releases the monitor
+            // for tasks that need it. If the start signal was never given, queued tasks
+            // can never complete, so do not wait for them.
+            while (this.startSignal.getCount() == 0
+                    && !this.threadPoolExecutor.isTerminated()
+                    && this.currentState != ERROR) {
+                this.wait(TERMINATION_POLL_MILLIS);
+            }
+
+            if (log.isDebugEnabled()) log.debug("Postprocessing stage " + stage + "...");
+            this.stage.postprocess();
+            if (log.isDebugEnabled()) log.debug("Postprocessing for stage " + stage + " complete.");
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Unexpectedly interrupted while awaiting thread pool shutdown.", e);
+        } finally {
+            if (log.isDebugEnabled()) log.debug("Releasing resources for stage " + stage + "...");
+            stage.release();
+            if (log.isDebugEnabled()) log.debug("Stage " + stage + " released.");
+
+            testAndSetState(STOPPED, FINISHED);
+        }
+    }
+
+    /**
+     * Raises the maximum thread count by {@code amount} and the core thread
+     * count by {@code amount / 1.5} (both truncated), switching from
+     * synchronous to pooled execution once the maximum is above zero.
+     * @param amount the priority increase
+     */
+    public synchronized void increasePriority(double amount) {
+        this.maxThreads += amount;
+        this.coreThreads += (int) (amount / 1.5);
+        applyPoolSizes();
+    }
+
+    /**
+     * Lowers the maximum thread count by {@code amount} and the core thread
+     * count by {@code amount / 1.5} (both truncated, never below zero),
+     * mirroring {@link #increasePriority(double)}. When the maximum reaches
+     * zero, objects are processed synchronously on the feeding thread.
+     * @param amount the priority decrease
+     */
+    public synchronized void decreasePriority(double amount) {
+        // same scaling as increasePriority, so that a decrease undoes an equal increase
+        this.maxThreads = (amount > maxThreads) ? 0 : maxThreads - (int) amount;
+        this.coreThreads = (amount / 1.5 > coreThreads) ? 0 : coreThreads - (int) (amount / 1.5);
+        applyPoolSizes();
+    }
+
+    /**
+     * @return the current priority, which is the maximum thread count
+     */
+    public double getPriority()
+    {
+        return maxThreads;
+    }
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * Factory that builds {@link ExecutorStageDriver} instances from bean-style
+ * configuration properties. Each driver is created with the property values
+ * that are current at the time {@link #createStageDriver} is invoked.
+ */
+public class ExecutorStageDriverFactory implements PrioritizableStageDriverFactory<ExecutorStageDriver> {
+
+    /**
+     * Holds value of property faultTolerance. Defaults to {@code NONE}, as in
+     * BalancedPoolStageDriverFactory; a null value would make the driver's
+     * fault tolerance comparisons all fail, so that no processing exception
+     * would ever be propagated.
+     */
+    private FaultTolerance faultTolerance = FaultTolerance.NONE;
+
+    /**
+     * Holds value of property coreThreads.
+     */
+    private int coreThreads;
+
+    /**
+     * Holds value of property maxThreads.
+     */
+    private int maxThreads;
+
+    /** Creates a new instance of ExecutorStageDriverFactory */
+    public ExecutorStageDriverFactory() {
+    }
+
+    /**
+     * Builds a driver for the given stage using the current property values.
+     * @param stage the stage the driver will run
+     * @param context the context in which the stage runs
+     * @return a newly constructed driver
+     */
+    public ExecutorStageDriver createStageDriver(Stage stage, StageContext context) {
+        return new ExecutorStageDriver(stage, context, faultTolerance, coreThreads, maxThreads);
+    }
+
+    /**
+     * Getter for property faultTolerance.
+     * @return Value of property faultTolerance.
+     */
+    public FaultTolerance getFaultTolerance() {
+        return this.faultTolerance;
+    }
+
+    /**
+     * Setter for property faultTolerance.
+     * @param faultTolerance New value of property faultTolerance.
+     */
+    public void setFaultTolerance(FaultTolerance faultTolerance) {
+        this.faultTolerance = faultTolerance;
+    }
+
+    /**
+     * Getter for property coreThreads.
+     * @return Value of property coreThreads.
+     */
+    public int getCoreThreads() {
+        return this.coreThreads;
+    }
+
+    /**
+     * Setter for property coreThreads.
+     * @param coreThreads New value of property coreThreads.
+     */
+    public void setCoreThreads(int coreThreads) {
+        this.coreThreads = coreThreads;
+    }
+
+    /**
+     * Getter for property maxThreads.
+     * @return Value of property maxThreads.
+     */
+    public int getMaxThreads() {
+        return this.maxThreads;
+    }
+
+    /**
+     * Setter for property maxThreads.
+     * @param maxThreads New value of property maxThreads.
+     */
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    /**
+     * Sets the initial priority of the driver instance in an implementation-specific
+     * manner.
+     * @param priority An arbitrary priority value. In this implementation the value
+     * is truncated to an int and used as both the core and the maximum thread count.
+     */
+    public void setInitialPriority(double priority) {
+        this.setCoreThreads((int) priority);
+        this.setMaxThreads((int) priority);
+    }
+
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that every so often experimentally
+ * increases and decreases priorities to see if performance is improved. If
+ * a performance improvement is found, additional experiments are done in the
+ * same direction; if performance degrades, the previous step is reversed.
+ *
+ * @author braeckel
+ */
+public class ExperimentalDriverControlStrategy implements DriverControlStrategy {
+    /**
+     * The minimum time difference (in percent) between different analyses of a stage for
+     * modifications to take place. In other words, if a stage is stable with
+     * its current priority, no changes are made
+     */
+    private int minDifferencePercent = 3;
+
+    /** Source of randomness for experiments that have no previous action to build on. */
+    private final Random random = new Random();
+
+    /** The priority adjustments this strategy can apply to a driver. */
+    private enum Action {
+        Decrease { void execute( PrioritizableStageDriver driver ){ driver.decreasePriority( 1 ); } },
+        Increase { void execute( PrioritizableStageDriver driver ){ driver.increasePriority( 1 ); } },
+        None { void execute( PrioritizableStageDriver driver ){ /*do nothing*/ } };
+
+        abstract void execute(PrioritizableStageDriver driver);
+    }
+
+    /** Accumulated timing for one stage in one batch, plus the action taken for that batch. */
+    private static class Tuple {
+        private int count = 0;
+        private long duration = 0;
+        private Action lastAction = Action.None;
+
+        Tuple() { }
+
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+
+        /** Exact (non-truncated) mean latency; only valid once add() has been called. */
+        double average() {
+            return (double) duration / count;
+        }
+    }
+
+    /** Timing and action recorded for each stage the last time it produced events. */
+    private Map<Stage, Tuple> lastTimings = new HashMap<Stage,Tuple>();
+
+    /**
+     * Holds value of property allowableDelta. Not consulted by this strategy's
+     * own logic; retained as a bean property.
+     */
+    private long allowableDelta;
+
+    /** Creates a new instance of ExperimentalDriverControlStrategy */
+    public ExperimentalDriverControlStrategy() {
+    }
+
+    /**
+     * Creates a new instance with the given significance threshold.
+     * @param minDifferencePercent minimum relative change, in percent (0-100),
+     * between two batches for a priority adjustment to be made
+     * @throws IllegalArgumentException if the value is outside 0-100
+     */
+    public ExperimentalDriverControlStrategy( int minDifferencePercent ){
+        if( minDifferencePercent < 0 || minDifferencePercent > 100 )
+        {
+            throw new IllegalArgumentException( "Minimum difference percent must be between 0 and 100" );
+        }
+        this.minDifferencePercent = minDifferencePercent;
+    }
+
+    /**
+     * Compares each driver's average latency in this batch with the average
+     * recorded for its previous batch and adjusts its priority accordingly.
+     * Drivers whose stage produced no events in this batch are skipped and keep
+     * their previously recorded timing.
+     *
+     * @param drivers the drivers that may be adjusted
+     * @param events the timing events collected since the previous call
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+        for (StageProcessTimingEvent ev : events) {
+            Stage source = (Stage) ev.getSource();
+            Tuple tuple = timings.get(source);
+            if (tuple == null) {
+                tuple = new Tuple();
+                timings.put(source, tuple);
+            }
+
+            tuple.add(ev.getLatency());
+        }
+
+        for (PrioritizableStageDriver driver : drivers) {
+            Tuple mostRecentTiming = timings.get(driver.getStage());
+            // no measurement for this stage in the batch: nothing to judge, and
+            // dereferencing the missing tuple would throw a NullPointerException
+            if (mostRecentTiming == null) continue;
+
+            Tuple previousTiming = lastTimings.get( driver.getStage() );
+            double avgMostRecentDuration = mostRecentTiming.average();
+
+            Action action;
+            if( previousTiming == null ){
+                //first time around, try increasing priority
+                action = Action.Increase;
+            }
+            else{
+                double avgPreviousTiming = previousTiming.average();
+                //positive when the stage got faster since the previous batch
+                double timingDifference = avgPreviousTiming - avgMostRecentDuration;
+
+                System.out.println( "Performance went from "+avgPreviousTiming + " to "+avgMostRecentDuration +"("+timingDifference+")");
+                //if the timing difference was significant enough to work with...
+                double minDifference = avgPreviousTiming * (minDifferencePercent / 100.0);
+                if( Math.abs( timingDifference ) >= minDifference ){
+                    action = chooseAction( previousTiming.lastAction, timingDifference > 0 );
+                }
+                else{
+                    action = Action.None;
+                }
+            }
+
+            mostRecentTiming.lastAction = action;
+            action.execute( driver );
+
+            System.out.println( "Action="+mostRecentTiming.lastAction+", current priority="+driver.getPriority() );
+            //take our most recent timings and roll them into the previous timings
+            lastTimings.put( driver.getStage(), mostRecentTiming );
+        }
+    }
+
+    /**
+     * Picks the next action after a significant timing change: repeat the last
+     * action if it improved performance, reverse it if it did not, or pick a
+     * random action if there was no last action.
+     */
+    private Action chooseAction( Action lastAction, boolean improved )
+    {
+        if( lastAction == Action.None ){
+            System.out.println( "Significant performance change without a previous action: RANDOM action");
+            return getRandomAction();
+        }
+        if( improved ) return lastAction;
+        return ( lastAction == Action.Increase ) ? Action.Decrease : Action.Increase;
+    }
+
+    /** Returns None, Increase or Decrease with equal probability. */
+    private Action getRandomAction()
+    {
+        // nextInt(3) is always in 0..2; negating nextInt() by hand breaks for Integer.MIN_VALUE
+        switch( random.nextInt( 3 ) ){
+            case 0: return Action.None;
+            case 1: return Action.Increase;
+            case 2: return Action.Decrease;
+            default: throw new IllegalStateException();
+        }
+    }
+
+    /**
+     * Getter for property allowableDelta.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Setter for property allowableDelta.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+
+}
Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriver.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,16 @@
* limitations under the License.
*/
-package org.apache.commons.pipeline.testFramework;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.commons.pipeline.driver.control;
+
import org.apache.commons.pipeline.*;
/**
- * This feeder simply adds the received objects to a list.
+ * A {@link StageDriver} that exposes a numeric priority which can be raised, lowered and queried.
*/
-public class TestFeeder implements Feeder {
- public List<Object> receivedValues = new ArrayList<Object>();
-
- public void feed(Object obj) {
- this.receivedValues.add(obj);
- }
-}
\ No newline at end of file
+public interface PrioritizableStageDriver extends StageDriver {
+    /**
+     * Asks the driver to raise its priority.
+     *
+     * @param amount how much to raise the priority by; how the value is
+     *               interpreted is left to the implementation
+     */
+    void increasePriority(double amount);
+
+    /**
+     * Asks the driver to lower its priority.
+     *
+     * @param amount how much to lower the priority by; how the value is
+     *               interpreted is left to the implementation
+     */
+    void decreasePriority(double amount);
+
+    /**
+     * @return the priority currently reported by this driver
+     */
+    double getPriority();
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverFactory.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriverFactory;
+
+/**
+ * <p>A {@link StageDriverFactory} whose products are
+ * {@link PrioritizableStageDriver} instances and which can be given the
+ * priority that newly created drivers should start with.</p>
+ *
+ * <p>Like any stage driver factory, it is used by a {@code Pipeline} to
+ * create a driver for a {@link Stage} when that stage is added to the
+ * pipeline. The factory pattern ensures that each stage is run by a unique
+ * driver instance.</p>
+ *
+ * <p>In order to guarantee that PrioritizableStageDriverFactory instances can
+ * be used effectively in configuration frameworks, each implementation
+ * <em>must</em> provide a no-argument constructor.</p>
+ */
+public interface PrioritizableStageDriverFactory<T extends PrioritizableStageDriver> extends StageDriverFactory<T> {
+    /**
+     * Creates the driver that will run the given stage in the given context.
+     *
+     * @param stage   the stage to be run by the newly created driver
+     * @param context the context in which the stage will be run
+     * @return the newly created driver
+     */
+    T createStageDriver(Stage stage, StageContext context);
+
+    /**
+     * Sets the initial priority of the driver to be created.
+     *
+     * @param priority the priority that drivers created by this factory start with
+     */
+    void setInitialPriority(double priority);
+}
Copied: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java?p2=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/StageProcessTimingEvent.java Wed Sep 22 21:57:01 2010
@@ -15,19 +15,25 @@
* limitations under the License.
*/
-package org.apache.commons.pipeline.testFramework;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.pipeline.*;
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.EventObject;
+import org.apache.commons.pipeline.Stage;
/**
- * This feeder simply adds the received objects to a list.
+ * Event that reports a latency value for the {@link Stage} that is its source.
*/
-public class TestFeeder implements Feeder {
- public List<Object> receivedValues = new ArrayList<Object>();
-
- public void feed(Object obj) {
- this.receivedValues.add(obj);
+public class StageProcessTimingEvent extends EventObject {
+ private long latency; // latency reported for the source stage; unit is not fixed here (presumably milliseconds -- confirm against the code that fires these events)
+
+ /** Creates an event whose source is the given stage and which carries the supplied latency value. */
+ public StageProcessTimingEvent(Stage source, long latency) {
+ super(source);
+ this.latency = latency;
 }
-}
\ No newline at end of file
+
+ public long getLatency() { // returns the latency value passed to the constructor
+ return latency;
+ }
+}
Added: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java (added)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategy.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * An implementation of DriverControlStrategy that gauges performance by the
+ * average latency reported for each stage. Every driver whose stage averaged
+ * at or above the configured threshold in a batch of timing events has its
+ * priority increased by 1.
+ */
+public class WallClockThresholdDriverControlStrategy implements DriverControlStrategy {
+    /**
+     * Average-latency threshold. A stage whose average latency in a batch is
+     * greater than or equal to this value gets more priority. It is compared
+     * directly against {@link StageProcessTimingEvent#getLatency()} averages,
+     * so it must be in the same unit as the event latency (milliseconds,
+     * going by the field name).
+     */
+    private long thresholdMs = 500;
+
+    /** Accumulates the number of events and the summed latency for one stage. */
+    private static class Timing {
+        private int count = 0;
+        private long duration = 0;
+
+        Timing() { }
+
+        public void add(long duration) {
+            count++;
+            this.duration += duration;
+        }
+    }
+
+    /** Creates a strategy that uses the default threshold of 500. */
+    public WallClockThresholdDriverControlStrategy() {
+    }
+
+    /**
+     * Creates a strategy with an explicit threshold.
+     *
+     * @param thresholdMs threshold to use in place of the default
+     */
+    public WallClockThresholdDriverControlStrategy( int thresholdMs ){
+        this.thresholdMs = thresholdMs;
+    }
+
+    /**
+     * Groups the events by source stage, computes each stage's average
+     * latency and increases by 1 the priority of every driver whose stage
+     * averaged at or above the threshold.
+     *
+     * An empty event list is a no-op, and drivers whose stage produced no
+     * events in this batch are left untouched.
+     *
+     * @param drivers the drivers whose priority may be adjusted
+     * @param events  the timing events collected since the previous call
+     */
+    public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
+        Map<Stage, Timing> timings = new HashMap<Stage,Timing>();
+        for (StageProcessTimingEvent ev : events) {
+            Stage stage = (Stage) ev.getSource();
+            Timing timing = timings.get(stage);
+            if (timing == null) {
+                timing = new Timing();
+                timings.put(stage, timing);
+            }
+
+            timing.add(ev.getLatency());
+        }
+
+        for (PrioritizableStageDriver driver : drivers) {
+            Timing timing = timings.get(driver.getStage());
+            if (timing == null) {
+                // nothing was measured for this stage in this batch, so there is no average to judge
+                continue;
+            }
+
+            // count is at least 1 here: a Timing is only created together with its first add()
+            long averageDuration = timing.duration / timing.count;
+            if( averageDuration >= thresholdMs )
+            {
+                driver.increasePriority( 1 );
+            }
+        }
+    }
+
+    /**
+     * Holds value of property allowableDelta. It is not consulted by
+     * handleEvents in this class.
+     */
+    private long allowableDelta;
+
+    /**
+     * Getter for property allowableDelta.
+     * @return Value of property allowableDelta.
+     */
+    public long getAllowableDelta() {
+        return this.allowableDelta;
+    }
+
+    /**
+     * Setter for property allowableDelta.
+     * @param allowableDelta New value of property allowableDelta.
+     */
+    public void setAllowableDelta(long allowableDelta) {
+        this.allowableDelta = allowableDelta;
+    }
+
+}
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java Wed Sep 22 21:57:01 2010
@@ -19,6 +19,7 @@ package org.apache.commons.pipeline.stag
import java.util.Collection;
import java.util.Collections;
+
import org.apache.commons.pipeline.validation.ConsumedTypes;
import org.apache.commons.pipeline.validation.ProducesConsumed;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FileFinderStage.java Wed Sep 22 21:57:01 2010
@@ -19,6 +19,7 @@ package org.apache.commons.pipeline.stag
import java.io.File;
import java.util.regex.Pattern;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.StageException;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java Wed Sep 22 21:57:01 2010
@@ -24,8 +24,9 @@ import java.io.OutputStream;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
-import java.util.regex.Pattern;
import java.util.Set;
+import java.util.regex.Pattern;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java Wed Sep 22 21:57:01 2010
@@ -21,7 +21,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import org.apache.commons.pipeline.stage.BaseStage;
+
import org.apache.commons.pipeline.StageException;
/**
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java Wed Sep 22 21:57:01 2010
@@ -25,6 +25,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageEventListener;
import org.apache.commons.pipeline.StageException;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/LogStage.java Wed Sep 22 21:57:01 2010
@@ -17,11 +17,9 @@
package org.apache.commons.pipeline.stage;
-import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.stage.BaseStage;
-import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.StageException;
/**
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java Wed Sep 22 21:57:01 2010
@@ -23,10 +23,9 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import java.util.Queue;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.stage.BaseStage;
import org.apache.commons.pipeline.StageException;
/**
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java Wed Sep 22 21:57:01 2010
@@ -19,7 +19,7 @@ package org.apache.commons.pipeline.test
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
+import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.validation.ConsumedTypes;
import org.apache.commons.pipeline.validation.ProducesConsumed;