You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by sk...@apache.org on 2005/07/27 08:38:10 UTC

svn commit: r225469 - in /jakarta/commons/sandbox/pipeline/trunk/src: java/org/apache/commons/pipeline/driver/ java/org/apache/commons/pipeline/stage/ test/conf/ test/java/org/apache/commons/pipeline/driver/ test/java/org/apache/commons/pipeline/stage/

Author: skitching
Date: Tue Jul 26 23:37:44 2005
New Revision: 225469

URL: http://svn.apache.org/viewcvs?rev=225469&view=rev
Log:
Initial import of additional Stage implementations and related tests

Added:
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java   (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java   (with props)

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ *
+ * Created on July 19, 2005, 4:34 PM
+ *
+ * $Log: AbstractStageMonitor.java,v $
+ * Revision 1.3  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.2  2005/07/22 23:21:35  kjn
+ * Added stage parameter to constructor to simplify callbacks for those monitors
+ * that need to access the Stage.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageMonitor;
+import org.apache.commons.pipeline.StageMonitor.State;
+
+/**
+ * Provides an abstract {@link StageMonitor} base class that implements methods
+ * that do not need to be synchronized.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public abstract class AbstractStageMonitor implements StageMonitor {
+    private static final Log log = LogFactory.getLog(AbstractStageMonitor.class);
+
+    /** The stage this monitor was created for. */
+    protected Stage stage;
+    /** Every fault reported through the failure callbacks, in arrival order. */
+    protected List<Throwable> errors = new ArrayList<Throwable>();
+    /** Current monitor state; a new monitor starts out as {@link State#STOPPED}. */
+    protected State state = State.STOPPED;
+
+    /**
+     * Creates a new instance of AbstractStageMonitor.
+     *
+     * @param stage the stage being monitored; stored as given (no null check)
+     */
+    public AbstractStageMonitor(Stage stage) {
+        this.stage = stage;
+    }
+
+    /**
+     * Records a preprocessing failure: sets the state to {@link State#ERROR},
+     * logs the fault at fatal level and appends it to the error list.
+     *
+     * @param fault the faulting exception
+     */
+    public final void preprocessFailed(Throwable fault) {
+        this.state = State.ERROR;
+        log.fatal("Error in preprocessing caused abort.", fault);
+        this.errors.add(fault);
+    }
+
+    /**
+     * Monitors handler failures. The fault is logged and recorded, but the
+     * monitor state is left unchanged.
+     *
+     * @param data the data that was being processed as the fault occurred
+     * @param fault the faulting exception
+     */
+    public final void processingFailed( Object data, Throwable fault) {
+        log.error("Processing error on data object " + data, fault);
+        this.errors.add(fault);
+    }
+
+    /**
+     * Records a postprocessing failure: sets the state to {@link State#ERROR},
+     * logs the fault at fatal level and appends it to the error list.
+     *
+     * @param fault the faulting exception
+     */
+    public final void postprocessFailed(Throwable fault) {
+        this.state = State.ERROR;
+        log.fatal("Error in postprocessing caused abort.", fault);
+        this.errors.add(fault);
+    }
+
+    /**
+     * Monitors driver thread interruption failures. Sets the state to
+     * {@link State#ERROR}, logs the fault and appends it to the error list.
+     *
+     * @param fault the faulting exception
+     */
+    public final void driverFailed( InterruptedException fault ) {
+        this.state = State.ERROR;
+        // Log like the other failure callbacks so that an interruption is
+        // not recorded silently.
+        log.error("Stage driver was interrupted.", fault);
+        this.errors.add(fault);
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ *
+ * Created on July 19, 2005, 4:26 PM
+ *
+ * $Log: SimpleStageDriver.java,v $
+ * Revision 1.4  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.3  2005/07/22 23:22:51  kjn
+ * Changes to reflect changes in StageDriver base class, consolidation with
+ * SimpleStageMonitor code to eliminate unnecessary public class.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.Collections;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.StageMonitor;
+
+/**
+ * This is a non-threaded version of the StageDriver.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class SimpleStageDriver extends StageDriver {
+
+    private static final Log log = LogFactory.getLog(SimpleStageDriver.class);
+
+    /** Creates a new instance of SimpleStageDriver */
+    public SimpleStageDriver() {
+        super();
+    }
+
+    /**
+     * Runs the stage on the calling thread: preprocesses it, tells the
+     * stage's monitor that the driver has started, then polls and processes
+     * objects until {@link Stage#poll()} returns null.
+     *
+     * @param stage the stage to run
+     * @throws StageException wrapping any exception raised while polling or
+     * processing (the stage is released before it is thrown); anything thrown
+     * by preprocessing propagates unchanged and without a release
+     */
+    protected void startInternal(Stage stage) throws StageException {
+        stage.preprocess();
+        stage.getMonitor().driverStarted();
+
+        try {
+            Object o = stage.poll();
+            while (o != null){
+                stage.process(o);
+                o = stage.poll();
+            }
+        } catch (Exception e) {
+            stage.release();
+            throw new StageException(e);
+        }
+    }
+
+    /**
+     * Postprocesses the stage and then releases it; the release happens even
+     * when postprocessing throws.
+     *
+     * @param stage the stage to finish
+     * @throws StageException if thrown by postprocessing
+     */
+    protected void finishInternal(Stage stage) throws StageException {
+        try {
+            stage.postprocess();
+        } finally {
+            stage.release();
+        }
+    }
+
+    /**
+     * Factory method for StageMonitor that works with this driver.
+     *
+     * @param stage the stage the new monitor is attached to
+     * @return an {@link AbstractStageMonitor} that tracks state in plain
+     * fields and processes enqueued objects on the calling thread
+     */
+    protected StageMonitor createStageMonitor(Stage stage) {
+        return new AbstractStageMonitor(stage) {
+            /**
+             * StageDriver has been requested to start stage processing.
+             * Moves the state from {@link State#STOPPED} to
+             * {@link State#STARTING}; any other state is left unchanged.
+             */
+            public void startRequested() {
+                if (this.state == State.STOPPED) this.state = State.STARTING;
+            }
+
+            /**
+             * StageDriver has started execution.
+             * Moves the state to {@link State#RUNNING} when it is currently
+             * {@link State#STOPPED} or {@link State#STARTING}.
+             */
+            public void driverStarted() {
+                if (this.state == State.STOPPED || this.state == State.STARTING) this.state = State.RUNNING;
+            }
+
+            /**
+             * StageDriver has been requested to halt stage processing.
+             * Unconditionally sets the state to {@link State#STOP_REQUESTED}.
+             */
+            public void stopRequested() {
+                this.state = State.STOP_REQUESTED;
+            }
+
+            /**
+             * StageDriver has finished execution.
+             * Unconditionally sets the state to {@link State#STOPPED}.
+             */
+            public void driverStopped() {
+                this.state = State.STOPPED;
+            }
+
+            /** Returns the state */
+            public org.apache.commons.pipeline.StageMonitor.State getState() {
+                return this.state;
+            }
+
+            /** Returns all errors as a read-only view of the recorded list */
+            public java.util.List<Throwable> getErrors() {
+                return Collections.unmodifiableList(this.errors);
+            }
+
+            /**
+             * Reacts to an object having been enqueued on the stage. When the
+             * monitor is not {@link State#RUNNING} the stage's driver is asked
+             * to start the stage, and a StageException from that call is
+             * recorded through preprocessFailed. Otherwise a single object is
+             * polled and processed on the calling thread, and a StageException
+             * from processing is recorded through processingFailed.
+             */
+            public void enqueueOccurred() {
+                if (this.state != State.RUNNING) {
+                    try {
+                        this.stage.getStageDriver().start(this.stage);
+                    } catch (StageException e){
+                        this.preprocessFailed(e);
+                    }
+                } else {
+                    Object object = this.stage.poll();
+                    // A null poll() result means the queue is empty (the
+                    // drain loop in startInternal relies on the same
+                    // convention), so there is nothing to hand to the stage.
+                    if (object == null) {
+                        return;
+                    }
+                    try {
+                        this.stage.process(object);
+                    } catch (StageException e){
+                        this.processingFailed(object, e);
+                    }
+                }
+            }
+        };
+    }
+
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 18, 2005, 4:30 PM
+ *
+ * $Log: AddToCollectionStage.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.1  2005/07/21 17:41:19  tns
+ * Added Simple Driver, made some pipeline modifications and added some stages.
+ *
+ * Revision 1.4  2005/07/21 17:14:04  kjn
+ * Changed to use commons-logging
+ *
+ * Revision 1.3  2005/07/19 22:27:04  kjn
+ * Fixed javadocs
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.commons.pipeline.BaseStage;
+
+/**
+ * This is a simple stage in the pipeline which will add the object to the
+ * specified collection.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class AddToCollectionStage<T> extends BaseStage {
+    
+    /**
+     * Holds value of property collection.
+     */
+    private Collection<T> collection;
+
+    /** 
+     * Creates a new instance of AddToCollectionStage.  This constructor
+     * will synchronized the collection by default.
+     */
+    public AddToCollectionStage(Collection<T> collection) {
+        this(collection, true);
+    }
+    
+    /** 
+     * Creates a new instance of AddToCollectionStage.
+     * @param collection The collection in which to add objects to
+     * @param synchronized A flag value that determines whether or not accesses
+     * to the underlying collection are synchronized.
+     */
+    public AddToCollectionStage(Collection<T> collection, boolean synchronize) {
+        if (collection == null){
+            throw new IllegalArgumentException("Argument 'collection' can not be null.");
+        }
+        
+        this.collection = synchronize ? Collections.synchronizedCollection(collection) : collection;
+    }
+    
+    /** 
+     * Adds the object to the underlying collection.
+     *
+     * @throws ClassCastException if the object is not of a suitable type to be added
+     * to the collection.
+     */
+    public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+        this.collection.add((T) obj);
+        this.exqueue(obj);
+    }
+    
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:25 AM
+ *
+ * $Log: DynamicLookupStaticMethodStage.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * <p>Provide this Stage with a class and a static method name and it will dynamically
+ * look up the appropriate method to call based on the object type.  If the
+ * object type is an array, it will assume that the method that needs to be called
+ * contains the method signature as described by the objects in the array.
+ * The object returned from the method call will be exqueued.</p>
+ *
+ * <p>The resulting object will be exqueued on the main pipeline if it is not null.  If
+ * it is null, we will try to place the original object on the branch specified
+ * by the nullResultBranchTag property. The default for this value is "nullResult".</p>
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class DynamicLookupStaticMethodStage extends BaseStage {
+    
+    // Branch upon which the original objects will be enqueued if the defined
+    // Method returned a null result.
+    private String nullResultBranchTag = "nullResult";
+    
+    // Name of the method to call to process the object.
+    private String methodName;
+    
+    // Class containing the method.
+    private Class clazz;
+    
+    /**
+     * Creates a new instance of DynamicLookupStaticMethodStage
+     *
+     * @param clazz The class that defines the static method that will be used to
+     * process objects.
+     * @param methodName The name of the method. This method may be overloaded.
+     */
+    public DynamicLookupStaticMethodStage(Class clazz, String methodName) {
+        super();
+        if (clazz == null){
+            throw new IllegalArgumentException("Argument 'clazz' can not be null.");
+        }
+        if (methodName == null){
+            throw new IllegalArgumentException("Argument 'methodName' can not be null.");
+        }
+        this.clazz = clazz;
+        this.methodName = methodName;
+    }
+    
+    /**
+     * Creates a new DynamicLookupStaticMethodStage for the specified static
+     * method.
+     *
+     * @param className The fully qualified class name of the class in which the
+     * static method that will be used to process objects is defined.
+     * @param methodName The name of the method. This method may be overloaded.
+     */
+    public static DynamicLookupStaticMethodStage newInstance(String className, String methodName) throws ClassNotFoundException {
+        Class clazz = DynamicLookupStaticMethodStage.class.getClassLoader().loadClass(className);
+        return new DynamicLookupStaticMethodStage(clazz, methodName);
+    }
+    
+    /** Returns the name of the method we are using */
+    public String getMethodName(){
+        return this.methodName;
+    }
+    
+    /** Returns the class we are using */
+    public Class getMethodClass(){
+        return this.clazz;
+    }
+    
+    /**
+     * <p>Finds the appropriate method overloading for the method specified
+     * by {@link #getMethodName() methodName}, calls it to process the object, and exqueues
+     * any returned object. If the returned object is null, the original object
+     * is enqueued on the branch specified by the nullResultBranchTag property.</p>
+     *
+     * @param obj The object to process.
+     */
+    public void process(Object obj) throws StageException {
+        try {
+            Method method = null;
+            if (obj.getClass().isArray()){
+                Object[] objs = (Object[]) obj;
+                Class[] classes = new Class[objs.length];
+                for (int i = 0; i < objs.length; i++){
+                    classes[i] = objs[i].getClass();
+                }
+                method = this.clazz.getMethod(methodName, classes);
+            } else {
+                method = this.clazz.getMethod(methodName, obj.getClass());
+            }
+            
+            Object returnObj = method.invoke(null, obj);
+            if (returnObj != null){
+                this.exqueue(returnObj);
+            } else {
+                this.exqueue("nullResult", obj);
+            }
+        } catch (NoSuchMethodException e){
+            throw new StageException("No method",e);
+        } catch (IllegalAccessException e){
+            throw new StageException("Illegal Access",e);
+        } catch (InvocationTargetException e){
+            throw new StageException("Invocation",e);
+        }
+    }
+    
+    /**
+     * Getter for property nullResultBranchTag. The default value is "nullResult".
+     * @return Value of property nullResultBranchTag.
+     */
+    public String getNullResultBranchTag() {
+        return this.nullResultBranchTag;
+    }
+    
+    /**
+     * Setter for property nullResultBranchTag.
+     * @param nullResultBranchTag New value of property nullResultBranchTag.
+     */
+    public void setNullResultBranchTag(String nullResultBranchTag) {
+        this.nullResultBranchTag = nullResultBranchTag;
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Log: InputStreamLineBreakStage.java,v $
+ * Revision 1.2  2005/07/26 18:20:28  tns
+ * apache license.
+ *
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ *  Breaks up an InputStream by line and exqueues the line as a string.
+ *
+ * @author tns
+ * @version $Id$
+ */
+public class InputStreamLineBreakStage extends BaseStage {
+
+    /** Creates a new instance of InputStreamLineBreakStage */
+    public InputStreamLineBreakStage() {
+        super();
+    }
+
+    /**
+     * Reads the supplied stream line by line and exqueues each line as a
+     * String. Bytes are decoded with the platform default charset, and the
+     * stream is not closed by this method.
+     *
+     * @param obj the InputStream to read; any other type fails the cast
+     * @throws org.apache.commons.pipeline.StageException wrapping any
+     * IOException raised while reading
+     */
+    public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+        InputStream source = (InputStream) obj;
+        BufferedReader lines = new BufferedReader(new InputStreamReader(source));
+        try {
+            String current;
+            while ((current = lines.readLine()) != null) {
+                this.exqueue(current);
+            }
+        } catch (IOException e){
+            throw new StageException(e);
+        }
+    }
+
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ *
+ * Created on July 19, 2005, 9:05 AM
+ * 
+ * $Log: RunPredefinedStaticMethodStage.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Runs a static method with the object being processed. The returned object 
+ * will be exqueued on the main pipeline if it is not null. If the returned
+ * object is null, this stage will attempt to place the original object on the 
+ * branch specified by {@link #setNullResultBranchTag(String)}.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class RunPredefinedStaticMethodStage extends BaseStage {
+
+    // Branch upon which the original objects will be enqueued if the defined
+    // Method returned a null result.
+    private String nullResultBranchTag = "nullResult";
+
+    // Method used to process objects in the queue
+    private Method method;
+
+    /**
+     * Creates a new instance of RunPredefinedStaticMethodStage
+     *
+     * @param method The method used to process objects. It is invoked with a
+     * null target, so it is expected to be static.
+     * @throws IllegalArgumentException if method is null
+     */
+    public RunPredefinedStaticMethodStage(Method method) {
+        super();
+        // Fail at construction time instead of with a NullPointerException
+        // on the first processed object.
+        if (method == null){
+            throw new IllegalArgumentException("Argument 'method' can not be null.");
+        }
+        this.method = method;
+    }
+
+    /**
+     * Returns the Method object for the method that will be used to process
+     * objects in the queue.
+     */
+    public Method getMethod(){
+        return this.method;
+    }
+
+    /**
+     * Convenience method to create the new stage with String description of className, methodName and argumentType
+     *
+     * @param className The fully qualified class name, such as "java.lang.String" of the class in which the method resides
+     * @param methodName The name of the method
+     * @param argumentType The argument type of the method (Sorry, this doesn't support multiple argument methods).
+     * It is resolved by loading a class of that name, so it must be a loadable class name.
+     * @throws ClassNotFoundException if either class name cannot be loaded
+     * @throws NoSuchMethodException if the class has no public method with that name and argument type
+     */
+    public static RunPredefinedStaticMethodStage newInstance(String className, String methodName, String argumentType) throws ClassNotFoundException, NoSuchMethodException {
+        Class clazz = RunPredefinedStaticMethodStage.class.getClassLoader().loadClass(className);
+        Class argumentClass = RunPredefinedStaticMethodStage.class.getClassLoader().loadClass(argumentType);
+        return new RunPredefinedStaticMethodStage(clazz.getMethod(methodName, argumentClass));
+    }
+
+    /**
+     * <p>Calls the defined static method and exqueues the returned object if it is
+     * not null, otherwise placing the original object on the branch specified
+     * by the nullResultBranchTag property.</p>
+     *
+     * @param obj The object to be processed.
+     * @throws StageException if the method is not accessible or the method
+     * itself throws
+     */
+    public void process(Object obj) throws StageException {
+        try {
+            Object returnObj = this.method.invoke(null, obj);
+            if (returnObj != null){
+                this.exqueue(returnObj);
+            } else {
+                this.exqueue(nullResultBranchTag, obj);
+            }
+        } catch (IllegalAccessException e){
+            throw new StageException("Illegal Access",e);
+        } catch (InvocationTargetException e){
+            throw new StageException("Invocation",e);
+        }
+    }
+
+    /**
+     * Getter for property nullResultBranchTag. The default value is "nullResult".
+     * @return Value of property nullResultBranchTag.
+     */
+    public String getNullResultBranchTag() {
+        return this.nullResultBranchTag;
+    }
+
+    /**
+     * Setter for property nullResultBranchTag.
+     * @param nullResultBranchTag New value of property nullResultBranchTag.
+     */
+    public void setNullResultBranchTag(String nullResultBranchTag) {
+        this.nullResultBranchTag = nullResultBranchTag;
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 21, 2005, 9:55 AM
+ *
+ * $Log: URLToInputStreamStage.java,v $
+ * Revision 1.4  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Converts a URL into an InputStream.  This stage keeps track of all
+ * input streams that are created and closes them at the release step.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class URLToInputStreamStage extends BaseStage {
+    private static final Log log = LogFactory.getLog(URLToInputStreamStage.class);
+    private List<InputStream> inputStreams = new ArrayList<InputStream>();
+    
+    /** Creates a new instance of URLToInputStreamStage */
+    public URLToInputStreamStage() {    }
+    
+    /** Creates a new instance of URLToInputStreamStage, passing the queue to BaseStage. */
+    public URLToInputStreamStage(Queue<Object> queue) {
+        super(queue);
+    }
+    
+    /**
+     * Takes a String or a URL object representing a URL and exqueues the input
+     * stream returned by opening that URL.
+     *
+     * @param obj A String or URL object
+     * @throws StageException if obj is not a String or URL, or the URL cannot be parsed or opened
+     */
+    public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+        URL url;
+        if (obj instanceof URL){
+            url = (URL) obj;
+        } else if (obj instanceof String) {
+            String urlString = (String) obj;
+            try {
+                url = new URL(urlString);
+            } catch (MalformedURLException e){
+                throw new StageException("Error converting url String:" + urlString,e);
+            }
+        } else {
+            // Fail with a descriptive error instead of a NullPointerException at openStream().
+            throw new StageException("Expected a String or URL but received:" + obj);
+        }
+        try {
+            InputStream inputStream = url.openStream();
+            this.inputStreams.add(inputStream);
+            log.info("enqueing input stream");
+            this.exqueue(inputStream);
+        } catch (IOException e){
+            throw new StageException("Error with stream from url:" + url,e);
+        }
+    }
+    
+    /** Closes every input stream opened by process(); close failures are logged, not rethrown. */
+    public void release() {
+        log.info("running post process number of streams:" + inputStreams.size());
+        while (!inputStreams.isEmpty()) {
+            InputStream is = inputStreams.remove(0);
+            try {
+                is.close();
+                log.info("closed stream");
+            } catch (IOException e){
+                log.warn("Error closing stream",e);
+            }
+        }
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html Tue Jul 26 23:37:44 2005
@@ -0,0 +1,10 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+
+<html>
+  <head>
+    <title></title>
+  </head>
+  <body>
+  A few simple Stage and StageQueue implementations for common use cases.
+  </body>
+</html>

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt Tue Jul 26 23:37:44 2005
@@ -0,0 +1 @@
+This is a test file.
\ No newline at end of file

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html Tue Jul 26 23:37:44 2005
@@ -0,0 +1,10 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+
+<html>
+  <head>
+    <title></title>
+  </head>
+  <body>
+  
+  </body>
+</html>

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt Tue Jul 26 23:37:44 2005
@@ -0,0 +1,3 @@
+line 1
+line 2
+line 3
\ No newline at end of file

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 22, 2005, 4:10 PM
+ *
+ * $Log: SimpleStageDriverTest.java,v $
+ * Revision 1.3  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import junit.framework.*;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+
+
+/**
+ * Test cases for SimpleStageDriver.
+ *
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
+ */
+public class SimpleStageDriverTest extends TestCase {
+    
+    private Pipeline pipeline;
+    private volatile int[] processedObjectCount;
+    
+    public SimpleStageDriverTest(String testName) {
+        super(testName);
+    }
+    
+    protected void setUp() throws Exception {
+        this.pipeline = new Pipeline();
+        this.processedObjectCount = new int[3];
+    }
+    
+    protected void tearDown() throws Exception {
+    }
+    
+    public static Test suite() {
+        TestSuite suite = new TestSuite(SimpleStageDriverTest.class);
+        
+        return suite;
+    }
+    
+    /**
+     * Integration test for org.apache.commons.pipeline.driver.SimpleStageDriver.
+     */
+    public void testRunPipeline() {
+        StageDriver driver = new SimpleStageDriver();
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, driver);
+        Stage stage1 = new TestStage(1);
+        this.pipeline.addStage(stage1, driver);
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, driver);
+       
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(4, processedObjectCount[1]);
+        Assert.assertEquals(4, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    public void testFaultingPipeline() {
+        StageDriver driver = new SimpleStageDriver();
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, driver);
+        Stage stage1 = new FaultingTestStage(1);
+        this.pipeline.addStage(stage1, driver);
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, driver);
+        
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(2, processedObjectCount[1]);
+        Assert.assertEquals(2, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(2, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    private class TestStage extends BaseStage {
+        private int index;
+        
+        public TestStage(int index) {
+            this.index = index;
+        }
+        
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            processedObjectCount[index]++;
+            super.process(obj);
+        }
+    }
+    
+    private class FaultingTestStage extends BaseStage {
+        private int index;
+        private int counter;
+        
+        public FaultingTestStage(int index) {
+            this.index = index;
+        }
+        
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            if (++counter % 2 == 0) throw new StageException("Planned fault.");
+            
+            processedObjectCount[index]++;
+            super.process(obj);
+        }
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 22, 2005, 4:10 PM
+ *
+ * $Log: SingleThreadStageDriverTest.java,v $
+ * Revision 1.4  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import junit.framework.*;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Test cases for SingleThreadStageDriver.
+ *
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
+ */
+public class SingleThreadStageDriverTest extends TestCase {
+    
+    private Pipeline pipeline;
+    private volatile int[] processedObjectCount;
+    
+    public SingleThreadStageDriverTest(String testName) {
+        super(testName);
+    }
+    
+    protected void setUp() throws Exception {
+        this.pipeline = new Pipeline();
+        this.processedObjectCount = new int[3];
+    }
+    
+    protected void tearDown() throws Exception {
+    }
+    
+    public static Test suite() {
+        TestSuite suite = new TestSuite(SingleThreadStageDriverTest.class);
+        
+        return suite;
+    }
+    
+    /**
+     * Integration test for org.apache.commons.pipeline.driver.SingleThreadStageDriver.
+     */
+    public void testRunPipeline() {
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, new SingleThreadStageDriver());
+        Stage stage1 = new TestStage(1);
+        this.pipeline.addStage(stage1, new SingleThreadStageDriver());
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, new SingleThreadStageDriver());
+        
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(4, processedObjectCount[1]);
+        Assert.assertEquals(4, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    public void testSingleDriverRunPipeline() {
+        StageDriver driver = new SingleThreadStageDriver();
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, driver);
+        Stage stage1 = new TestStage(1);
+        this.pipeline.addStage(stage1, driver);
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, driver);
+        
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(4, processedObjectCount[1]);
+        Assert.assertEquals(4, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    public void testFaultingPipeline() {
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, new SingleThreadStageDriver(500, true));
+        Stage stage1 = new FaultingTestStage(1);
+        this.pipeline.addStage(stage1, new SingleThreadStageDriver(500, true));
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, new SingleThreadStageDriver(500, true));
+        
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(2, processedObjectCount[1]);
+        Assert.assertEquals(2, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(2, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    public void testSingleDriverFaultingPipeline() {
+        StageDriver driver = new SingleThreadStageDriver(500, true);
+        Stage stage0 = new TestStage(0);
+        this.pipeline.addStage(stage0, driver);
+        Stage stage1 = new FaultingTestStage(1);
+        this.pipeline.addStage(stage1, driver);
+        Stage stage2 = new TestStage(2);
+        this.pipeline.addStage(stage2, driver);
+        
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.enqueue(new Object());
+        this.pipeline.run();
+        
+        Assert.assertEquals(4, processedObjectCount[0]);
+        Assert.assertEquals(2, processedObjectCount[1]);
+        Assert.assertEquals(2, processedObjectCount[2]);
+        Assert.assertEquals(0, stage0.getMonitor().getErrors().size());
+        Assert.assertEquals(2, stage1.getMonitor().getErrors().size());
+        Assert.assertEquals(0, stage2.getMonitor().getErrors().size());
+    }
+    
+    private class TestStage extends BaseStage {
+        private int index;
+        
+        public TestStage(int index) {
+            this.index = index;
+        }
+        
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            processedObjectCount[index]++;
+            super.process(obj);
+        }
+    }
+    
+    private class FaultingTestStage extends BaseStage {
+        private int index;
+        private int counter;
+        
+        public FaultingTestStage(int index) {
+            this.index = index;
+        }
+        
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            if (++counter % 2 == 0) throw new StageException("Planned fault.");
+            
+            processedObjectCount[index]++;
+            super.process(obj);
+        }
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 10:45 AM
+ *
+ * $Log: AddToCollectionStageTest.java,v $
+ * Revision 1.2  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.stage.AddToCollectionStage;
+import org.apache.commons.pipeline.driver.SingleThreadStageDriver;
+
+
+/**
+ * Test cases for AddToCollectionStage
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class AddToCollectionStageTest extends TestCase {
+    
+        Pipeline pipeline;
+        ArrayList list;
+    
+    public AddToCollectionStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        pipeline = new Pipeline();
+        list = new ArrayList();
+        AddToCollectionStage stage = new AddToCollectionStage(list);
+        pipeline.addStage(stage, new SingleThreadStageDriver());
+        pipeline.start();
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite(AddToCollectionStageTest.class);
+        
+        return suite;
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.AddToCollectionStage.
+     */
+    public void testProcess() throws Exception {
+        Object o = new Object();
+        pipeline.enqueue(o);
+        pipeline.finish();
+        assertEquals(1,list.size());
+        Object pipedO = list.get(0);
+        assertSame(o,pipedO);
+    }
+    
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:38 AM
+ * 
+ * $Log: DynamicLookupStaticMethodStageTest.java,v $
+ * Revision 1.2  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.driver.SingleThreadStageDriver;
+
+/**
+ * Test cases for DynamicLookupStaticMethodStage.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class DynamicLookupStaticMethodStageTest extends TestCase {
+    
+    Pipeline pipe;
+    ArrayList list;
+    AddToCollectionStage collectionStage;
+    
+    public DynamicLookupStaticMethodStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        pipe = new Pipeline();
+        list = new ArrayList();
+        collectionStage = new AddToCollectionStage(list, false);
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite(DynamicLookupStaticMethodStageTest.class);
+        
+        return suite;
+    }
+
+    /**
+     * Test of newInstance method, of class org.apache.commons.pipeline.stage.DynamicLookupStaticMethodStage.
+     */
+    public void testNewInstance() throws Exception {
+        DynamicLookupStaticMethodStage stage = DynamicLookupStaticMethodStage.newInstance("java.lang.Integer", "toString");
+        assertEquals("toString",stage.getMethodName());
+        assertSame(Integer.class, stage.getMethodClass());
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.DynamicLookupStaticMethodStage.
+     */
+    public void testProcess() throws Exception {
+        DynamicLookupStaticMethodStage methodStage = new DynamicLookupStaticMethodStage(this.getClass(),"runMethod");
+        pipe.addStage(methodStage, new SingleThreadStageDriver());
+        pipe.addStage(collectionStage, new SingleThreadStageDriver());
+        pipe.start();
+        pipe.enqueue("A String");
+        pipe.finish();
+        assertEquals(1,list.size());
+        Object object = list.remove(0);
+        assertNotNull(object);
+        assertTrue(object instanceof String);
+        assertEquals("Ran String Method",object.toString());
+        
+        // Same stage, Integer input: the runMethod(Integer) overload must be the one that runs.
+        pipe.start();
+        pipe.enqueue(Integer.valueOf(5));
+        pipe.finish();
+        assertEquals(1,list.size());
+        object = list.remove(0);
+        assertTrue(object instanceof String);
+        assertEquals("Ran Integer Method",object.toString());
+    }
+    
+    
+    public static String runMethod(String object){
+        return "Ran String Method";
+    }
+    
+    public static String runMethod(Integer integer){
+        return "Ran Integer Method";
+    }
+    
+    
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 20, 2005, 10:27 AM
+ * 
+ * $Log: HttpFileDownloadStageTest.java,v $
+ * Revision 1.2  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+
+/**
+ * Test cases for HttpFileDownloadStage.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class HttpFileDownloadStageTest extends TestCase {
+    
+    Pipeline pipeline;
+    List results;
+    URL fileUrl;
+    URL google;
+    
+    
+    public HttpFileDownloadStageTest(String testName) {
+        super(testName);
+    }
+    
+    protected void setUp() throws Exception {
+        results = new ArrayList();
+        ArrayList<Stage> stages = new ArrayList<Stage>();
+        stages.add(new HttpFileDownloadStage());
+        stages.add(new AddToCollectionStage(results));
+        this.fileUrl = this.getClass().getClassLoader().getResource("http-download.txt");
+        assertNotNull(fileUrl);
+        google = new URL("http://www.google.com");
+        pipeline = new Pipeline(stages);
+    }
+    
+    protected void tearDown() throws Exception {
+    }
+    
+    public static Test suite() {
+        TestSuite suite = new TestSuite(HttpFileDownloadStageTest.class);
+        
+        return suite;
+    }
+    
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage.
+     */
+//    public void testFileUrlStringProcess() throws Exception {
+//        pipeline.start();
+//        pipeline.enqueue(this.fileUrl.toExternalForm());
+//        pipeline.finish();
+//        assertEquals(1,results.size());
+//        Object o = results.get(0);
+//        assertNotNull(o);
+//        assertTrue(o instanceof File);
+//        File file = (File) o;
+//        
+//        FileReader rf = new FileReader(file);
+//        BufferedReader br = new BufferedReader(rf);
+//        try {
+//            String line = br.readLine();
+//            assertNotNull(line);
+//            assertEquals("This is a test file.",line);
+//        } finally {
+//            rf.close();
+//            br.close();
+//        }
+//        
+//        
+//    }
+//    
+//    /**
+//     * Test of setWorkDir method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage.
+//     */
+//    public void testFileUrlProcess() throws Exception {
+//        pipeline.start();
+//        pipeline.enqueue(this.fileUrl);
+//        pipeline.finish();
+//        assertEquals(1,results.size());
+//        Object o = results.get(0);
+//        assertNotNull(o);
+//        assertTrue(o instanceof File);
+//        File file = (File) o;
+//        
+//        FileReader rf = new FileReader(file);
+//        BufferedReader br = new BufferedReader(rf);
+//        try {
+//            String line = br.readLine();
+//            assertNotNull(line);
+//            assertEquals("This is a test file.",line);
+//        } finally {
+//            rf.close();
+//            br.close();
+//        }
+//        
+//    }
+    
+    /**
+     * Test of process() with an http URL enqueued in its String form; expects one File whose first line contains "oogle".
+     */
+    public void testHttpUrlString() throws Exception {
+        pipeline.start();
+        pipeline.enqueue(this.google.toExternalForm());
+        pipeline.finish();
+        assertEquals(1,results.size());
+        Object o = results.get(0);
+        assertNotNull(o);
+        assertTrue(o instanceof File);
+        File file = (File) o;
+        
+        FileReader rf = new FileReader(file);
+        BufferedReader br = new BufferedReader(rf);
+        try {
+            String line = br.readLine();
+            assertNotNull(line);
+            assertTrue("actual line:" + line,line.contains("oogle"));
+        } finally {
+            rf.close();
+            br.close();
+        }
+    }
+    
+    /**
+     * Test of process() with an http URL enqueued as a java.net.URL object; expects one File whose first line contains "oogle".
+     */
+    public void testHttpUrl() throws Exception {
+        pipeline.start();
+        pipeline.enqueue(this.google);
+        pipeline.finish();
+        assertEquals(1,results.size());
+        Object o = results.get(0);
+        assertNotNull(o);
+        assertTrue(o instanceof File);
+        File file = (File) o;
+        
+        FileReader rf = new FileReader(file);
+        BufferedReader br = new BufferedReader(rf);
+        try {
+            String line = br.readLine();
+            assertNotNull(line);
+            assertTrue("actual line:" + line,line.contains("oogle"));
+        } finally {
+            rf.close();
+            br.close();
+        }
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Log: InputStreamLineBreakStageTest.java,v $
+ * Revision 1.2  2005/07/26 18:20:28  tns
+ * apache license.
+ *
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+/**
+ * Verifies that InputStreamLineBreakStage turns the stream opened for a URL into
+ * one String per line of text.
+ *
+ * @author tns
+ */
+public class InputStreamLineBreakStageTest extends TestCase {
+
+    /** Lines collected by the final AddToCollectionStage of the pipeline. */
+    List<String> results;
+    /** Chains URLToInputStreamStage, InputStreamLineBreakStage and AddToCollectionStage. */
+    Pipeline pipe;
+    /** Classpath location of the url-input-to-stream-test.txt fixture. */
+    URL url;
+
+    public InputStreamLineBreakStageTest(String testName) {
+        super(testName);
+    }
+
+    /** Locates the fixture and wires the three stages into a fresh pipeline. */
+    protected void setUp() throws Exception {
+        url = getClass().getClassLoader().getResource("url-input-to-stream-test.txt");
+        assertNotNull(url);
+
+        results = new ArrayList<String>();
+        List<Stage> stages = new ArrayList<Stage>();
+        stages.add(new URLToInputStreamStage());
+        stages.add(new InputStreamLineBreakStage());
+        stages.add(new AddToCollectionStage<String>(results));
+        pipe = new Pipeline(stages);
+    }
+
+    protected void tearDown() throws Exception {
+        // Nothing to release here.
+    }
+
+    public static Test suite() {
+        return new TestSuite(InputStreamLineBreakStageTest.class);
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.InputStreamLineBreakStage.
+     * The fixture is expected to yield exactly the lines "line 1", "line 2" and "line 3".
+     */
+    public void testProcess() throws Exception {
+        pipe.start();
+        pipe.enqueue(url);
+        pipe.finish();
+
+        assertEquals(3, results.size());
+        assertEquals("line 1", results.get(0));
+        assertEquals("line 2", results.get(1));
+        assertEquals("line 3", results.get(2));
+    }
+
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:38 AM
+ * 
+ * $Log: RunPredefinedStaticMethodStageTest.java,v $
+ * Revision 1.2  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.stage.AddToCollectionStage;
+import org.apache.commons.pipeline.stage.RunPredefinedStaticMethodStage;
+import org.apache.commons.pipeline.driver.SimpleStageDriver;
+
+
+/**
+ * Test cases for RunPredefinedStaticMethodStage, driven through
+ * Integer.valueOf(String) as the predefined static method.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class RunPredefinedStaticMethodStageTest extends TestCase {
+
+    /** Pipeline assembled by the individual tests. */
+    Pipeline pipe;
+    /** Receives the objects collected by collectionStage. */
+    ArrayList list;
+    /** Stage constructed over list; appended as the last pipeline stage. */
+    AddToCollectionStage collectionStage;
+
+    public RunPredefinedStaticMethodStageTest(String testName) {
+        super(testName);
+    }
+
+    /** Builds a fresh pipeline, result list and collecting stage for each test. */
+    protected void setUp() throws Exception {
+        list = new ArrayList();
+        pipe = new Pipeline();
+        collectionStage = new AddToCollectionStage(list, false);
+    }
+
+    protected void tearDown() throws Exception {
+        // Nothing to release here.
+    }
+
+    public static Test suite() {
+        return new TestSuite(RunPredefinedStaticMethodStageTest.class);
+    }
+
+    /**
+     * Test of newInstance method, of class org.apache.commons.pipeline.stage.RunPredefinedStaticMethodStage.
+     * The class, method and parameter type names must resolve to Integer.valueOf(String).
+     */
+    public void testNewInstance() throws Exception {
+        Method resolved = RunPredefinedStaticMethodStage.newInstance(
+                "java.lang.Integer", "valueOf", "java.lang.String").getMethod();
+        assertSame(Integer.class, resolved.getDeclaringClass());
+        Class[] paramTypes = resolved.getParameterTypes();
+        assertNotNull(paramTypes);
+        assertEquals(1, paramTypes.length);
+        assertSame(String.class, paramTypes[0]);
+        assertEquals("valueOf", resolved.getName());
+    }
+
+    /**
+     * Test of process() method, of class org.apache.commons.pipeline.stage.RunPredefinedStaticMethodStage.
+     */
+    public void testProcess() throws Exception {
+        Method valueOf = Integer.class.getMethod("valueOf", String.class);
+        pipe.addStage(new RunPredefinedStaticMethodStage(valueOf), new SimpleStageDriver());
+        pipe.addStage(collectionStage, new SimpleStageDriver());
+        pipe.start();
+        pipe.enqueue("5");
+        pipe.finish();
+
+        assertEquals(1, list.size());
+        Object converted = list.get(0);
+        assertNotNull(converted);
+        assertTrue(converted instanceof Integer);
+        assertEquals(5, ((Integer) converted).intValue());
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 21, 2005, 10:07 AM
+ *
+ * $Log: URLToInputStreamStageTest.java,v $
+ * Revision 1.4  2005/07/25 22:19:17  kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import junit.framework.*;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+
+/**
+ * Test cases for URLToInputStreamStage.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class URLToInputStreamStageTest extends TestCase {
+
+    URL url;
+    URLToInputStreamStage stage;
+    Pipeline pipe;
+    List<InputStream> results;
+
+    public URLToInputStreamStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        url = this.getClass().getClassLoader().getResource("url-input-to-stream-test.txt");
+        assertNotNull(url);
+        results = new ArrayList<InputStream>();
+        stage = new URLToInputStreamStage();
+        List<Stage> stages = new ArrayList<Stage>();
+        stages.add(stage);
+        stages.add(new AddToCollectionStage<InputStream>(results));
+        pipe = new Pipeline(stages);
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(URLToInputStreamStageTest.class);
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.URLToInputStreamStage.
+     * Before the pipeline is finished the emitted stream must be open and deliver data.
+     */
+    public void testProcess() throws Exception {
+        pipe.start();
+        try {
+            pipe.enqueue(url);
+            assertEquals(1, results.size());
+            InputStream is = results.get(0);
+            assertNotNull(is);
+            assertTrue("no data read from " + url, is.read(new byte[128]) > 0);
+        } finally {
+            // Always finish so a failed assertion cannot leave the pipeline running.
+            pipe.finish();
+        }
+    }
+
+    /**
+     * Test of postprocess method, of class org.apache.commons.pipeline.stage.URLToInputStreamStage.
+     * Once the pipeline has finished, reading the emitted stream must fail.
+     */
+    public void testPostprocess() throws Exception {
+        pipe.start();
+        pipe.enqueue(url);
+        pipe.finish();
+        assertEquals(1, results.size());
+        InputStream is = results.get(0);
+        try {
+            is.read(new byte[128]);
+            fail("input stream should have been closed, so reading should throw an exception.");
+        } catch (IOException expected) {
+            // expected: the stream is no longer readable after finish()
+        }
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java
------------------------------------------------------------------------------
    svn:keywords = Id



---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org