You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by sk...@apache.org on 2005/07/27 08:38:10 UTC
svn commit: r225469 - in /jakarta/commons/sandbox/pipeline/trunk/src:
java/org/apache/commons/pipeline/driver/
java/org/apache/commons/pipeline/stage/ test/conf/
test/java/org/apache/commons/pipeline/driver/
test/java/org/apache/commons/pipeline/stage/
Author: skitching
Date: Tue Jul 26 23:37:44 2005
New Revision: 225469
URL: http://svn.apache.org/viewcvs?rev=225469&view=rev
Log:
Initial import of additional Stage implementations and related tests
Added:
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java (with props)
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java (with props)
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 4:34 PM
+ *
+ * $Log: AbstractStageMonitor.java,v $
+ * Revision 1.3 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.2 2005/07/22 23:21:35 kjn
+ * Added stage parameter to constructor to simplify callbacks for those monitors
+ * that need to access the Stage.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageMonitor;
+import org.apache.commons.pipeline.StageMonitor.State;
+
+/**
+ * Provades an abstract {@link StageMonitor} base class that implements methods
+ * that do not need to be synchronized.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public abstract class AbstractStageMonitor implements StageMonitor {
+ private static final Log log = LogFactory.getLog(AbstractStageMonitor.class);
+
+ protected Stage stage;
+ protected List<Throwable> errors = new ArrayList<Throwable>();
+ protected State state = State.STOPPED;
+
+ /**
+ * Creates a new instance of AbstractStageMonitor
+ */
+ public AbstractStageMonitor(Stage stage) {
+ this.stage = stage;
+ }
+
+ /**
+ *
+ */
+ public final void preprocessFailed(Throwable fault) {
+ this.state = State.ERROR;
+ log.fatal("Error in preprocessing caused abort.", fault);
+ this.errors.add(fault);
+ }
+
+ /**
+ * Monitors handler failures.
+ *
+ * @param data the data that was being processed as the fault occurred
+ * @param fault the faulting exception
+ */
+ public final void processingFailed( Object data, Throwable fault) {
+ log.error("Processing error on data object " + data, fault);
+ this.errors.add(fault);
+ }
+
+ /**
+ *
+ */
+ public final void postprocessFailed(Throwable fault) {
+ this.state = State.ERROR;
+ log.fatal("Error in postprocessing caused abort.", fault);
+ this.errors.add(fault);
+ }
+
+ /**
+ * Monitors driver thread interruption failures.
+ *
+ * @param fault the faulting exception
+ */
+ public final void driverFailed( InterruptedException fault ) {
+ this.state = State.ERROR;
+ this.errors.add(fault);
+ }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/AbstractStageMonitor.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 4:26 PM
+ *
+ * $Log: SimpleStageDriver.java,v $
+ * Revision 1.4 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.3 2005/07/22 23:22:51 kjn
+ * Changes to reflect changes in StageDriver base class, consolidation with
+ * SimpleStageMonitor code to eliminate unnecessary public class.
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import java.util.Collections;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.StageMonitor;
+
+/**
+ * This is a non-threaded version of the StageDriver.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class SimpleStageDriver extends StageDriver {
+
+ private static final Log log = LogFactory.getLog(SimpleStageDriver.class);
+
+ /** Creates a new instance of SimpleStageDriver */
+ public SimpleStageDriver() {
+ super();
+ }
+
+ /** Sets a new monitor on the stage and notifies the monitor we have started */
+ protected void startInternal(Stage stage) throws StageException {
+ stage.preprocess();
+ stage.getMonitor().driverStarted();
+
+ try {
+ Object o = stage.poll();
+ while (o != null){
+ stage.process(o);
+ o = stage.poll();
+ }
+ } catch (Exception e) {
+ stage.release();
+ throw new StageException(e);
+ }
+ }
+
+ /** Notify the monitor that we have stopped */
+ protected void finishInternal(Stage stage) throws StageException {
+ try {
+ stage.postprocess();
+ } finally {
+ stage.release();
+ }
+ }
+
+ /**
+ * Factory method for StageMonitor that works with this driver.
+ */
+ protected StageMonitor createStageMonitor(Stage stage) {
+ return new AbstractStageMonitor(stage) {
+ /**
+ * StageDriver has been requested to start stage processing.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STARTING}.
+ */
+ public void startRequested() {
+ if (this.state == State.STOPPED) this.state = State.STARTING;
+ }
+
+ /**
+ * StageDriver has started execution.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#RUNNING}.
+ */
+ public void driverStarted() {
+ if (this.state == State.STOPPED || this.state == State.STARTING) this.state = State.RUNNING;
+ }
+
+ /**
+ * StageDriver has been requested to halt stage processing.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STOPPING}.
+ */
+ public void stopRequested() {
+ this.state = State.STOP_REQUESTED;
+ }
+
+ /**
+ * StageDriver has finished execution.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STOPPED}.
+ */
+ public void driverStopped() {
+ this.state = State.STOPPED;
+ }
+
+ /** Returns the state */
+ public org.apache.commons.pipeline.StageMonitor.State getState() {
+ return this.state;
+ }
+
+ /** Returns all errors */
+ public java.util.List<Throwable> getErrors() {
+ return Collections.unmodifiableList(this.errors);
+ }
+
+ public void enqueueOccurred() {
+ if (this.state != State.RUNNING) {
+ try {
+ this.stage.getStageDriver().start(this.stage);
+ } catch (StageException e){
+ this.preprocessFailed(e);
+ }
+ } else {
+ Object object = stage.poll();
+ try {
+ stage.process(object);
+ } catch (StageException e){
+ this.processingFailed(object, e);
+ }
+ }
+ }
+ };
+ }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SimpleStageDriver.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 18, 2005, 4:30 PM
+ *
+ * $Log: AddToCollectionStage.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.1 2005/07/21 17:41:19 tns
+ * Added Simple Driver, made some pipeline modifications and added some stages.
+ *
+ * Revision 1.4 2005/07/21 17:14:04 kjn
+ * Changed to use commons-logging
+ *
+ * Revision 1.3 2005/07/19 22:27:04 kjn
+ * Fixed javadocs
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.commons.pipeline.BaseStage;
+
+/**
+ * This is a simple stage in the pipeline which will add the object to the
+ * specified collection.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class AddToCollectionStage<T> extends BaseStage {
+
+ /**
+ * Holds value of property collection.
+ */
+ private Collection<T> collection;
+
+ /**
+ * Creates a new instance of AddToCollectionStage. This constructor
+ * will synchronized the collection by default.
+ */
+ public AddToCollectionStage(Collection<T> collection) {
+ this(collection, true);
+ }
+
+ /**
+ * Creates a new instance of AddToCollectionStage.
+ * @param collection The collection in which to add objects to
+ * @param synchronized A flag value that determines whether or not accesses
+ * to the underlying collection are synchronized.
+ */
+ public AddToCollectionStage(Collection<T> collection, boolean synchronize) {
+ if (collection == null){
+ throw new IllegalArgumentException("Argument 'collection' can not be null.");
+ }
+
+ this.collection = synchronize ? Collections.synchronizedCollection(collection) : collection;
+ }
+
+ /**
+ * Adds the object to the underlying collection.
+ *
+ * @throws ClassCastException if the object is not of a suitable type to be added
+ * to the collection.
+ */
+ public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+ this.collection.add((T) obj);
+ this.exqueue(obj);
+ }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/AddToCollectionStage.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:25 AM
+ *
+ * $Log: DynamicLookupStaticMethodStage.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * <p>Provide this Stage with a class and a static method name and it will dynamically
+ * look up the appropriate method to call based on the object type. If the
+ * object is an array of objects, its elements are treated as the argument list:
+ * the method is looked up by the runtime classes of the elements and invoked
+ * with those elements as its arguments.</p>
+ *
+ * <p>The resulting object will be exqueued on the main pipeline if it is not null. If
+ * it is null, the original object is exqueued on the branch specified
+ * by the nullResultBranchTag property. The default for this value is "nullResult".</p>
+ *
+ * <p>Note that the lookup uses the exact runtime class of each argument, so a
+ * method declared with a supertype or primitive parameter is not found.</p>
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class DynamicLookupStaticMethodStage extends BaseStage {
+
+    // Branch upon which the original objects will be enqueued if the defined
+    // Method returned a null result.
+    private String nullResultBranchTag = "nullResult";
+
+    // Name of the method to call to process the object.
+    private String methodName;
+
+    // Class containing the method.
+    private Class clazz;
+
+    /**
+     * Creates a new instance of DynamicLookupStaticMethodStage
+     *
+     * @param clazz The class that defines the static method that will be used to
+     * process objects.
+     * @param methodName The name of the method. This method may be overloaded.
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public DynamicLookupStaticMethodStage(Class clazz, String methodName) {
+        super();
+        if (clazz == null) {
+            throw new IllegalArgumentException("Argument 'clazz' can not be null.");
+        }
+        if (methodName == null) {
+            throw new IllegalArgumentException("Argument 'methodName' can not be null.");
+        }
+        this.clazz = clazz;
+        this.methodName = methodName;
+    }
+
+    /**
+     * Creates a new DynamicLookupStaticMethodStage for the specified static
+     * method.
+     *
+     * @param className The fully qualified class name of the class in which the
+     * static method that will be used to process objects is defined.
+     * @param methodName The name of the method. This method may be overloaded.
+     * @return a stage that dispatches to the named method
+     * @throws ClassNotFoundException if the class cannot be loaded by this
+     * class's class loader
+     */
+    public static DynamicLookupStaticMethodStage newInstance(String className, String methodName) throws ClassNotFoundException {
+        Class clazz = DynamicLookupStaticMethodStage.class.getClassLoader().loadClass(className);
+        return new DynamicLookupStaticMethodStage(clazz, methodName);
+    }
+
+    /** Returns the name of the method we are using */
+    public String getMethodName() {
+        return this.methodName;
+    }
+
+    /** Returns the class we are using */
+    public Class getMethodClass() {
+        return this.clazz;
+    }
+
+    /**
+     * <p>Finds the appropriate method overloading for the method specified
+     * by {@link #getMethodName() methodName}, calls it to process the object, and exqueues
+     * any returned object. If the returned object is null, the original object
+     * is exqueued on the branch specified by the nullResultBranchTag property.</p>
+     *
+     * @param obj The object to process. An object array is unpacked into the
+     * method's argument list; anything else (including a primitive array) is
+     * passed as the single argument.
+     * @throws StageException if no matching public method exists, the method
+     * is not accessible, or the method itself throws
+     */
+    public void process(Object obj) throws StageException {
+        try {
+            Method method;
+            Object[] args;
+            if (obj instanceof Object[]) {
+                // The array elements are the arguments: look the method up by
+                // their runtime classes and invoke it with the same elements.
+                args = (Object[]) obj;
+                Class[] classes = new Class[args.length];
+                for (int i = 0; i < args.length; i++) {
+                    classes[i] = args[i].getClass();
+                }
+                method = this.clazz.getMethod(methodName, classes);
+            } else {
+                args = new Object[] { obj };
+                method = this.clazz.getMethod(methodName, new Class[] { obj.getClass() });
+            }
+
+            // Passing an Object[] supplies the argument list directly instead
+            // of having the array wrapped as one single argument.
+            Object returnObj = method.invoke(null, args);
+            if (returnObj != null) {
+                this.exqueue(returnObj);
+            } else {
+                // Honor the configurable branch tag rather than its default.
+                this.exqueue(this.nullResultBranchTag, obj);
+            }
+        } catch (NoSuchMethodException e) {
+            throw new StageException("No method", e);
+        } catch (IllegalAccessException e) {
+            throw new StageException("Illegal Access", e);
+        } catch (InvocationTargetException e) {
+            throw new StageException("Invocation", e);
+        }
+    }
+
+    /**
+     * Getter for property nullResultBranchTag. The default value is "nullResult".
+     * @return Value of property nullResultBranchTag.
+     */
+    public String getNullResultBranchTag() {
+        return this.nullResultBranchTag;
+    }
+
+    /**
+     * Setter for property nullResultBranchTag.
+     * @param nullResultBranchTag New value of property nullResultBranchTag.
+     */
+    public void setNullResultBranchTag(String nullResultBranchTag) {
+        this.nullResultBranchTag = nullResultBranchTag;
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStage.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Log: InputStreamLineBreakStage.java,v $
+ * Revision 1.2 2005/07/26 18:20:28 tns
+ * apache license.
+ *
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Splits the text read from an InputStream into lines and exqueues each line
+ * as a String.
+ *
+ * @author tns
+ * @version $Id$
+ */
+public class InputStreamLineBreakStage extends BaseStage {
+
+    /** Creates a new instance of InputStreamLineBreakStage */
+    public InputStreamLineBreakStage() {
+    }
+
+    /**
+     * Reads the given stream line by line until it is exhausted, exqueueing
+     * every line in the order read. Bytes are decoded with the platform's
+     * default charset, and the stream is not closed by this method.
+     *
+     * @param obj the InputStream to read; any other type results in a
+     * ClassCastException
+     * @throws org.apache.commons.pipeline.StageException wrapping any
+     * IOException raised while reading
+     */
+    public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+        InputStream is = (InputStream) obj;
+        try {
+            BufferedReader lines = new BufferedReader(new InputStreamReader(is));
+            for (String text = lines.readLine(); text != null; text = lines.readLine()) {
+                this.exqueue(text);
+            }
+        } catch (IOException e) {
+            throw new StageException(e);
+        }
+    }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStage.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:05 AM
+ *
+ * $Log: RunPredefinedStaticMethodStage.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Runs a static method with the object being processed. The returned object
+ * will be exqueued on the main pipeline if it is not null. If the returned
+ * object is null, the original object is exqueued on the branch specified by
+ * {@link #setNullResultBranchTag(String)}.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class RunPredefinedStaticMethodStage extends BaseStage {
+
+    // Branch upon which the original objects will be enqueued if the defined
+    // Method returned a null result.
+    private String nullResultBranchTag = "nullResult";
+
+    // Method used to process objects in the queue
+    private Method method;
+
+    /**
+     * Creates a new instance of RunPredefinedStaticMethodStage
+     *
+     * @param method The method used to process objects. It is invoked without
+     * a target instance, so it must be static and accept a single argument.
+     * @throws IllegalArgumentException if the method is null
+     */
+    public RunPredefinedStaticMethodStage(Method method) {
+        super();
+        // Fail at construction time rather than with a NullPointerException on
+        // the first processed object.
+        if (method == null) {
+            throw new IllegalArgumentException("Argument 'method' can not be null.");
+        }
+        this.method = method;
+    }
+
+    /**
+     * Returns the Method object for the method that will be used to process
+     * objects in the queue.
+     */
+    public Method getMethod() {
+        return this.method;
+    }
+
+    /**
+     * Convenience method to create the new stage with String description of className, methodName and argumentType
+     *
+     * @param className The fully qualified class name, such as "java.lang.String" of the class in which the method resides
+     * @param methodName The name of the method
+     * @param argumentType The fully qualified class name of the method's
+     * argument type (Sorry, this doesn't support multiple argument methods)
+     * @return a stage that invokes the described method
+     * @throws ClassNotFoundException if either class name cannot be loaded by
+     * this class's class loader
+     * @throws NoSuchMethodException if the class has no public method with the
+     * given name and argument type
+     */
+    public static RunPredefinedStaticMethodStage newInstance(String className, String methodName, String argumentType) throws ClassNotFoundException, NoSuchMethodException {
+        ClassLoader loader = RunPredefinedStaticMethodStage.class.getClassLoader();
+        Class clazz = loader.loadClass(className);
+        Class argumentClass = loader.loadClass(argumentType);
+        return new RunPredefinedStaticMethodStage(clazz.getMethod(methodName, new Class[] { argumentClass }));
+    }
+
+    /**
+     * <p>Calls the defined static method and exqueues the returned object if it is
+     * not null, otherwise placing the original object on the branch specified
+     * by the nullResultBranchTag property.</p>
+     *
+     * @param obj The object to be processed.
+     * @throws StageException if the method is not accessible or itself throws
+     */
+    public void process(Object obj) throws StageException {
+        try {
+            Object returnObj = this.method.invoke(null, obj);
+            if (returnObj != null) {
+                this.exqueue(returnObj);
+            } else {
+                this.exqueue(nullResultBranchTag, obj);
+            }
+        } catch (IllegalAccessException e) {
+            throw new StageException("Illegal Access", e);
+        } catch (InvocationTargetException e) {
+            throw new StageException("Invocation", e);
+        }
+    }
+
+    /**
+     * Getter for property nullResultBranchTag. The default value is "nullResult".
+     * @return Value of property nullResultBranchTag.
+     */
+    public String getNullResultBranchTag() {
+        return this.nullResultBranchTag;
+    }
+
+    /**
+     * Setter for property nullResultBranchTag.
+     * @param nullResultBranchTag New value of property nullResultBranchTag.
+     */
+    public void setNullResultBranchTag(String nullResultBranchTag) {
+        this.nullResultBranchTag = nullResultBranchTag;
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStage.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 21, 2005, 9:55 AM
+ *
+ * $Log: URLToInputStreamStage.java,v $
+ * Revision 1.4 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Converts a URL into an InputStream. This stage keeps track of all
+ * input streams that are created and closes them at the release step.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class URLToInputStreamStage extends BaseStage {
+
+ private static final Log log = LogFactory.getLog(URLToInputStreamStage.class);
+ private List<InputStream> inputStreams = new ArrayList<InputStream>();
+
+ /** Creates a new instance of URLToInputStreamStage */
+ public URLToInputStreamStage() { }
+
+ /** Creates a new instance of URLToInputStreamStage */
+ public URLToInputStreamStage(Queue<Object> queue) {
+ super(queue);
+ }
+
+ /**
+ * Takes a String or a URL object representing a URL and exqueues the input
+ * stream returned by opening that URL.
+ *
+ * @param obj A String or URL object
+ */
+ public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+ URL url = null;
+ if (obj instanceof URL){
+ url = (URL) obj;
+ } else if (obj instanceof String) {
+ String urlString = (String) obj;
+ try {
+ url = new URL(urlString);
+ } catch (MalformedURLException e){
+ throw new StageException("Error converting url String:" + urlString,e);
+ }
+ }
+
+ try {
+ InputStream inputStream = url.openStream();
+ this.inputStreams.add(inputStream);
+ log.info("enqueing input stream");
+ this.exqueue(inputStream);
+ } catch (IOException e){
+ throw new StageException("Error with stream from url:" + url,e);
+ }
+ }
+
+ /**
+ * Ensure that all opened input streams are closed.
+ */
+ public void release() {
+ log.info("running post process number of streams:" + inputStreams.size());
+ while(inputStreams.size() > 0){
+ InputStream is = (InputStream) inputStreams.remove(0);
+ try {
+ is.close();
+ log.info("closed stream");
+ } catch (IOException e){
+ log.warn("Error closing stream",e);
+ }
+ }
+ }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/URLToInputStreamStage.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/package.html Tue Jul 26 23:37:44 2005
@@ -0,0 +1,10 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+
+<html>
+ <head>
+ <title></title>
+ </head>
+ <body>
+ A few simple Stage and StageQueue implementations for common use cases.
+ </body>
+</html>
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt Tue Jul 26 23:37:44 2005
@@ -0,0 +1 @@
+This is a test file.
\ No newline at end of file
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-download.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/http-file-download.html Tue Jul 26 23:37:44 2005
@@ -0,0 +1,10 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+
+<html>
+ <head>
+ <title></title>
+ </head>
+ <body>
+
+ </body>
+</html>
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt Tue Jul 26 23:37:44 2005
@@ -0,0 +1,3 @@
+line 1
+line 2
+line 3
\ No newline at end of file
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/url-input-to-stream-test.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 22, 2005, 4:10 PM
+ *
+ * $Log: SimpleStageDriverTest.java,v $
+ * Revision 1.3 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import junit.framework.*;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+
+
+/**
+ * Test cases for SimpleStageDriver. Each test builds a three-stage pipeline
+ * whose stages share a single driver instance, pushes four objects through it
+ * and then checks the per-stage processing counts and recorded errors.
+ *
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
+ */
+public class SimpleStageDriverTest extends TestCase {
+
+    private Pipeline pipeline;
+    /** Number of objects each stage has processed, indexed by stage position. */
+    private volatile int[] processedObjectCount;
+
+    public SimpleStageDriverTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        this.pipeline = new Pipeline();
+        this.processedObjectCount = new int[3];
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(SimpleStageDriverTest.class);
+    }
+
+    /**
+     * Integration test for org.apache.commons.pipeline.driver.SimpleStageDriver:
+     * with no faulting stage, all four objects reach every stage.
+     */
+    public void testRunPipeline() {
+        StageDriver sharedDriver = new SimpleStageDriver();
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, sharedDriver);
+        Stage second = new TestStage(1);
+        this.pipeline.addStage(second, sharedDriver);
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, sharedDriver);
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 4, 4);
+        assertEquals(0, first.getMonitor().getErrors().size());
+        assertEquals(0, second.getMonitor().getErrors().size());
+        assertEquals(0, third.getMonitor().getErrors().size());
+    }
+
+    /**
+     * The middle stage throws on every second object, so only half of the
+     * objects get past it and it records two errors.
+     */
+    public void testFaultingPipeline() {
+        StageDriver sharedDriver = new SimpleStageDriver();
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, sharedDriver);
+        Stage faulting = new FaultingTestStage(1);
+        this.pipeline.addStage(faulting, sharedDriver);
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, sharedDriver);
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 2, 2);
+        assertEquals(0, first.getMonitor().getErrors().size());
+        assertEquals(2, faulting.getMonitor().getErrors().size());
+        assertEquals(0, third.getMonitor().getErrors().size());
+    }
+
+    /** Enqueues four fresh objects on the pipeline and then runs it. */
+    private void enqueueFourObjectsAndRun() {
+        for (int i = 0; i < 4; i++) {
+            this.pipeline.enqueue(new Object());
+        }
+        this.pipeline.run();
+    }
+
+    /** Asserts the processing counters of stages 0, 1 and 2, in that order. */
+    private void assertProcessedCounts(int stage0, int stage1, int stage2) {
+        assertEquals(stage0, processedObjectCount[0]);
+        assertEquals(stage1, processedObjectCount[1]);
+        assertEquals(stage2, processedObjectCount[2]);
+    }
+
+    /** Stage that counts each object it sees before delegating to BaseStage. */
+    private class TestStage extends BaseStage {
+        private int index;
+
+        public TestStage(int index) {
+            this.index = index;
+        }
+
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            processedObjectCount[index] += 1;
+            super.process(obj);
+        }
+    }
+
+    /** Stage that fails on every second object and counts only the others. */
+    private class FaultingTestStage extends BaseStage {
+        private int index;
+        private int counter;
+
+        public FaultingTestStage(int index) {
+            this.index = index;
+        }
+
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            counter++;
+            if (counter % 2 == 0) {
+                throw new StageException("Planned fault.");
+            }
+
+            processedObjectCount[index] += 1;
+            super.process(obj);
+        }
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SimpleStageDriverTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 22, 2005, 4:10 PM
+ *
+ * $Log: SingleThreadStageDriverTest.java,v $
+ * Revision 1.4 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.driver;
+
+import junit.framework.*;
+import org.apache.commons.pipeline.BaseStage;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+import org.apache.commons.pipeline.StageDriver;
+import org.apache.commons.pipeline.StageException;
+
+/**
+ * Test cases for SingleThreadStageDriver. The tests run a three-stage
+ * pipeline either with one driver instance per stage or with a single driver
+ * instance shared by all stages, with and without a faulting middle stage.
+ *
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
+ */
+public class SingleThreadStageDriverTest extends TestCase {
+
+    private Pipeline pipeline;
+    /** Number of objects each stage has processed, indexed by stage position. */
+    private volatile int[] processedObjectCount;
+
+    public SingleThreadStageDriverTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        this.pipeline = new Pipeline();
+        this.processedObjectCount = new int[3];
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(SingleThreadStageDriverTest.class);
+    }
+
+    /**
+     * Integration test for org.apache.commons.pipeline.driver.SingleThreadStageDriver
+     * using a separate driver instance for each stage.
+     */
+    public void testRunPipeline() {
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, new SingleThreadStageDriver());
+        Stage second = new TestStage(1);
+        this.pipeline.addStage(second, new SingleThreadStageDriver());
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, new SingleThreadStageDriver());
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 4, 4);
+        assertErrorCount(0, first);
+        assertErrorCount(0, second);
+        assertErrorCount(0, third);
+    }
+
+    /** Same as testRunPipeline, but all stages share one driver instance. */
+    public void testSingleDriverRunPipeline() {
+        StageDriver sharedDriver = new SingleThreadStageDriver();
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, sharedDriver);
+        Stage second = new TestStage(1);
+        this.pipeline.addStage(second, sharedDriver);
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, sharedDriver);
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 4, 4);
+        assertErrorCount(0, first);
+        assertErrorCount(0, second);
+        assertErrorCount(0, third);
+    }
+
+    /**
+     * The middle stage throws on every second object; each stage has its own
+     * driver built with the arguments (500, true).
+     */
+    public void testFaultingPipeline() {
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, new SingleThreadStageDriver(500, true));
+        Stage faulting = new FaultingTestStage(1);
+        this.pipeline.addStage(faulting, new SingleThreadStageDriver(500, true));
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, new SingleThreadStageDriver(500, true));
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 2, 2);
+        assertErrorCount(0, first);
+        assertErrorCount(2, faulting);
+        assertErrorCount(0, third);
+    }
+
+    /** Same as testFaultingPipeline, but all stages share one driver instance. */
+    public void testSingleDriverFaultingPipeline() {
+        StageDriver sharedDriver = new SingleThreadStageDriver(500, true);
+        Stage first = new TestStage(0);
+        this.pipeline.addStage(first, sharedDriver);
+        Stage faulting = new FaultingTestStage(1);
+        this.pipeline.addStage(faulting, sharedDriver);
+        Stage third = new TestStage(2);
+        this.pipeline.addStage(third, sharedDriver);
+
+        enqueueFourObjectsAndRun();
+
+        assertProcessedCounts(4, 2, 2);
+        assertErrorCount(0, first);
+        assertErrorCount(2, faulting);
+        assertErrorCount(0, third);
+    }
+
+    /** Enqueues four fresh objects on the pipeline and then runs it. */
+    private void enqueueFourObjectsAndRun() {
+        for (int i = 0; i < 4; i++) {
+            this.pipeline.enqueue(new Object());
+        }
+        this.pipeline.run();
+    }
+
+    /** Asserts the processing counters of stages 0, 1 and 2, in that order. */
+    private void assertProcessedCounts(int stage0, int stage1, int stage2) {
+        assertEquals(stage0, processedObjectCount[0]);
+        assertEquals(stage1, processedObjectCount[1]);
+        assertEquals(stage2, processedObjectCount[2]);
+    }
+
+    /** Asserts how many errors the given stage's monitor has recorded. */
+    private void assertErrorCount(int expected, Stage stage) {
+        assertEquals(expected, stage.getMonitor().getErrors().size());
+    }
+
+    /** Stage that counts each object it sees before delegating to BaseStage. */
+    private class TestStage extends BaseStage {
+        private int index;
+
+        public TestStage(int index) {
+            this.index = index;
+        }
+
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            processedObjectCount[index] += 1;
+            super.process(obj);
+        }
+    }
+
+    /** Stage that fails on every second object and counts only the others. */
+    private class FaultingTestStage extends BaseStage {
+        private int index;
+        private int counter;
+
+        public FaultingTestStage(int index) {
+            this.index = index;
+        }
+
+        public void process(Object obj) throws org.apache.commons.pipeline.StageException {
+            counter++;
+            if (counter % 2 == 0) {
+                throw new StageException("Planned fault.");
+            }
+
+            processedObjectCount[index] += 1;
+            super.process(obj);
+        }
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/SingleThreadStageDriverTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 10:45 AM
+ *
+ * $Log: AddToCollectionStageTest.java,v $
+ * Revision 1.2 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.stage.AddToCollectionStage;
+import org.apache.commons.pipeline.driver.SingleThreadStageDriver;
+
+
+/**
+ * Test cases for AddToCollectionStage. The fixture is a started pipeline
+ * holding a single AddToCollectionStage that is backed by a plain ArrayList.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class AddToCollectionStageTest extends TestCase {
+
+    Pipeline pipeline;
+    ArrayList list;
+
+    public AddToCollectionStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        pipeline = new Pipeline();
+        list = new ArrayList();
+        pipeline.addStage(new AddToCollectionStage(list), new SingleThreadStageDriver());
+        pipeline.start();
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(AddToCollectionStageTest.class);
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.AddToCollectionStage:
+     * the very object that was enqueued must be the only element in the list.
+     */
+    public void testProcess() throws Exception {
+        Object enqueued = new Object();
+        pipeline.enqueue(enqueued);
+        pipeline.finish();
+
+        assertEquals(1, list.size());
+        assertSame(enqueued, list.get(0));
+    }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/AddToCollectionStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:38 AM
+ *
+ * $Log: DynamicLookupStaticMethodStageTest.java,v $
+ * Revision 1.2 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.driver.SingleThreadStageDriver;
+
+/**
+ * Test cases for DynamicLookupStaticMethodStage. The two static runMethod
+ * overloads at the bottom of this class are the lookup targets used by
+ * testProcess.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class DynamicLookupStaticMethodStageTest extends TestCase {
+
+    Pipeline pipe;
+    ArrayList list;
+    AddToCollectionStage collectionStage;
+
+    public DynamicLookupStaticMethodStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        pipe = new Pipeline();
+        list = new ArrayList();
+        collectionStage = new AddToCollectionStage(list, false);
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(DynamicLookupStaticMethodStageTest.class);
+    }
+
+    /**
+     * Test of newInstance method, of class org.apache.commons.pipeline.stage.DynamicLookupStaticMethodStage.
+     */
+    public void testNewInstance() throws Exception {
+        DynamicLookupStaticMethodStage created =
+                DynamicLookupStaticMethodStage.newInstance("java.lang.Integer", "toString");
+
+        assertEquals("toString", created.getMethodName());
+        assertSame(Integer.class, created.getMethodClass());
+    }
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.DynamicLookupStaticMethodStage.
+     * The pipeline is started and finished twice: once with a String input
+     * and once with an Integer input, expecting the matching overload's
+     * return value each time.
+     */
+    public void testProcess() throws Exception {
+        DynamicLookupStaticMethodStage lookupStage =
+                new DynamicLookupStaticMethodStage(this.getClass(), "runMethod");
+        pipe.addStage(lookupStage, new SingleThreadStageDriver());
+        pipe.addStage(collectionStage, new SingleThreadStageDriver());
+
+        // First pass: String input.
+        pipe.start();
+        pipe.enqueue("A String");
+        pipe.finish();
+        assertEquals(1, list.size());
+        Object stringResult = list.remove(0);
+        assertNotNull(stringResult);
+        assertTrue(stringResult instanceof String);
+        assertEquals("Ran String Method", stringResult.toString());
+
+        // Second pass: Integer input on the same pipeline.
+        pipe.start();
+        pipe.enqueue(new Integer(5));
+        pipe.finish();
+        assertEquals(1, list.size());
+        Object integerResult = list.remove(0);
+        assertTrue(integerResult instanceof String);
+        assertEquals("Ran Integer Method", integerResult.toString());
+    }
+
+    /** Lookup target for String inputs. */
+    public static String runMethod(String input) {
+        return "Ran String Method";
+    }
+
+    /** Lookup target for Integer inputs. */
+    public static String runMethod(Integer input) {
+        return "Ran Integer Method";
+    }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/DynamicLookupStaticMethodStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 20, 2005, 10:27 AM
+ *
+ * $Log: HttpFileDownloadStageTest.java,v $
+ * Revision 1.2 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+
+/**
+ * Test cases for HttpFileDownloadStage. The fixture pipeline consists of an
+ * HttpFileDownloadStage followed by an AddToCollectionStage that gathers the
+ * output into the results list. The active tests fetch http://www.google.com.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class HttpFileDownloadStageTest extends TestCase {
+
+    Pipeline pipeline;
+    List results;
+    URL fileUrl;
+    URL google;
+
+    public HttpFileDownloadStageTest(String testName) {
+        super(testName);
+    }
+
+    protected void setUp() throws Exception {
+        results = new ArrayList();
+        ArrayList<Stage> stageList = new ArrayList<Stage>();
+        stageList.add(new HttpFileDownloadStage());
+        stageList.add(new AddToCollectionStage(results));
+        this.fileUrl = this.getClass().getClassLoader().getResource("http-download.txt");
+        assertNotNull(fileUrl);
+        google = new URL("http://www.google.com");
+        pipeline = new Pipeline(stageList);
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(HttpFileDownloadStageTest.class);
+    }
+
+    // The two file-URL tests below are currently disabled (commented out).
+
+    /**
+     * Test of process method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage.
+     */
+//    public void testFileUrlStringProcess() throws Exception {
+//        pipeline.start();
+//        pipeline.enqueue(this.fileUrl.toExternalForm());
+//        pipeline.finish();
+//        assertEquals(1,results.size());
+//        Object o = results.get(0);
+//        assertNotNull(o);
+//        assertTrue(o instanceof File);
+//        File file = (File) o;
+//
+//        FileReader rf = new FileReader(file);
+//        BufferedReader br = new BufferedReader(rf);
+//        try {
+//            String line = br.readLine();
+//            assertNotNull(line);
+//            assertEquals("This is a test file.",line);
+//        } finally {
+//            rf.close();
+//            br.close();
+//        }
+//
+//
+//    }
+//
+//    /**
+//     * Test of process method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage.
+//     */
+//    public void testFileUrlProcess() throws Exception {
+//        pipeline.start();
+//        pipeline.enqueue(this.fileUrl);
+//        pipeline.finish();
+//        assertEquals(1,results.size());
+//        Object o = results.get(0);
+//        assertNotNull(o);
+//        assertTrue(o instanceof File);
+//        File file = (File) o;
+//
+//        FileReader rf = new FileReader(file);
+//        BufferedReader br = new BufferedReader(rf);
+//        try {
+//            String line = br.readLine();
+//            assertNotNull(line);
+//            assertEquals("This is a test file.",line);
+//        } finally {
+//            rf.close();
+//            br.close();
+//        }
+//
+//    }
+
+    /**
+     * Test of process() method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage,
+     * feeding the URL in its String form.
+     */
+    public void testHttpUrlString() throws Exception {
+        pipeline.start();
+        pipeline.enqueue(this.google.toExternalForm());
+        pipeline.finish();
+
+        assertSingleDownloadedFileMentionsGoogle();
+    }
+
+    /**
+     * Test of process() method, of class org.apache.commons.pipeline.stage.HttpFileDownloadStage,
+     * feeding the URL object itself.
+     */
+    public void testHttpUrl() throws Exception {
+        pipeline.start();
+        pipeline.enqueue(this.google);
+        pipeline.finish();
+
+        assertSingleDownloadedFileMentionsGoogle();
+    }
+
+    /**
+     * Asserts that results holds exactly one non-null File and that the first
+     * line of that file contains the text "oogle".
+     */
+    private void assertSingleDownloadedFileMentionsGoogle() throws Exception {
+        assertEquals(1, results.size());
+        Object downloaded = results.get(0);
+        assertNotNull(downloaded);
+        assertTrue(downloaded instanceof File);
+        File downloadedFile = (File) downloaded;
+
+        FileReader fileReader = new FileReader(downloadedFile);
+        BufferedReader lineReader = new BufferedReader(fileReader);
+        try {
+            String firstLine = lineReader.readLine();
+            assertNotNull(firstLine);
+            assertTrue("actual line:" + firstLine, firstLine.contains("oogle"));
+        } finally {
+            fileReader.close();
+            lineReader.close();
+        }
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/HttpFileDownloadStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Log: InputStreamLineBreakStageTest.java,v $
+ * Revision 1.2 2005/07/26 18:20:28 tns
+ * apache license.
+ *
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+/**
+ *
+ * @author tns
+ */
+public class InputStreamLineBreakStageTest extends TestCase {
+
+ List results;
+ Pipeline pipe;
+ URL url;
+
+ public InputStreamLineBreakStageTest(String testName) {
+ super(testName);
+ }
+
+ protected void setUp() throws Exception {
+ url = this.getClass().getClassLoader().getResource("url-input-to-stream-test.txt");
+ assertNotNull(url);
+
+ URLToInputStreamStage stage1 = new URLToInputStreamStage();
+ InputStreamLineBreakStage stage2 = new InputStreamLineBreakStage();
+ results = new ArrayList();
+ Stage stage3 = new AddToCollectionStage(results);
+ ArrayList stages = new ArrayList();
+ stages.add(stage1);
+ stages.add(stage2);
+ stages.add(stage3);
+ pipe = new Pipeline(stages);
+ }
+
+ protected void tearDown() throws Exception {
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite(InputStreamLineBreakStageTest.class);
+
+ return suite;
+ }
+
+ /**
+ * Test of process method, of class org.apache.commons.pipeline.stage.InputStreamLineBreakStage.
+ */
+ public void testProcess() throws Exception {
+ pipe.start();
+ pipe.enqueue(url);
+ pipe.finish();
+
+ assertEquals(3,results.size());
+ String s0 = (String) results.get(0);
+ assertEquals("line 1", s0);
+ String s1 = (String) results.get(1);
+ assertEquals("line 2", s1);
+ String s2 = (String) results.get(2);
+ assertEquals("line 3", s2);
+ }
+
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/InputStreamLineBreakStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 19, 2005, 9:38 AM
+ *
+ * $Log: RunPredefinedStaticMethodStageTest.java,v $
+ * Revision 1.2 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import junit.framework.*;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.stage.AddToCollectionStage;
+import org.apache.commons.pipeline.stage.RunPredefinedStaticMethodStage;
+import org.apache.commons.pipeline.driver.SimpleStageDriver;
+
+
+/**
+ * Test cases for RunPredefinedStaticMethodStage.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class RunPredefinedStaticMethodStageTest extends TestCase {
+
+    /** Pipeline under test; a new, empty one is created for every test. */
+    Pipeline pipe;
+    /** List handed to the collecting stage; tests inspect it for pipeline output. */
+    ArrayList list;
+    /** Collecting stage constructed around {@link #list}. */
+    AddToCollectionStage collectionStage;
+
+    public RunPredefinedStaticMethodStageTest(String testName) {
+        super(testName);
+    }
+
+    /** Creates a fresh result list, collecting stage and pipeline. */
+    protected void setUp() throws Exception {
+        list = new ArrayList();
+        collectionStage = new AddToCollectionStage(list, false);
+        pipe = new Pipeline();
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    /** Returns a JUnit suite built from this test class. */
+    public static Test suite() {
+        return new TestSuite(RunPredefinedStaticMethodStageTest.class);
+    }
+
+    /**
+     * Test of newInstance method: the stage created for ("java.lang.Integer", "valueOf",
+     * "java.lang.String") must expose a method declared by Integer taking one String.
+     */
+    public void testNewInstance() throws Exception {
+        RunPredefinedStaticMethodStage created = RunPredefinedStaticMethodStage.newInstance("java.lang.Integer", "valueOf", "java.lang.String");
+        Method resolved = created.getMethod();
+        assertSame(Integer.class, resolved.getDeclaringClass());
+        Class[] argTypes = resolved.getParameterTypes();
+        assertNotNull(argTypes);
+        assertEquals(1, argTypes.length);
+        assertSame(String.class, argTypes[0]);
+        assertEquals("valueOf", resolved.getName());
+    }
+
+    /**
+     * Test of process() method: enqueues the string "5" into a pipeline whose first stage
+     * wraps Integer.valueOf(String) and expects a single Integer 5 in the result list.
+     */
+    public void testProcess() throws Exception {
+        Method valueOf = Integer.class.getMethod("valueOf", String.class);
+        pipe.addStage(new RunPredefinedStaticMethodStage(valueOf), new SimpleStageDriver());
+        pipe.addStage(collectionStage, new SimpleStageDriver());
+        pipe.start();
+        pipe.enqueue("5");
+        pipe.finish();
+
+        assertEquals(1, list.size());
+        Object produced = list.get(0);
+        assertNotNull(produced);
+        assertTrue(produced instanceof Integer);
+        assertEquals(5, ((Integer) produced).intValue());
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/RunPredefinedStaticMethodStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java?rev=225469&view=auto
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java (added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java Tue Jul 26 23:37:44 2005
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on July 21, 2005, 10:07 AM
+ *
+ * $Log: URLToInputStreamStageTest.java,v $
+ * Revision 1.4 2005/07/25 22:19:17 kjn
+ * Updated licenses, documentation.
+ *
+ */
+
+package org.apache.commons.pipeline.stage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import junit.framework.*;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.pipeline.Pipeline;
+import org.apache.commons.pipeline.Stage;
+
+
+/**
+ * Test cases for URLToInputStreamStage.
+ *
+ * @author Travis Stevens, National Geophysical Data Center, NOAA
+ */
+public class URLToInputStreamStageTest extends TestCase {
+
+    URL url;
+    URLToInputStreamStage stage;
+    Pipeline pipe;
+    List<InputStream> results;
+
+    public URLToInputStreamStageTest(String testName) {
+        super(testName);
+    }
+
+    /** Locates the test resource and builds a pipeline of the stage plus a collector. */
+    protected void setUp() throws Exception {
+        url = this.getClass().getClassLoader().getResource("url-input-to-stream-test.txt");
+        assertNotNull(url);
+        results = new ArrayList<InputStream>();
+        Stage finalStage = new AddToCollectionStage<InputStream>(results);
+        stage = new URLToInputStreamStage();
+        List<Stage> stages = new ArrayList<Stage>();
+        stages.add(stage);
+        stages.add(finalStage);
+        pipe = new Pipeline(stages);
+    }
+
+    protected void tearDown() throws Exception {
+    }
+
+    public static Test suite() {
+        return new TestSuite(URLToInputStreamStageTest.class);
+    }
+
+    /**
+     * Test of process method: an enqueued URL must yield exactly one readable stream.
+     */
+    public void testProcess() throws Exception {
+        pipe.start();
+        try {
+            pipe.enqueue(url);
+            assertEquals(1, results.size());
+            InputStream is = results.get(0);
+            assertNotNull(is);
+            assertTrue("expected to read data from " + url, is.read(new byte[128]) > 0);
+        } finally {
+            // Always finish, so a failed assertion does not leave the pipeline running.
+            pipe.finish();
+        }
+    }
+
+    /**
+     * Test of postprocess method: after finish(), reading the stream must throw IOException.
+     */
+    public void testPostprocess() throws Exception {
+        pipe.start();
+        pipe.enqueue(url);
+        pipe.finish();
+        assertEquals("pipeline should have produced one stream", 1, results.size());
+        InputStream is = results.get(0);
+        try {
+            is.read(new byte[128]);
+            fail("input stream should have been closed, so reading should throw an exception.");
+        } catch (IOException expected) {
+            // Expected: the stream is supposed to be closed once the pipeline has finished.
+        }
+    }
+}
Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/URLToInputStreamStageTest.java
------------------------------------------------------------------------------
svn:keywords = Id
---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org