You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:03 UTC
svn commit: r1000251 [2/2] - in /commons/sandbox/pipeline/trunk: ./
src/main/java/org/apache/commons/pipeline/
src/main/java/org/apache/commons/pipeline/config/
src/main/java/org/apache/commons/pipeline/driver/
src/main/java/org/apache/commons/pipeline...
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestFeeder.java Wed Sep 22 21:57:01 2010
@@ -19,7 +19,8 @@ package org.apache.commons.pipeline.test
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.pipeline.*;
+
+import org.apache.commons.pipeline.Feeder;
/**
* This feeder simply adds the received objects to a list.
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/TestStage.java Wed Sep 22 21:57:01 2010
@@ -20,6 +20,7 @@ package org.apache.commons.pipeline.test
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Stage;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/PipelineValidator.java Wed Sep 22 21:57:01 2010
@@ -18,6 +18,7 @@
package org.apache.commons.pipeline.validation;
import java.util.List;
+
import org.apache.commons.pipeline.Pipeline;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageDriverFactory;
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/validation/SimplePipelineValidator.java Wed Sep 22 21:57:01 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+
import org.apache.commons.pipeline.Pipeline;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageDriverFactory;
Modified: commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml (original)
+++ commons/sandbox/pipeline/trunk/src/site/xdoc/pipeline_basics.xml Wed Sep 22 21:57:01 2010
@@ -26,31 +26,29 @@ limitations under the License.
</properties>
<body>
<p>
- A tutorial on some of the Basics needed to use the Apache Commons Pipeline
+ This tutorial covers some of the basics needed to use the Apache Commons Pipeline
workflow framework. The target audience for this document consists of developers
- who will need to assemble existing stages or write their own stages. The
- pipeline provides a Java class library intended to make it easy to use and reuse
- stages as modular processing blocks.
+ who will need to assemble existing stages or write their own stages.
</p>
<section name="Pipeline Structure">
<p>
- <b>Stages</b> in a pipeline represent the logical the steps needed
- to process data. Each represents a single high level processing
- concept such as finding files, reading a file format, computing a
- product from the data, or writing data to a database. The primary
- advantage of using the Pipeline framework and building the
- processing steps into stages is the reusablility of the stages in
- other pipelines.
+ <b>Stages</b> in a pipeline represent logical units of work in
+ a data processing workflow. Each stage implements a single high level
+ processing concept such as finding files, reading a file format, computing a
+ product from the data, or writing data to a database. Commons-pipeline
+ is built from the ground up with parallel processing in mind, so stages
+ are intended to run concurrently.
</p>
<p>
<img src="images/BasicPipeline.png" alt="A basic pipeline"/>
</p>
<p>
- A <b>Pipeline</b> is built up from stages which can pass data on
- to subsequent stages. The arrows above that are labelled
- <b>"EMIT"</b> show the data output of one stage being
- passed to the next stage. At the code level, there is an
- <code>emit()</code> method that sends data to the next stage.
+ A <b>Pipeline</b> is built up from stages that act as filters which
+ process incoming data and pass results on to subsequent stages.
+ In the diagram above, the arrows labelled <b>"EMIT"</b> represent
+ the data output of one stage being passed to the next. The base class for
+ most Stage implementations, <code>BaseStage</code>, supplies a convenience method
+ <code>emit()</code> that is used to send data to the next stage in the pipeline.
The data flow starts at the left, where there is an arrow
labelled <b>"FEED"</b>. The FEED starts off the
pipeline and is usually set up by a configuration file,
@@ -125,10 +123,10 @@ limitations under the License.
<li> One object fed into a stage does not always
produce one object out.
<ul>
- <li> Stages that do not pass on (emit) any objecs are referred to as
- <b>terminal stages</b>. Avoid creating this type of stage, since they limit your
- possibilities when building pipelines. (This is easy to do, one line of code
- passes data to the next stage.)
+ <li> Stages that do not pass on (emit) any objects are
+ referred to as <b>terminal stages</b>. These should be
+ avoided, since they limit your possibilities when building
+ pipelines from existing stage components.
</li>
<li> Stages that send objects on to more than one subsequent
stage are called <b>branching stages</b>.
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/AbstractLoggingTestCase.java Wed Sep 22 21:57:01 2010
@@ -20,8 +20,11 @@ package org.apache.commons.pipeline;
import java.io.File;
import java.io.InputStream;
import java.util.ResourceBundle;
+
import javax.xml.parsers.DocumentBuilderFactory;
-import junit.framework.*;
+
+import junit.framework.TestCase;
+
import org.apache.log4j.xml.DOMConfigurator;
import org.w3c.dom.Document;
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/AbstractStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -18,7 +18,6 @@
package org.apache.commons.pipeline.driver;
import org.apache.commons.pipeline.AbstractLoggingTestCase;
-import static org.apache.commons.pipeline.StageDriver.State.*;
import org.apache.commons.pipeline.testFramework.TestFeeder;
import org.apache.commons.pipeline.testFramework.TestStage;
import org.apache.commons.pipeline.testFramework.TestStageContext;
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/StageDriverTestUtils.java Wed Sep 22 21:57:01 2010
@@ -18,9 +18,10 @@
package org.apache.commons.pipeline.driver;
import junit.framework.TestCase;
+
import org.apache.commons.pipeline.StageDriver;
-import org.apache.commons.pipeline.testFramework.FaultingTestStage;
import org.apache.commons.pipeline.StageDriverFactory;
+import org.apache.commons.pipeline.testFramework.FaultingTestStage;
import org.apache.commons.pipeline.testFramework.TestFeeder;
import org.apache.commons.pipeline.testFramework.TestStage;
import org.apache.commons.pipeline.testFramework.TestStageContext;
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/BalancedPoolStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import junit.framework.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.StageDriver.State;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.*;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+public class BalancedPoolStageDriverTest extends AbstractStageDriverTest {
+ private Log log;
+
+ public BalancedPoolStageDriverTest(String testName) {
+ super(testName);
+ this.log = LogFactory.getLog(BalancedPoolStageDriverTest.class);
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ /**
+ * Test of getFeeder method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+ */
+ public void testGetFeeder() {
+ System.out.println("testGetFeeder");
+ BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+ Feeder feeder = instance.getFeeder();
+ assertNotNull(feeder);
+ }
+
+ /**
+ * Test of finish method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+ */
+ public void testFinish() throws Exception {
+ System.out.println("testStart");
+ BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+ instance.start(); //must start the driver before we can tell it to finish
+ instance.finish();
+
+ assertEquals(0, instance.getWorkerCount());
+ assertEquals(State.STOPPED, instance.getState());
+ }
+
+ /**
+ * Test of increasePriority and decreasePriority methods, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+ */
+ public void testAlterPriority() throws Exception {
+ System.out.println("decreasePriority");
+
+ BalancedPoolStageDriver instance = new BalancedPoolStageDriver(stage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+ assertSame("Driver should not be initialized in state " + instance.getState(), instance.getState(), State.STOPPED);
+ instance.start();
+ assertSame("Driver is not running after instance start.", instance.getState(), State.RUNNING);
+
+ int threads = instance.getWorkerCount();
+ instance.increasePriority(2);
+ assertEquals( 2, instance.getWorkerCount() );
+
+ threads = instance.getWorkerCount();
+ instance.decreasePriority(1);
+ assertEquals( 1, instance.getWorkerCount() );
+
+ assertNotSame("Driver has unexpectedly stopped.", instance.getState(), State.STOPPED);
+ instance.finish();
+ assertSame("Driver failed to shut down correctly", instance.getState(), State.STOPPED);
+ }
+
+ /**
+ * Integration test of combined feed/priority change for objects.
+ */
+ public void testSingleStage() throws Exception {
+ TestStage stage = new TestStage(0);
+ PrioritizableStageDriverTestUtils.testSingleStage(this, new BalancedPoolStageDriverFactory(), stage);
+ }
+
+ /**
+ * Integration test of combined feed/priority change for objects.
+ */
+ public void testMultiStage() throws Exception {
+ log.debug("testMultiStage -------------------------------------------");
+ TestStage[] stages = {
+ new TestStage(0),
+ new CPUBoundTestStage(1, 50),
+ new IOBoundTestStage(2, 50, 250)
+ };
+ PrioritizableStageDriverTestUtils.testMultiStage(this, new BalancedPoolStageDriverFactory(), 30, stages);
+ }
+
+ public void testCPUBound() throws Exception {
+ TestStage cpuStage = new CPUBoundTestStage( 1, 50 );
+ PrioritizableStageDriverTestUtils.testSingleStage(this, new BalancedPoolStageDriverFactory(), cpuStage );
+ }
+
+ public void testIOBound() throws Exception {
+ TestStage ioStage = new IOBoundTestStage( 1, 50, 250 );
+ PrioritizableStageDriverTestUtils.testSingleStage( this, new BalancedPoolStageDriverFactory(), ioStage );
+ }
+
+ /**
+ * Test of increasePriority method, of class org.apache.commons.pipeline.driver.control.BalancedPoolStageDriver.
+ */
+ public void testMultiThreadExecution() throws Exception {
+ System.out.println("multiThreadExecution");
+
+ //start with multiple threads
+ int count = 5;
+ final Set threadNames = Collections.synchronizedSet(new HashSet());
+ TestStage threadNameTrackingStage = new TestStage( 1 ) {
+ public void process( Object obj ) throws StageException {
+ super.process( obj );
+ threadNames.add( Thread.currentThread().getName() );
+ //yield thread control
+ Thread.currentThread().yield();
+ }
+ };
+
+ BalancedPoolStageDriver instance = new BalancedPoolStageDriver(threadNameTrackingStage, context, new BlockingQueueFactory.LinkedBlockingQueueFactory(), 0, FaultTolerance.NONE, 500, java.util.concurrent.TimeUnit.MILLISECONDS);
+ instance.increasePriority( count );
+ assertEquals( count, instance.getWorkerCount() );
+
+ TestStageContext context = new TestStageContext();
+ TestFeeder terminalFeeder = new TestFeeder();
+
+ context.registerDownstreamFeeder(threadNameTrackingStage, terminalFeeder);
+ threadNameTrackingStage.init(context);
+
+ instance.start();
+
+ //ensure that with 100 fed objects all the threads get exercised
+ int fedObjCount = 100;
+ for( int i = 0; i < fedObjCount; i++ ) {
+ instance.getFeeder().feed( i );
+ }
+
+ instance.finish();
+
+ assertEquals( count, threadNames.size() );
+ }
+
+}
Copied: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java?p2=commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/CPUBoundTestStage.java Wed Sep 22 21:57:01 2010
@@ -15,35 +15,27 @@
* limitations under the License.
*/
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
-import org.apache.commons.pipeline.validation.ConsumedTypes;
-import org.apache.commons.pipeline.validation.ProducesConsumed;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestStage;
-/**
- * This stage will generate {@link StageException}s for every other object this
- * stage processes. By design, the even numbered objects will cause a <CODE>StageException</CODE>
- * to be thrown (counting the first object as 1).
- */
-@ConsumedTypes(Object.class)
-@ProducesConsumed
-public class FaultingTestStage extends TestStage {
- private Log log = LogFactory.getLog(FaultingTestStage.class);
- private int counter = 0;
+
+class CPUBoundTestStage extends TestStage {
+ private Log log = LogFactory.getLog(CPUBoundTestStage.class);
+ private long consume;
- public FaultingTestStage(int index) {
- super(index);
+ public CPUBoundTestStage(int id, long consume) {
+ super( id );
+ this.consume = consume;
}
public void process(Object obj) throws StageException {
- if (++counter % 2 == 0) {
- log.error("Planned fault in stage " + this + ".");
- throw new StageException(this, "Planned fault in stage " + super.getIndex() + ".");
- }
-
- super.process(obj);
+ super.process( obj );
+ long startTime = System.currentTimeMillis();
+ double val = PrioritizableStageDriverTestUtils.consumeNCubed( consume );
+ log.debug( "CPU stage took " + (System.currentTimeMillis() - startTime) + " ms to produce value " + val);
}
-}
\ No newline at end of file
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.pipeline.Stage;
+
+public class EqualizingDriverControlStrategyTest extends TestCase {
+
+ public EqualizingDriverControlStrategyTest(String testName) {
+ super(testName);
+ }
+
+ protected void setUp() throws Exception {
+ }
+
+ protected void tearDown() throws Exception {
+ }
+
+ /**
+ * Test of handleEvents method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+ */
+ public void testHandleEvents() {
+ System.out.println("handleEvents");
+
+ List<PrioritizableStageDriver> drivers = null;
+ List<StageProcessTimingEvent> events = null;
+ EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+
+ instance.handleEvents(drivers, events);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of getAllowableDelta method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+ */
+ public void testGetAllowableDelta() {
+ System.out.println("getAllowableDelta");
+
+ EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+
+ long expResult = 0L;
+ long result = instance.getAllowableDelta();
+ assertEquals(expResult, result);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of setAllowableDelta method, of class org.apache.commons.pipeline.driver.control.EqualizingDriverControlStrategy.
+ */
+ public void testSetAllowableDelta() {
+ System.out.println("setAllowableDelta");
+
+ long allowableDelta = 0L;
+ EqualizingDriverControlStrategy instance = new EqualizingDriverControlStrategy();
+
+ instance.setAllowableDelta(allowableDelta);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExecutorStageDriverTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+public class ExecutorStageDriverTest extends TestCase {
+
+ public ExecutorStageDriverTest(String testName) {
+ super(testName);
+ }
+
+ protected void setUp() throws Exception {
+ }
+
+ protected void tearDown() throws Exception {
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite(ExecutorStageDriverTest.class);
+
+ return suite;
+ }
+
+ /**
+ * Test of getFeeder method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+ */
+ public void testGetFeeder() {
+ System.out.println("getFeeder");
+
+ ExecutorStageDriver instance = null;
+
+ Feeder expResult = null;
+ Feeder result = instance.getFeeder();
+ assertEquals(expResult, result);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of start method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+ */
+ public void testStart() throws Exception {
+ System.out.println("start");
+
+ ExecutorStageDriver instance = null;
+
+ instance.start();
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of finish method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+ */
+ public void testFinish() throws Exception {
+ System.out.println("finish");
+
+ ExecutorStageDriver instance = null;
+
+ instance.finish();
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of increasePriority method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+ */
+ public void testIncreasePriority() {
+ System.out.println("increasePriority");
+
+ double amount = 0.0;
+ ExecutorStageDriver instance = null;
+
+ instance.increasePriority(amount);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+ /**
+ * Test of decreasePriority method, of class org.apache.commons.pipeline.driver.control.ExecutorStageDriver.
+ */
+ public void testDecreasePriority() {
+ System.out.println("decreasePriority");
+
+ double amount = 0.0;
+ ExecutorStageDriver instance = null;
+
+ instance.decreasePriority(amount);
+
+ // TODO review the generated test code and remove the default call to fail.
+ fail("The test case is a prototype.");
+ }
+
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class ExperimentalDriverControlStrategyTest extends TestCase {
+
+ public ExperimentalDriverControlStrategyTest(String testName) {
+ super(testName);
+ }
+
+ protected void setUp() throws Exception {
+ }
+
+ protected void tearDown() throws Exception {
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite( ExperimentalDriverControlStrategyTest.class );
+
+ return suite;
+ }
+
+ public void testCPUBoundControl() throws Exception
+ {
+ System.out.println( "Experimental: testCPUBoundControl");
+ CountingDriverController controller = new CountingDriverController();
+ controller.setMinimumEventsToHandle( 50 );
+ controller.setDriverControlStrategy( new ExperimentalDriverControlStrategy( 5 ) );
+ PrioritizableStageDriverTestUtils.testDriverControllerCPUBound(
+ this,
+ new BalancedPoolStageDriverFactory(),
+ controller );
+ }
+
+ public void testIOBoundControl() throws Exception
+ {
+ System.out.println( "Experimental: testIOBoundControl");
+ CountingDriverController controller = new CountingDriverController();
+ controller.setMinimumEventsToHandle( 50 );
+ controller.setDriverControlStrategy( new ExperimentalDriverControlStrategy( 5 ) );
+ PrioritizableStageDriverTestUtils.testDriverControllerIOBound(
+ this,
+ new BalancedPoolStageDriverFactory(),
+ controller );
+ }
+}
Copied: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java (from r936561, commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java)
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java?p2=commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java&p1=commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java&r1=936561&r2=1000251&rev=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/testFramework/FaultingTestStage.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/IOBoundTestStage.java Wed Sep 22 21:57:01 2010
@@ -15,35 +15,36 @@
* limitations under the License.
*/
-package org.apache.commons.pipeline.testFramework;
+package org.apache.commons.pipeline.driver.control;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.*;
-import org.apache.commons.pipeline.validation.ConsumedTypes;
-import org.apache.commons.pipeline.validation.ProducesConsumed;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestStage;
-/**
- * This stage will generate {@link StageException}s for every other object this
- * stage processes. By design, the even numbered objects will cause a <CODE>StageException</CODE>
- * to be thrown (counting the first object as 1).
- */
-@ConsumedTypes(Object.class)
-@ProducesConsumed
-public class FaultingTestStage extends TestStage {
- private Log log = LogFactory.getLog(FaultingTestStage.class);
- private int counter = 0;
+
+/**
+ * Test stage that simulates an I/O-bound workload: after delegating to
+ * {@link TestStage#process(Object)} it sleeps, calls
+ * {@link PrioritizableStageDriverTestUtils#consumeNCubed(long)} and sleeps again.
+ */
+class IOBoundTestStage extends TestStage {
+ private Log log = LogFactory.getLog(IOBoundTestStage.class);
+ // value handed to consumeNCubed between the two sleeps
+ private long consume;
+ // length of each of the two sleeps, in milliseconds
+ private long sleeptime;
- public FaultingTestStage(int index) {
- super(index);
+ /**
+ * @param id passed to the {@link TestStage} constructor
+ * @param consume argument for {@code consumeNCubed} on every processed object
+ * @param sleeptime milliseconds to sleep before and after the computation
+ */
+ public IOBoundTestStage(int id, long consume, long sleeptime) {
+ super( id );
+ this.consume = consume;
+ this.sleeptime = sleeptime;
}
public void process(Object obj) throws StageException {
- if (++counter % 2 == 0) {
- log.error("Planned fault in stage " + this + ".");
- throw new StageException(this, "Planned fault in stage " + super.getIndex() + ".");
+ super.process( obj );
+ try {
+ long startTime = System.currentTimeMillis();
+ // sleep() is static, so it is called on Thread rather than through an instance
+ Thread.sleep( sleeptime );
+ double total = PrioritizableStageDriverTestUtils.consumeNCubed( consume );
+ Thread.sleep( sleeptime );
+ log.debug( "IO stage took " + (System.currentTimeMillis() - startTime) + " ms");
+ } catch (InterruptedException ex) {
+ // Restore the interrupt flag so the thread running this stage can still
+ // observe the interruption instead of it being silently swallowed.
+ Thread.currentThread().interrupt();
+ log.warn( "IO stage interrupted while sleeping", ex );
}
- super.process(obj);
}
-}
\ No newline at end of file
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/PrioritizableStageDriverTestUtils.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+import org.apache.commons.pipeline.PipelineLifecycleJob;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+public class PrioritizableStageDriverTestUtils {
+
+ /**
+  * Private constructor: this class only exposes static helper methods and is
+  * not meant to be instantiated.
+  */
+ private PrioritizableStageDriverTestUtils() {
+ }
+
+ /**
+  * Tests a stage driver created by the provided PrioritizableStageDriverFactory
+  * with a single test stage. Five strings are fed while the driver priority is
+  * raised and lowered between feeds; afterwards both the stage and the terminal
+  * feeder are expected to have seen all five, and every received value must be
+  * a String.
+  */
+ public static void testSingleStage(TestCase test, PrioritizableStageDriverFactory driverFactory, TestStage stage) throws Exception {
+     final String[] messages = {
+         "Hello, world!", "How are you?", "Feeling blue?", "There's naught to do,", "but sing about shoes!"
+     };
+     // priority change applied after each of the first four feeds (negative means decrease)
+     final int[] priorityDeltas = { 1, 1, -2, -1 };
+
+     final TestStageContext stageContext = new TestStageContext();
+     final TestFeeder sink = new TestFeeder();
+     stageContext.registerDownstreamFeeder(stage, sink);
+     stage.init(stageContext);
+
+     final PrioritizableStageDriver driver = driverFactory.createStageDriver(stage, stageContext);
+     driver.start();
+     for (int m = 0; m < messages.length; m++) {
+         driver.getFeeder().feed(messages[m]);
+         if (m < priorityDeltas.length) {
+             final int delta = priorityDeltas[m];
+             if (delta > 0) {
+                 driver.increasePriority(delta);
+             } else {
+                 driver.decreasePriority(-delta);
+             }
+         }
+     }
+     driver.finish();
+
+     // assertions are static on TestCase, so they are invoked on the class
+     TestCase.assertEquals("Incorrect processed object count from stage.", messages.length, stage.processedObjects.size());
+     TestCase.assertEquals("Incorrect final processed object count.", messages.length, sink.receivedValues.size());
+     for (int idx = 0; idx < messages.length; idx++) {
+         final String failure = "Received value " + idx + " is not a String!";
+         final Object received = sink.receivedValues.get(idx);
+         TestCase.assertTrue(failure, received instanceof String);
+     }
+ }
+
+ /**
+  * Tests stage drivers created by the provided PrioritizableStageDriverFactory
+  * with a chain of the given test stages. Each stage feeds the driver of the
+  * next one and the last stage feeds a terminal TestFeeder. While
+  * {@code objectsToFeed} integers are fed into the first driver, the priority
+  * of a pseudo-randomly chosen driver (fixed seed 0) is raised or lowered by 1
+  * after every feed. Every stage and the terminal feeder must end up with
+  * exactly {@code objectsToFeed} objects.
+  */
+ public static void testMultiStage(TestCase test, PrioritizableStageDriverFactory driverFactory, int objectsToFeed, TestStage... stages) throws Exception {
+     final TestStageContext stageContext = new TestStageContext();
+     final TestFeeder sink = new TestFeeder();
+
+     // one driver per stage; each stage is initialised right after its driver is created
+     final List<PrioritizableStageDriver> chain = new ArrayList<PrioritizableStageDriver>();
+     for (int s = 0; s < stages.length; s++) {
+         chain.add(driverFactory.createStageDriver(stages[s], stageContext));
+         stages[s].init(stageContext);
+     }
+
+     // wire stage s to the feeder of driver s+1, and the last stage to the terminal feeder
+     final int lastIndex = chain.size() - 1;
+     for (int s = 0; s < lastIndex; s++) {
+         stageContext.registerDownstreamFeeder(stages[s], chain.get(s + 1).getFeeder());
+     }
+     if (lastIndex >= 0) {
+         stageContext.registerDownstreamFeeder(stages[lastIndex], sink);
+     }
+
+     final Random priorityRandom = new Random(0);
+     for (StageDriver driver : chain) {
+         driver.start();
+     }
+     for (int fed = 0; fed < objectsToFeed; fed++) {
+         chain.get(0).getFeeder().feed( fed );
+
+         //randomly permute driver priority (direction is drawn first, then the target driver)
+         final boolean raise = priorityRandom.nextBoolean();
+         final PrioritizableStageDriver target = chain.get(priorityRandom.nextInt(chain.size()));
+         if (raise) {
+             target.increasePriority(1);
+         } else {
+             target.decreasePriority(1);
+         }
+     }
+     for (StageDriver driver : chain) {
+         driver.finish();
+     }
+
+     for (TestStage stage : stages) {
+         TestCase.assertEquals("Incorrect processed object count from stage " + stage, objectsToFeed, stage.processedObjects.size());
+     }
+     TestCase.assertEquals("Incorrect final processed object count.", objectsToFeed, sink.receivedValues.size());
+ }
+
+ /**
+  * Tests that the controller drives the priority of a driver created by the
+  * given PrioritizableStageDriverFactory below its initial value while the
+  * driver runs a CPU-bound stage.
+  *
+  * The driver is started with priority 2, receives an initial burst of 500
+  * objects and is then fed one object every 200 ms for 15 seconds before the
+  * priority is checked. The driver is always finished, even when the
+  * assertion fails, so that its worker threads do not outlive the test.
+  *
+  * @param test the calling test case
+  * @param driverFactory factory used to create the driver under test
+  * @param controller controller that manages the driver; if it is a
+  *        PipelineLifecycleJob its onStart hook is invoked first
+  */
+ public static void testDriverControllerCPUBound(TestCase test, PrioritizableStageDriverFactory driverFactory, AbstractDriverController controller ) throws Exception {
+     TestStage stage = new CPUBoundTestStage(0,300);
+     TestFeeder terminalFeeder = new TestFeeder();
+     TestStageContext context = new TestStageContext();
+     context.registerDownstreamFeeder(stage, terminalFeeder);
+     context.registerListener(controller);
+
+     stage.init(context);
+
+     //start the drivercontroller
+     if( controller instanceof PipelineLifecycleJob ) {
+         ((PipelineLifecycleJob) controller).onStart( null );
+     }
+
+     PrioritizableStageDriver instance = driverFactory.createStageDriver(stage, context);
+
+     controller.addManagedStageDriver( instance );
+     long initialPriority = 2;
+     instance.increasePriority( initialPriority );
+     TestCase.assertEquals( initialPriority, Math.round( instance.getPriority()));
+
+     instance.start();
+     try {
+         // initial burst
+         int numFeeds = 500;
+         for( int i = 0; i < numFeeds; i++ ) {
+             instance.getFeeder().feed("Hello, world!");
+         }
+
+         // keep feeding at a steady rate for the observation window
+         long startTime = System.currentTimeMillis();
+         long feedTime = 15000;
+         while( (System.currentTimeMillis()-startTime)<feedTime)
+         {
+             instance.getFeeder().feed("Hello, world!");
+             Thread.sleep( 200 );
+         }
+         TestCase.assertTrue( "Controller should have decreased priority below "+
+                 initialPriority+", instead was "+instance.getPriority(),
+                 instance.getPriority() < initialPriority );
+     } finally {
+         // always shut the driver down, otherwise a failed assertion leaks its threads
+         instance.finish();
+     }
+ }
+
+ /**
+  * Tests that the controller drives the priority of a driver created by the
+  * given PrioritizableStageDriverFactory above its initial value while the
+  * driver runs an IO-bound stage.
+  *
+  * The driver is started with priority 1 and is fed one object every 100 ms
+  * for 20 seconds before the priority is checked. The driver is always
+  * finished, even when the assertion fails, so that its worker threads do not
+  * outlive the test.
+  *
+  * @param test the calling test case
+  * @param driverFactory factory used to create the driver under test
+  * @param controller controller that manages the driver; if it is a
+  *        PipelineLifecycleJob its onStart hook is invoked first
+  */
+ public static void testDriverControllerIOBound(TestCase test, PrioritizableStageDriverFactory driverFactory, AbstractDriverController controller ) throws Exception {
+     TestStage stage = new IOBoundTestStage(0,0,150);
+     TestFeeder terminalFeeder = new TestFeeder();
+     TestStageContext context = new TestStageContext();
+     context.registerDownstreamFeeder(stage, terminalFeeder);
+     context.registerListener(controller);
+
+     stage.init(context);
+
+     //start the drivercontroller
+     if( controller instanceof PipelineLifecycleJob ) {
+         ((PipelineLifecycleJob) controller).onStart( null );
+     }
+
+     PrioritizableStageDriver instance = driverFactory.createStageDriver(stage, context);
+
+     controller.addManagedStageDriver( instance );
+     long initialPriority = 1;
+     instance.increasePriority( initialPriority );
+     TestCase.assertEquals( initialPriority, Math.round( instance.getPriority()));
+
+     instance.start();
+     try {
+         // feed at a steady rate for the observation window
+         long startTime = System.currentTimeMillis();
+         long feedTime = 20000;
+         while( (System.currentTimeMillis()-startTime)<feedTime)
+         {
+             instance.getFeeder().feed("Hello, world!");
+             Thread.sleep( 100 );
+         }
+
+         TestCase.assertTrue( "Controller should have increased priority above "+
+                 initialPriority+", instead was "+instance.getPriority(),
+                 instance.getPriority() > initialPriority );
+     } finally {
+         // always shut the driver down, otherwise a failed assertion leaks its threads
+         instance.finish();
+     }
+ }
+
+ /**
+  * Burns CPU time with a doubly nested loop, i.e. O(n^2) square-root
+  * evaluations for n = {@code operations}.
+  *
+  * The running sum is returned so the sqrt work is less likely to be
+  * optimized away by the JRE.
+  *
+  * @param operations bound of both loops; values of 0 or less do no work
+  * @return the sum of sqrt(row + col) over all visited index pairs
+  */
+ public static double consumeNSquared( long operations ){
+     double sum = 0;
+     int row = 0;
+     while( row < operations ) {
+         int col = 0;
+         while( col < operations ) {
+             sum += Math.sqrt( row + col );
+             col++;
+         }
+         row++;
+     }
+     return sum;
+ }
+
+ /**
+  * Burns CPU time with a triply nested loop, i.e. O(n^3) square-root
+  * evaluations for n = {@code operations}. The innermost index only repeats
+  * the work; the summed term depends on the two outer indices alone.
+  *
+  * The running sum is returned so the sqrt work is less likely to be
+  * optimized away by the JRE.
+  *
+  * @param operations bound of all three loops; values of 0 or less do no work
+  * @return the accumulated sum of sqrt(outer + middle)
+  */
+ public static double consumeNCubed( long operations ){
+     double sum = 0;
+     int outer = 0;
+     while( outer < operations ) {
+         int middle = 0;
+         while( middle < operations ) {
+             int inner = 0;
+             while( inner < operations ) {
+                 sum += Math.sqrt( outer + middle );
+                 inner++;
+             }
+             middle++;
+         }
+         outer++;
+     }
+     return sum;
+ }
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/ToyBalancedPipelineTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.TestCase;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.testFramework.TestFeeder;
+import org.apache.commons.pipeline.testFramework.TestStage;
+import org.apache.commons.pipeline.util.BlockingQueueFactory;
+import org.apache.commons.pipeline.validation.ValidationException;
+
+public class ToyBalancedPipelineTest extends TestCase {
+ /**
+  * Pushes 100 cars through a four-stage pipeline (mold body, paint, add
+  * wheels, add AI) whose drivers all come from one
+  * BalancedPoolStageDriverFactory and are managed by a
+  * CountingDriverController with an EqualizingDriverControlStrategy.
+  * Every car must reach the terminal feeder carrying the message set by the
+  * last stage.
+  *
+  * A DriverMonitor thread prints the driver priorities while the pipeline
+  * runs; it is stopped in a finally block so that it cannot keep running
+  * when starting, feeding or finishing the pipeline fails.
+  */
+ public void testToyFactory() throws ValidationException, StageException {
+     Pipeline pipeline = new Pipeline();
+     CountingDriverController controller = new CountingDriverController();
+     controller.setMinimumEventsToHandle(20);
+     EqualizingDriverControlStrategy controlStrategy = new EqualizingDriverControlStrategy();
+     controller.setDriverControlStrategy(controlStrategy);
+     controlStrategy.setAllowableDelta(100);
+     pipeline.addLifecycleJob(controller);
+     TestFeeder terminalFeeder = new TestFeeder();
+     pipeline.setTerminalFeeder(terminalFeeder);
+
+     BalancedPoolStageDriverFactory driverFactory = new BalancedPoolStageDriverFactory();
+     driverFactory.setInitialPriority(1);
+     driverFactory.setQueueFactory(new BlockingQueueFactory.LinkedBlockingQueueFactory());
+
+     TestStage moldBody = new MoldBodyStage(1);
+     TestStage bodyPaint = new BodyPaintStage(2);
+     TestStage addWheels = new AddWheelsStage(3);
+     TestStage addAI = new AddAIStage(4);
+
+     pipeline.addStage(moldBody, driverFactory);
+     pipeline.addStage(bodyPaint, driverFactory);
+     pipeline.addStage(addWheels, driverFactory);
+     pipeline.addStage(addAI, driverFactory);
+
+     // hand every prioritizable driver to the controller
+     for (StageDriver driver : pipeline.getStageDrivers()) {
+         if (driver instanceof PrioritizableStageDriver) {
+             controller.addManagedStageDriver((PrioritizableStageDriver) driver);
+         }
+     }
+
+     DriverMonitor m = new DriverMonitor(pipeline);
+     m.start();
+     try {
+         pipeline.start();
+
+         for (int i = 0; i < 100; i++) {
+             pipeline.getSourceFeeder().feed(new Car(i, "I'm a hunk of metal!"));
+         }
+
+         pipeline.finish();
+     } finally {
+         // stop the monitor thread even if the pipeline failed
+         m.finish();
+     }
+
+     assertEquals("Incorrect number of objects received.", 100, terminalFeeder.receivedValues.size());
+     for (Object obj : terminalFeeder.receivedValues) {
+         assertTrue("Object is not a car!", obj instanceof Car);
+         assertEquals("I AM ALIVE!", ((Car) obj).message);
+     }
+ }
+
+ /**
+  * Background thread that prints the priority of every prioritizable stage
+  * driver of a pipeline roughly once per second until {@link #finish()} is
+  * called or the thread is interrupted.
+  *
+  * Declared static because it does not use the enclosing test instance.
+  */
+ private static class DriverMonitor extends Thread {
+     // set by finish() from another thread, hence volatile
+     private volatile boolean done = false;
+     private final Pipeline pipeline;
+
+     public DriverMonitor(Pipeline pipeline) {
+         this.pipeline = pipeline;
+     }
+
+     public void run() {
+         while (!done) {
+             StringBuilder b = new StringBuilder();
+             for (StageDriver driver : pipeline.getStageDrivers()) {
+                 // only prioritizable drivers expose a priority; skip any others
+                 if (!(driver instanceof PrioritizableStageDriver)) {
+                     continue;
+                 }
+                 PrioritizableStageDriver d = (PrioritizableStageDriver) driver;
+                 b.append(d.getStage()).append(": ").append(d.getPriority()).append("; ");
+             }
+             System.out.println("Driver priorities: " + b.toString());
+
+             try {
+                 Thread.sleep(1000);
+             } catch (InterruptedException e) {
+                 // treat interruption as a request to stop; keep the flag set for callers
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         }
+     }
+
+     /** Asks the monitor loop to stop after its current iteration. */
+     public void finish() {
+         this.done = true;
+     }
+ }
+
+ private static class Car {
+ public int id;
+ public String message;
+
+ public Car(int id, String message) {
+ this.id = id;
+ this.message = message;
+ }
+
+ public String toString() {
+ return "Car " + this.id + ": " + this.message;
+ }
+ }
+
+
+ /**
+  * First toy stage (CPU-bound, load argument 50). Accepts only cars whose
+  * message is "I'm a hunk of metal!", relabels them and passes them on.
+  */
+ private static class MoldBodyStage extends CPUBoundTestStage {
+     public MoldBodyStage(int id) {
+         super(id, 50);
+     }
+
+     public void process(Object obj) throws StageException {
+         Car incoming = (Car) obj;
+         // reject anything that is not in the expected state
+         if (!"I'm a hunk of metal!".equals(incoming.message)) {
+             throw new StageException(this, "Whoa! " + obj);
+         }
+         incoming.message = "Now I'm a car body!";
+         super.process(incoming);
+     }
+ }
+
+ /**
+  * Second toy stage (IO-bound, constructed with consume 10 and sleep time 250).
+  * Accepts only cars whose message is "Now I'm a car body!", relabels them
+  * and passes them on.
+  */
+ private static class BodyPaintStage extends IOBoundTestStage {
+     public BodyPaintStage(int id) {
+         super(id, 10, 250);
+     }
+
+     public void process(Object obj) throws StageException {
+         Car incoming = (Car) obj;
+         // reject anything that is not in the expected state
+         if (!"Now I'm a car body!".equals(incoming.message)) {
+             throw new StageException(this, "Whoa! " + obj);
+         }
+         incoming.message = "I've been painted!";
+         super.process(incoming);
+     }
+ }
+
+ /**
+  * Third toy stage (plain TestStage). Accepts only cars whose message is
+  * "I've been painted!", relabels them and passes them on.
+  */
+ private static class AddWheelsStage extends TestStage {
+     public AddWheelsStage(int id) {
+         super(id);
+     }
+
+     public void process(Object obj) throws StageException {
+         Car incoming = (Car) obj;
+         // reject anything that is not in the expected state
+         if (!"I've been painted!".equals(incoming.message)) {
+             throw new StageException(this, "Whoa! " + obj);
+         }
+         incoming.message = "Got my wheels!";
+         super.process(incoming);
+     }
+ }
+
+ /**
+  * Final toy stage (CPU-bound, load argument 500). Accepts only cars whose
+  * message is "Got my wheels!", relabels them and passes them on.
+  */
+ private static class AddAIStage extends CPUBoundTestStage {
+     public AddAIStage(int id) {
+         super(id, 500);
+     }
+
+     public void process(Object obj) throws StageException {
+         Car incoming = (Car) obj;
+         // reject anything that is not in the expected state
+         if (!"Got my wheels!".equals(incoming.message)) {
+             throw new StageException(this, "Whoa! " + obj);
+         }
+         incoming.message = "I AM ALIVE!";
+         super.process(incoming);
+     }
+ }
+
+}
Added: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java (added)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java Wed Sep 22 21:57:01 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.pipeline.driver.control;
+
+import junit.framework.*;
+import static org.apache.commons.pipeline.StageDriver.State.*;
+import static org.apache.commons.pipeline.driver.FaultTolerance.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Feeder;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.driver.FaultTolerance;
+
+/**
+ * Runs the shared CPU-bound and IO-bound controller scenarios of
+ * {@link PrioritizableStageDriverTestUtils} against a
+ * {@link CountingDriverController} that uses a
+ * {@link WallClockThresholdDriverControlStrategy}.
+ */
+public class WallClockThresholdDriverControlStrategyTest extends TestCase {
+
+    public WallClockThresholdDriverControlStrategyTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(WallClockThresholdDriverControlStrategyTest.class);
+    }
+
+    /** Builds the controller shared by both tests: minimum-events setting 10, wall-clock strategy. */
+    private static CountingDriverController newWallClockController() {
+        CountingDriverController countingController = new CountingDriverController();
+        countingController.setMinimumEventsToHandle( 10 );
+        countingController.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );
+        return countingController;
+    }
+
+    public void testCPUBoundControl() throws Exception {
+        System.out.println( "WallClock: testCPUBoundControl");
+        final CountingDriverController countingController = newWallClockController();
+        final BalancedPoolStageDriverFactory poolFactory = new BalancedPoolStageDriverFactory();
+        PrioritizableStageDriverTestUtils.testDriverControllerCPUBound( this, poolFactory, countingController );
+    }
+
+    public void testIOBoundControl() throws Exception {
+        System.out.println( "WallClock: testIOBoundControl");
+        final CountingDriverController countingController = newWallClockController();
+        final BalancedPoolStageDriverFactory poolFactory = new BalancedPoolStageDriverFactory();
+        PrioritizableStageDriverTestUtils.testDriverControllerIOBound( this, poolFactory, countingController );
+    }
+}
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java Wed Sep 22 21:57:01 2010
@@ -19,9 +19,9 @@ package org.apache.commons.pipeline.stag
import java.util.ArrayList;
import java.util.List;
-import junit.framework.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
/**
* Test cases for AddToCollectionStage
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -17,9 +17,8 @@
package org.apache.commons.pipeline.stage;
-import junit.framework.*;
-import org.apache.commons.pipeline.testFramework.TestFeeder;
-import org.apache.commons.pipeline.testFramework.TestStageContext;
+import junit.framework.Test;
+import junit.framework.TestSuite;
/**
* Test cases for DynamicLookupStaticMethodStage.
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java Wed Sep 22 21:57:01 2010
@@ -21,7 +21,11 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.net.URL;
-import junit.framework.*;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
import org.apache.commons.pipeline.testFramework.TestFeeder;
import org.apache.commons.pipeline.testFramework.TestStageContext;
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,7 +18,9 @@
package org.apache.commons.pipeline.stage;
import java.io.InputStream;
-import junit.framework.*;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
/**
* Test cases for InputStreamLineBreakStage.
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,9 +18,10 @@
package org.apache.commons.pipeline.stage;
-import junit.framework.*;
import java.lang.reflect.Method;
-import java.util.ArrayList;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
/**
*
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InvokeStaticMethodStageTest.java Wed Sep 22 21:57:01 2010
@@ -18,10 +18,9 @@
package org.apache.commons.pipeline.stage;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import junit.framework.*;
-import org.apache.commons.pipeline.Pipeline;
-import org.apache.commons.pipeline.driver.SynchronousStageDriver;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
/**
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?rev=1000251&r1=1000250&r2=1000251&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Wed Sep 22 21:57:01 2010
@@ -49,6 +49,8 @@ public class KeyWaitBufferStageTest exte
* data waiting for notify() to be called with an appropriate event.
*/
public void testProcessAndNotify() throws Exception {
+ System.out.println("notify");
+
String obj = "Hello, World!";
KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();
EventObject ev = new KeyAvailableEvent<Integer>(this, keyFactory.generateKey(obj));
Added: commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml?rev=1000251&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml (added)
+++ commons/sandbox/pipeline/trunk/src/test/resources/test_eqdrivercontrol_conf.xml Wed Sep 22 21:57:01 2010
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+-->
+
+<pipeline>
+ <!-- Test pipeline configuration in which every stage uses the balanced-pool driver factory "f1". -->
+ <listener className="org.apache.commons.pipeline.listener.ObjectProcessedEventCounter"/>
+
+ <!-- Driver factory referenced by all stages below through driverFactoryId="f1";
+      its queue factory is an ArrayBlockingQueueFactory with capacity 10 and fair="false". -->
+ <driverFactory className="org.apache.commons.pipeline.driver.control.BalancedPoolStageDriverFactory" id="f1">
+ <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
+ capacity="10" fair="false"/>
+ </driverFactory>
+
+ <!-- Lifecycle job declared without nested configuration, so no properties are set on the controller here. -->
+ <lifecycleJob className="org.apache.commons.pipeline.driver.control.CountingDriverController">
+
+ </lifecycleJob>
+
+ <!-- Environment entries: a java.util.Date object under key "testDate" and a string under key "testEnvVar". -->
+ <env>
+ <object className="java.util.Date" key="testDate"/>
+ <value key="testEnvVar">Hello, World!</value>
+ </env>
+
+ <!-- Stages, in declaration order. -->
+ <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="f1"
+ filePattern=".*\.java" />
+
+ <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="f1" />
+
+ <stage className="org.apache.commons.pipeline.stage.RaiseEventStage" driverFactoryId="f1" />
+
+ <stage className="org.apache.commons.pipeline.stage.RaiseKeyAvailableEventStage" driverFactoryId="f1">
+ <property propName="keyFactory" className="org.apache.commons.pipeline.util.KeyFactory$HashKeyFactory"/>
+ </stage>
+
+ <stage className="org.apache.commons.pipeline.stage.KeyWaitBufferStage" driverFactoryId="f1">
+ <property propName="keyFactory" className="org.apache.commons.pipeline.util.KeyFactory$HashKeyFactory"/>
+ <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
+ capacity="10" fair="false"/>
+ </stage>
+
+ <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="f1" />
+
+ <!-- Initial values fed to the pipeline; presumably directories scanned by the FileFinderStage (confirm against that stage). -->
+ <feed>
+ <value>src/main/java</value>
+ <value>src/test/java</value>
+ </feed>
+</pipeline>