You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by sk...@apache.org on 2005/07/27 08:37:40 UTC

svn commit: r225468 [1/2] - in /jakarta/commons/sandbox/pipeline/trunk: ./ src/java/org/apache/commons/pipeline/ src/java/org/apache/commons/pipeline/config/ src/java/org/apache/commons/pipeline/driver/ src/java/org/apache/commons/pipeline/stage/ src/t...

Author: skitching
Date: Tue Jul 26 23:36:55 2005
New Revision: 225468

URL: http://svn.apache.org/viewcvs?rev=225468&view=rev
Log:
Refactoring of core package to provide easier StageDriver implementation

Modified:
    jakarta/commons/sandbox/pipeline/trunk/project.properties
    jakarta/commons/sandbox/pipeline/trunk/project.xml
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml
    jakarta/commons/sandbox/pipeline/trunk/src/test/conf/test_conf.xml
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/config/DigesterPipelineFactoryTest.java

Modified: jakarta/commons/sandbox/pipeline/trunk/project.properties
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/project.properties?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/project.properties (original)
+++ jakarta/commons/sandbox/pipeline/trunk/project.properties Tue Jul 26 23:36:55 2005
@@ -1,3 +1,4 @@
+maven.repo.remote=http://www.apache.org/dist/java-repository
 maven.checkstyle.properties = checkstyle.xml
 
 maven.compile.source=1.5

Modified: jakarta/commons/sandbox/pipeline/trunk/project.xml
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/project.xml?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/project.xml (original)
+++ jakarta/commons/sandbox/pipeline/trunk/project.xml Tue Jul 26 23:36:55 2005
@@ -1,117 +1,103 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <project>
-  <extend>../commons-build/sandbox-project.xml</extend>
-  <pomVersion>3</pomVersion>
-  <name>Commons Pipeline</name>
-  <id>commons-pipeline</id>
-  <logo>/images/pipeline-logo-white.png</logo>
-
-  <currentVersion>0.1.0-dev</currentVersion>
-  <inceptionYear>2004</inceptionYear>
-  <package>org.apache.commons.pipeline</package>
-
-  <shortDescription>A simple pipeline processing framework.</shortDescription>
-  <description>
-    A framework for constructing pipelined processing systems that support
-    concurrent processing of objects in multiple processing stages.
-  </description>
-
-  <url>http://jakarta.apache.org/commons/sandbox/pipeline</url>
-  <siteAddress>people.apache.org</siteAddress>
-  <siteDirectory>/www/jakarta.apache.org/commons/sandbox/pipeline</siteDirectory>
-
-  <distributionDirectory></distributionDirectory>
-
-  <repository>
-    <connection></connection>
-    <url></url>
-
-  </repository>
-
-  <developers>
-    <developer>
-      <name>Kris Nuttycombe</name>
-      <email>kris.nuttycombe@noaa.gov</email>
-      <organization>National Geophysical Data Center</organization>
-    </developer>
-  </developers>
-
-  <dependencies>
-    <dependency>
-      <groupId>commons-beanutils</groupId>
-      <artifactId>commons-beanutils</artifactId>
-      <version>1.7.0</version>
-
-      <url>http://jakarta.apache.org/commons/beanutils</url>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <version>3.1</version>
-
-      <url>http://jakarta.apache.org/commons/collections</url>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-digester</groupId>
-      <artifactId>commons-digester</artifactId>
-      <version>1.6</version>
-
-      <url>http://jakarta.apache.org/commons/digester</url>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <version>1.0.4</version>
-
-      <url>http://jakarta.apache.org/commons/logging</url>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-net</groupId>
-      <artifactId>commons-net</artifactId>
-      <version>1.2.1</version>
-
-      <url>http://jakarta.apache.org/commons/net</url>
-    </dependency>
-
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <version>1.2.8</version>
-
-      <url>http://logging.apache.org</url>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <nagEmailAddress>kris.nuttycombe@noaa.gov</nagEmailAddress>
-    <sourceDirectory>src/java</sourceDirectory>
-    
-    <!-- Unit test cases -->
-
-    <unitTestSourceDirectory>src/test</unitTestSourceDirectory>
-    <unitTest>
-      <includes>
-        <include>**/*Test.java</include>
-      </includes>
-      <resources>
-        <resource>
-          <directory>src/test/resources</directory>
-
-          <includes>
-            <include>**/*.properties</include>
-            <include>**/*.xml</include>
-          </includes>
-        </resource>
-      </resources>
-    </unitTest>    
-  </build>
-
-  
+    <extend>${basedir}/../commons-build/sandbox-project.xml</extend>
+    <pomVersion>3</pomVersion>
+    <name>Commons Pipeline</name>
+    <id>commons-pipeline</id>
+    <logo>/images/pipeline-logo-white.png</logo>
+    <currentVersion>0.1.0-dev</currentVersion>
+    <inceptionYear>2004</inceptionYear>
+    <package>org.apache.commons.pipeline</package>
+    <shortDescription>A simple pipeline processing framework.</shortDescription>
+    <description>A framework for constructing pipelined processing systems that support
+    concurrent processing of objects in multiple processing stages.</description>
+    <url>http://jakarta.apache.org/commons/sandbox/pipeline</url>
+    <siteAddress>people.apache.org</siteAddress>
+    <siteDirectory>/www/jakarta.apache.org/commons/sandbox/pipeline</siteDirectory>
+    <distributionDirectory></distributionDirectory>
+    <repository>
+        <connection></connection>
+        <url></url>
+    </repository>
+    <developers>
+        <developer>
+            <name>Kris Nuttycombe</name>
+            <id>kjn</id>
+            <email>kris.nuttycombe@noaa.gov</email>
+            <organization>National Geophysical Data Center</organization>
+        </developer>
+        <developer>
+            <name>Travis Stevens</name>
+            <id>tns</id>
+            <email>Travis.Stevens@noaa.gov</email>
+            <organization>National Geophysical Data Center</organization>
+        </developer>
+    </developers>
+    <dependencies>
+        <!-- core library deps -->
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>1.0.4</version>
+            <type>jar</type>
+        </dependency>
+        <!-- deps for stage & config implementations -->
+        <dependency>
+            <groupId>commons-digester</groupId>
+            <artifactId>commons-digester</artifactId>
+            <version>1.7</version>
+            <type>jar</type>
+            <url>http://jakarta.apache.org/commons/digester</url>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>1.4.0</version>
+            <type>jar</type>
+            <url>http://jakarta.apache.org/commons/net</url>
+        </dependency>
+        <!-- test deps -->
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.9</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>1.7.0</version>
+            <type>jar</type>
+            <url>http://jakarta.apache.org/commons/beanutils</url>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>3.1</version>
+            <type>jar</type>
+            <url>http://jakarta.apache.org/commons/collections</url>
+        </dependency>
+    </dependencies>
+    <build>
+        <nagEmailAddress>kris.nuttycombe@noaa.gov</nagEmailAddress>
+        <sourceDirectory>src/java</sourceDirectory>
+        <!-- Unit test cases -->
+        <unitTestSourceDirectory>src/test/java</unitTestSourceDirectory>
+        <unitTest>
+            <includes>
+                <include>**/*Test.java</include>
+            </includes>
+            <resources>
+                <resource>
+                    <directory>src/test/conf</directory>
+                    <includes>
+                        <include>*.properties</include>
+                        <include>*.xml</include>
+                        <include>*.txt</include>
+                    </includes>
+                </resource>
+            </resources>
+        </unitTest>
+    </build>
 </project>
-
-
 

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java Tue Jul 26 23:36:55 2005
@@ -1,7 +1,24 @@
 /*
- * BaseStage.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  * Created on October 12, 2004, 11:31 AM
+ *
+ * $Log: BaseStage.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
@@ -10,8 +27,11 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
+ * This is a simple base class for Stages with no-op implementations of the
+ * {@link #preprocess()}, {@link #process()}, {@link #postprocess()},
+ * and {@link #release()} methods.
  *
- * @author  kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public class BaseStage extends Stage {
     
@@ -73,7 +93,4 @@
      */
     public void release() {
     }
-    
-    
-    
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -12,17 +12,21 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
+ * $Log: Pipeline.java,v $
+ * Revision 1.7  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
 
-import java.lang.Iterable;
-import java.util.*;
-import org.apache.commons.collections.OrderedMap;
-import org.apache.commons.collections.OrderedMapIterator;
-import org.apache.commons.collections.map.ListOrderedMap;
-
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.pipeline.driver.SimpleStageDriver;
 
 /**
  * This class represents a processing system consisting of a number of stages
@@ -34,71 +38,84 @@
  * with methods to start and stop processing for all stages, as well as
  * a simple framework for asynchronous event-based communication between stages.
  *
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public final class Pipeline implements Iterable<Stage>, Runnable {
     private List<StageEventListener> listeners = new ArrayList<StageEventListener>();
-
+    
     /**
-     * Ordered map of stages in the pipeline where the keys are stages and the
-     * values are the associated StageDrivers.
+     * List of stages in the pipeline
      */
-    protected OrderedMap stages = new ListOrderedMap();
-    //private OrderedMap<Stage,StageDriver> stages = new ListOrderedMap<Stage,StageDriver>();
+    protected List<Stage> stages = new ArrayList<Stage>();;
     
     /**
      * Map of pipeline branches where the keys are branch names.
      */
     protected Map<String,Pipeline> branches = new HashMap<String,Pipeline>();
     
-    
     /**
      * Creates a new Pipeline
      */
-    public Pipeline() {  }
+    public Pipeline() {
+        stages = new ArrayList<Stage>();
+    }
     
+    /**
+     * Creates a new Pipeline containing the given list of stages, added in order.
+     */
+    public Pipeline(List<Stage> stages){
+        for (Stage stage: stages){
+            this.addStage(stage);
+        }
+    }
     
     /**
-     * Adds a Stage object to the end of this Pipeline. The pipeline will use
-     * the specified StageDriver to run the stage.
-     *
-     * It is critical that all stages added to a pipeline have distinct hash codes
-     * to maintain stage ordering integrity. For this reason, it is
-     * strongly suggested that Stage implementations <i>do not</i> override
-     * the default {@link java.lang.Object#hashCode() hashCode()} implementation
-     * in java.lang.Object.
+     * Adds a {@link Stage} object to the end of this Pipeline. The pipeline will use
+     * the specified {@link StageDriver} to run the stage.
      *
      * @todo throw IllegalStateException if the stage is being used in a different pipeline
      */
     public void addStage(Stage stage, StageDriver driver) {
-        if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
-        if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage() may not be null.");
+        if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
+        if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
         
-        stage.setPipeline(this);        
-        this.stages.put(stage, driver);
+        stage.setStageDriver(driver);
+        this.addStage(stage);
     }
     
+    /**
+     * Adds a {@link Stage} object to the end of this Pipeline.
+     */
+    public void addStage(Stage stage){
+        if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
+        stage.setPipeline(this);
+        this.stages.add(stage);
+    }
     
     /**
-     * Returns the first stage in the pipeline
+     * Returns the first stage in the pipeline, or null if there are no stages
      */
     public Stage head() {
-        return (Stage) stages.firstKey();
+        if (stages.size() > 0){
+            return (Stage) stages.get(0);
+        } else {
+            return null;
+        }
     }
     
     /**
-     * Returns the stage after the specified stage in the pipeline
+     * Returns the stage after the specified stage in the pipeline.
      */
     public Stage getNextStage(Stage stage) {
-        return (Stage) stages.nextKey(stage);
+        int nextIndex = stages.indexOf(stage) + 1;
+        return (stages.size() > nextIndex) ? stages.get(nextIndex) : null;
     }
     
     /**
      * Returns an Iterator for stages in the pipeline.
      */
     public Iterator<Stage> iterator() {
-        return (Iterator<Stage>) stages.mapIterator();
+        return stages.iterator();
     }
     
     /**
@@ -113,7 +130,6 @@
         this.branches.put(key, pipeline);
     }
     
-    
     /**
      * Runs the pipeline from start to finish.
      */
@@ -121,23 +137,26 @@
         try {
             start();
             finish();
-        }
-        catch (InterruptedException e) {
+        } catch (StageException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
     }
     
-    
     /**
-     * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
-     * for each stage and using that driver to start the stage. Startups
-     * may occur sequentially or in parallel, depending upon the stage driver
-     * used.
-     */
-    public void start() {
-        for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
-            Stage stage = (Stage) iter.next();
-            StageDriver driver = (StageDriver) iter.getValue();
+     * This method iterates over the stages in the pipeline, looking up a
+     * {@link StageDriver} for each stage and using that driver to start the stage.
+     * Startups may occur sequentially or in parallel, depending upon the stage driver
+     * used.  If the stage has not been configured with a {@link StageDriver},
+     * we will use the default, non-threaded {@link SimpleStageDriver}.
+     */
+    public void start() throws StageException {
+        for (Stage stage: this.stages){
+            StageDriver driver = stage.getStageDriver();
+            if (driver == null){
+                driver = new SimpleStageDriver();
+                stage.setStageDriver(driver);
+            }
+            
             driver.start(stage);
         }
         
@@ -158,10 +177,9 @@
      * @throws InterruptedException if a worker thread was interrupted at the time
      * a stage was asked to finish execution.
      */
-    public void finish() throws InterruptedException {
-        for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
-            Stage stage = (Stage) iter.next();
-            StageDriver driver = (StageDriver) iter.getValue();
+    public void finish() throws StageException {
+        for (Stage stage: this.stages){
+            StageDriver driver = stage.getStageDriver();
             driver.finish(stage);
         }
         
@@ -170,25 +188,22 @@
         }
     }
     
-    
     /**
      * Enqueues an object on the first stage if the pipeline is not empty
      * @param o the object to enque
      */
     public void enqueue(Object o){
-        if (!stages.isEmpty()) ((Stage) stages.firstKey()).enqueue(o);
+        if (!stages.isEmpty()) stages.get(0).enqueue(o);
     }
     
-    
     /**
      * This method is used by stages to pass data from one stage to the next.
      */
     public void pass(Stage source, Object data) {
-        Stage next = (Stage) this.stages.nextKey(source);
+        Stage next = this.getNextStage(source);
         if (next != null) next.enqueue(data);
     }
     
-    
     /**
      * Simple method that recursively checks whether the specified
      * pipeline is a branch of this pipeline.
@@ -202,7 +217,6 @@
         return false;
     }
     
-    
     /**
      * Adds an EventListener to the pipline that will be notified by calls
      * to {@link Stage#raise(StageEvent)}.
@@ -210,7 +224,6 @@
     public void addEventListener(StageEventListener listener) {
         listeners.add(listener);
     }
-    
     
     /**
      * Sequentially notifies each listener in the list of an event, and propagates

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java Tue Jul 26 23:36:55 2005
@@ -1,14 +1,32 @@
 /*
- * PipelineCreationException.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  * Created on October 4, 2004, 1:35 PM
+ *
+ * $Log: PipelineCreationException.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
 
 /**
+ * This is a wrapper exception for use by {@link PipelineFactory}s.
  *
- * @author  kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public class PipelineCreationException extends java.lang.Exception {
     
@@ -16,8 +34,7 @@
      * Creates a new instance of <code>PipelineCreationException</code> without detail message.
      */
     public PipelineCreationException() {
-    }
-    
+    }    
     
     /**
      * Constructs an instance of <code>PipelineCreationException</code> with the specified detail message.
@@ -35,4 +52,11 @@
         super(msg, cause);
     }
     
+    /**
+     * Constructs an instance of <code>PipelineCreationException</code> with the specified cause.
+     * @param cause the underlying cause of this exception.
+     */
+    public PipelineCreationException(Throwable cause) {
+        super(cause);
+    }    
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java Tue Jul 26 23:36:55 2005
@@ -1,7 +1,24 @@
 /*
- * PipelineFactory.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  * Created on October 4, 2004, 1:22 PM
+ *
+ * $Log: PipelineFactory.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
@@ -11,7 +28,7 @@
 /**
  * Simple factory interface for creating pipelines.
  *
- * @author  kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public interface PipelineFactory {
     /** Returns a Pipeline created by the factory. */

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java Tue Jul 26 23:36:55 2005
@@ -1,44 +1,70 @@
 /*
- * Stage.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  * Created on November 18, 2004, 10:34 AM
+ *
+ * $Log: Stage.java,v $
+ * Revision 1.5  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.4  2005/07/22 23:18:35  kjn
+ * Added callback in setStageDriver to set the stage's StageMonitor; documentation & code cleanup
  */
 
 package org.apache.commons.pipeline;
 
-import java.util.*;
+import java.util.Queue;
 
 /**
- * <P>
- * A Stage represents a set of tasks that can be performed on objects
+ * <P>A Stage represents a set of tasks that can be performed on objects
  * in a queue, and methods used to communicate with other stages
- * in a {@link Pipeline}.
- * </P>
- * <P>
- * A Stage must provide a unique {@link StageMonitor} object to allow for
+ * in a {@link Pipeline}.</P>
+ * <P>A Stage must provide a unique {@link StageMonitor} object to allow for
  * proper handling of multiple processing threads to the {@link StageDriver}
  * that runs the stage. Because Stage does not specify the exact behavior of the
  * queue (whether it is capacity-bounded or automatically synchronizes accesses,
- * etc) the monitor is necessary to provide proper synchronization.
- * </P>
- * <P>
- * Stages extending this abstract base class automatically establish a relationship
- * with a pipeline when added to that pipeline.
- * </P>
+ * etc) the monitor is necessary to provide proper synchronization.</P>
+ * <P>Stages extending this abstract base class automatically establish a relationship
+ * with a pipeline when added to that pipeline.</P>
  *
- * @author  Kris Nuttycombe
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public abstract class Stage {
-    private Queue<Object> queue;
+    
+    /**
+     * The {@link Pipeline} in which this stage will run.
+     */
     protected Pipeline pipeline;
+    
+    /**
+     * The {@link StageMonitor} that will monitor stage changes and act
+     * as an intermediary between this stage and its {@link StageDriver}.
+     */
     protected StageMonitor monitor;
     
+    // StageDriver that is used to run this stage
+    private StageDriver stageDriver;
+    
+    // Queue for objects this stage is to process
+    private Queue<Object> queue;
+    
     /**
      * Builds a new stage with the specified queue.
      */
     public Stage(Queue<Object> queue) {
         this.queue = queue;
-        this.monitor =  new StageMonitor();
     }
     
     /**
@@ -52,18 +78,29 @@
     }
     
     /**
-     * Enqueues an object on the wrapped queue. Classes that override this
-     * method must also override {@link #poll()}.
+     * Enqueues an object on the wrapped queue. Classes wishing to override this
+     * method should override {@link #innerEnqueue(Object)} instead.
      */
-    public void enqueue(Object obj) {
+    public final void enqueue(Object obj) {
+        this.innerEnqueue(obj);
+        if (this.monitor != null) this.monitor.enqueueOccurred();
+    }
+    
+    /**
+     * This protected method is designed to be overridden in cases where
+     * additional processing should be performed when an object is enqueued.
+     * Classes that override this method must also override {@link #poll()} if
+     * the underlying queue supporting the stage is changed.
+     */
+    protected void innerEnqueue(Object obj) {
         queue.add(obj);
-        this.monitor.enqueueOccurred();
     }
     
     /**
      * Retrieves an object from the head of the wrapped queue, or null
      * if the queue is empty. Classes that override this method must also
-     * override {@link #enqueue(Object)}
+     * override {@link #innerEnqueue(Object)} if the underlying queue supporting the
+     * stage is changed.
      */
     public Object poll() {
         synchronized (queue) {
@@ -72,6 +109,15 @@
     }
     
     /**
+     * Sets the stageDriver property and obtains this stage's monitor from the driver.
+     * @param stageDriver New value of property stageDriver.
+     */
+    protected final void setStageDriver(StageDriver stageDriver) {
+        this.stageDriver = stageDriver;
+        this.monitor = stageDriver.createStageMonitor(this);
+    }
+    
+    /**
      * Enqueues the specified object onto the next stage in the pipeline
      * if such a stage exists.
      */
@@ -84,9 +130,9 @@
      * branch corresponding to the specified key, if such a brach exists.
      */
     public final void exqueue(String key, Object obj) {
-        Pipeline branch = (Pipeline) this.pipeline.branches.get(key);
+        Pipeline branch = this.pipeline.branches.get(key);
         if (branch != null && !branch.stages.isEmpty()) {
-            ((Stage) branch.stages.firstKey()).enqueue(obj);
+            branch.head().enqueue(obj);
         }
     }
     
@@ -102,26 +148,37 @@
      * Getter for wrapped queue.
      * @return Value of property queue.
      */
-    public Queue getQueue() {
+    public final Queue getQueue() {
         return this.queue;
     }
     
     /**
-     * Setter for wrapped queue.
-     * @param queue New value of property queue.
+     * Getter for property stageDriver.
+     * @return Value of property stageDriver.
      */
-    public void setQueue(Queue<Object> queue) {
-        this.queue = queue;
+    public final StageDriver getStageDriver() {
+        return this.stageDriver;
     }
     
     /**
      * Returns the monitor for this stage.
      */
-    public StageMonitor getMonitor() {
+    public final StageMonitor getMonitor() {
         return this.monitor;
     }
     
     /**
+     * Stages may not further override hashCode(). This is necessary to maintain stage
+     * ordering integrity within the pipeline.
+     */
+    public final int hashCode() {
+        int retValue;
+        
+        retValue = super.hashCode();
+        return retValue;
+    }
+    
+    /**
      * Implementations of this method should perform any necessary setup that
      * needs to be done before any data is processed from this Stage's queue.
      *
@@ -159,14 +216,4 @@
      */
     public abstract void release();
     
-    /**
-     * Stages may not further override hashCode(). This is necessary to maintain stage
-     * ordering integrity within the pipeline.
-     */
-    public final int hashCode() {
-        int retValue;
-        
-        retValue = super.hashCode();
-        return retValue;
-    }
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java Tue Jul 26 23:36:55 2005
@@ -1,32 +1,90 @@
 /*
- * StageRunner.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  *
  * Created on October 6, 2004, 4:30 PM
+ *
+ * $Log: StageDriver.java,v $
+ * Revision 1.5  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.4  2005/07/22 23:20:27  kjn
+ * Changed from interface to abstract base class to allow better use of callbacks
+ * and consolidation of monitor/threading model code.
  */
 
 package org.apache.commons.pipeline;
 
-import org.apache.commons.pipeline.Stage;
-
+import java.util.List;
 
 /**
+ * This abstract base class defines how processing for a stage is started,
+ * stopped, and run. StageDriver implementations may run stages in one or
+ * more threads, and use the {@link StageMonitor} interface to provide communication
+ * between the stage, the driver, and the enclosing pipeline.
  *
- * @author  kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
-public interface StageDriver {
+public abstract class StageDriver {
+    
     /**
      * Creates and starts new worker thread(s) to process items in the queue.
-     * Implementations of this method must call {@link StageMonitor#driverStarting()}
-     * on the specified stage's monitor.
      */
-    public void start(Stage stage) throws IllegalThreadStateException;
+    public final void start(Stage stage) throws StageException {
+        synchronized (stage) {
+            if (stage.monitor == null || stage.monitor.getState() == StageMonitor.State.STOPPED) {
+                if (stage.monitor == null) stage.setStageDriver(this); // this will set the monitor on the stage using a callback.
+                stage.monitor.startRequested();
+                this.startInternal(stage);
+            }
+        }
+    }    
     
+    /**
+     * Implementations of this method must guarantee that the
+     * {@link StageMonitor#driverStarted()} method will be called on the
+     * specified stage's {@link Stage#getMonitor() monitor} when
+     * preprocessing is complete.
+     */
+    protected abstract void startInternal(Stage stage) throws StageException;
     
     /**
-     * This method waits for the queue to empty and any processor thread(s) to exit
+     * This method waits for the stage's queue to empty and any processor thread(s) to exit
      * cleanly and then calls release() to release any resources acquired during processing, if possible.
-     * Implementations of this method must call {@link StageMonitor#driverStopped()}
-     * on the specified stage's monitor upon completion.
      */
-    public void finish(Stage stage) throws InterruptedException;    
+    public final void finish(Stage stage) throws StageException {
+        synchronized (stage) {
+            if (stage.monitor != null && stage.monitor.getState() != StageMonitor.State.STOPPED) {
+                stage.monitor.stopRequested();
+                this.finishInternal(stage);
+            }
+        }
+    }
+    
+    /**
+     * Implementations of this method must guarantee that the 
+     * {@link StageMonitor#driverStopped()} method will be called on the
+     * specified stage's {@link Stage#getMonitor() monitor} when
+     * postprocessing is complete and all stage resources have been released.
+     */
+    protected abstract void finishInternal(Stage stage) throws StageException;
+    
+    /**
+     * This factory method must return a new instance of a {@link StageMonitor}
+     * that can be used to monitor processing for the specified stage
+     * in conjunction with this driver.
+     */
+    protected abstract StageMonitor createStageMonitor(Stage stage);
+    
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -12,6 +12,11 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License. 
+ *
+ * $Log: StageEventListener.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
@@ -22,8 +27,7 @@
 /**
  * Listener interface for {@link StageEvent}s
  *
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public interface StageEventListener extends EventListener {
     

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
  * limitations under the License. 
  *
  * Created on December 9, 2003, 4:24 PM
+ *
+ * $Log: StageException.java,v $
+ * Revision 1.3  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline;
@@ -21,10 +26,11 @@
 /**
  * Exception wrapper class for exceptions that occur while processing a stage.
  *  
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
-public class StageException extends java.lang.RuntimeException {
+public class StageException extends java.lang.Exception {
+    //Stage within which the error occurred
+    private Stage source;
     
     /**
      * Creates a new instance of <code>StageException</code> without detail message.
@@ -41,6 +47,15 @@
         super(msg);
     }
     
+    /**
+     * Constructs an instance of <code>StageException</code> with the specified cause.
+     * @param thr the Throwable that caused this exception.
+     */
+    public StageException(Throwable thr) {
+        super(thr);
+    }
+    
+    
     
     /**
      * Constructs an instance of <code>StageException</code> with the specified detail message and cause
@@ -49,5 +64,42 @@
      */
     public StageException(String msg, Throwable cause) {
         super(msg, cause);
+    }
+    
+    
+    /**
+     * Creates a new instance of <code>StageException</code> for the specified source stage, without detail message.
+     */
+    public StageException(Stage source) {
+        this.source = source;
+    }
+    
+    
+    /**
+     * Constructs an instance of <code>StageException</code> with the specified source stage and detail message.
+     * @param msg the detail message.
+     */
+    public StageException(Stage source, String msg) {
+        super(msg);
+        this.source = source;
+    }
+    
+    
+    /**
+     * Constructs an instance of <code>StageException</code> with the specified source stage, detail message and cause
+     * @param msg the detail message.
+     * @param cause Throwable that caused this exception.
+     */
+    public StageException(Stage source, String msg, Throwable cause) {
+        super(msg, cause);
+        this.source = source;
+    }    
+    
+    
+    /**
+     * Returns a reference to the Stage object where the exception occurred.
+     */
+    public Stage getSource() {
+        return this.source;
     }
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- *   Copyright 2004 The Apache Software Foundation
+ *   Copyright 2005 The Apache Software Foundation
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -13,79 +13,76 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  *
+ * $Log: StageMonitor.java,v $
+ * Revision 1.4  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.3  2005/07/21 17:02:43  kjn
+ * Corrected dependencies
+ *
+ * Revision 1.2  2005/07/19 21:07:13  kjn
+ * Changed from class to interface.
  */
 
 package org.apache.commons.pipeline;
 
-
-import java.util.*;
+import java.util.List;
 
 /**
  * A monitor used to control concurrent processing of data in a stage.
  *
- * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
- * @version $Rev$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
-public class StageMonitor {
-    /** Enumeration of possible states for the stage. */
-    public enum Status { STARTING, RUNNING, STOP_REQUESTED, STOPPED }
-    private Status status = Status.STOPPED;
-    private List<Throwable> errors = new ArrayList<Throwable>();
+public interface StageMonitor {
+    public enum State { STARTING, RUNNING, STOP_REQUESTED, STOPPED, ERROR }
     
     /**
      * StageDriver has been requested to start stage processing.
-     * Implementations of this method should change the monitor's status to
-     * {@link Status#STARTING}.
+     * Implementations of this method should change the driver's state to
+     * {@link State#STARTING}.
      */
-    public synchronized void startRequested() {
-        if (this.status == Status.STOPPED) this.status = Status.STARTING;
-    }
+    public void startRequested();
     
     /**
      * StageDriver has started execution.
-     * Implementations of this method should change the monitor's status to
-     * {@link Status#RUNNING}.
+     * Implementations of this method should change the driver's state to
+     * {@link State#RUNNING}.
      */
-    public synchronized void driverStarted() {
-        if (this.status == Status.STOPPED || this.status == Status.STARTING) this.status = Status.RUNNING;
-    }
+    public void driverStarted();
     
     /**
      * StageDriver has been requested to halt stage processing.
-     * Implementations of this method should change the monitor's status to
-     * {@link Status#STOPPING}.
+     * Implementations of this method should change the driver's state to
+     * {@link State#STOP_REQUESTED}.
      */
-    public synchronized void stopRequested() {
-        this.status = Status.STOP_REQUESTED;
-        this.notifyAll();
-    }
+    public void stopRequested();
     
     /**
      * StageDriver has finished execution.
-     * Implementations of this method should change the monitor's status to
-     * {@link Status#STOPPED}.
+     * Implementations of this method should change the driver's state to
+     * {@link State#STOPPED}.
      */
-    public synchronized void driverStopped() {
-        this.status = Status.STOPPED;
-    }
+    public void driverStopped();
     
     /**
-     * Monitor for successful enqueue operations on the stage. Implementations
-     * overriding this method must call {@link Object#notifyAll() this.notifyAll()} to
-     * ensure that any threads waiting on this monitor are notified.
+     * Notify the driver of successful enqueue operations on any stage managed
+     * by the driver. 
      */
-    public synchronized void enqueueOccurred() {
-        this.notifyAll();
-    }
+    public void enqueueOccurred();
     
     /**
      * Monitors driver thread interruption failures.
      *
      * @param fault the faulting exception
      */
-    public void driverFailed( InterruptedException fault ) {
-        this.errors.add(fault);
-    }
+    public void driverFailed( InterruptedException fault );
+    
+    /**
+     * Monitors preprocessing failures.
+     *
+     * @param fault the faulting exception
+     */
+    public void preprocessFailed(Throwable fault);
     
     /**
      * Monitors handler failures.
@@ -93,22 +90,23 @@
      * @param data the data that was being processed as the fault occurred
      * @param fault the faulting exception
      */
-    public void processingFailed( Object data, Throwable fault ) {
-        this.errors.add(fault);
-    }
+    public void processingFailed( Object data, Throwable fault);
+        
+    /**
+     * Monitors postprocessing failures.
+     *
+     * @param fault the faulting exception
+     */
+    public void postprocessFailed(Throwable fault);
     
     /**
-     * Returns the current status of stage processing.
+     * Returns the current state of stage processing.
      */
-    public synchronized Status status() {
-        return this.status;
-    }
+    public State getState();
     
     /**
-     * Returns a list of errors recorded by this monitor
+     * Returns a list of errors recorded by this StageDriver
      */
-    public List<Throwable> getErrors() {
-        return errors;
-    }
+    public List<Throwable> getErrors();
 }
 

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
  * limitations under the License.
  *
  * Created on February 12, 2004, 3:42 PM
+ *
+ * $Log: DigesterPipelineFactory.java,v $
+ * Revision 1.2  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline.config;
@@ -32,8 +37,7 @@
 /**
  * This factory is designed to simplify creating a pipeline using Digester.
  *
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public class DigesterPipelineFactory implements org.apache.commons.pipeline.PipelineFactory {
     

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
  * limitations under the License. 
  *
  * Created on February 12, 2004, 1:48 PM
+ *
+ * $Log: PipelineRuleSet.java,v $
+ * Revision 1.3  2005/07/25 22:04:54  kjn
+ * Corrected Apache licensing, documentation.
+ *
  */
 
 package org.apache.commons.pipeline.config;
@@ -23,8 +28,6 @@
 import org.apache.commons.pipeline.*;
 import org.xml.sax.Attributes;
 
-
-
 /**
  * <P>This is a Digester RuleSet that provides rules for parsing a process pipeline
  * XML file.</P>
@@ -42,13 +45,7 @@
  *  created in this manner are added to the pipeline in the order that they
  *  occur in the configuration file. The class of the stage is specified by the
  *  <i>className</i> attribute; all other attributes are used by Digester to set bean
- *  properties on the newly created Stage object. At present, Stages configured using
- *  this tag must provide a one-argument constructor that takes a StageQueue instance. 
- *  By default, the stage will be constructed with a 
- * {@link org.apache.commons.pipeline.impl.SingleThreadStageQueue SingleThreadStageQueue}
- *  instance if the queueClass attribute is not set; otherwise the stage will be
- *  constructed with a new instance of the specified class, which should provide
- *  a no-arguments constructor.</li>
+ *  properties on the newly created Stage object. </li>
  *
  *  <li>&lt;enqueue/&gt; - Enqueue an object onto the first stage in the pipeline.</li>
  *  <li>&lt;branch/%gt; - Add a branch to a pipeline. The contents of this tag should
@@ -60,10 +57,7 @@
  *  attribute.
  * </ul>
  *
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
- * @todo Add support for more complicated StageQueue construction and configuration as part of the Stage 
- * tag processing.
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
  */
 public class PipelineRuleSet extends RuleSetBase {
     private static Class[] addBranchTypes = { String.class, Pipeline.class };
@@ -103,7 +97,7 @@
         digester.addCallParam("*/pipeline/stage", 0, true);
         
         //this rule is used to create a stage driver for a specific stage
-        digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.impl.SingleThreadStageDriver", "className");
+        digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.driver.SingleThreadStageDriver", "className");
         digester.addSetProperties("*/pipeline/stage/stageDriver");
         digester.addCallParam("*/pipeline/stage/stageDriver", 1, true);
         

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java Tue Jul 26 23:36:55 2005
@@ -42,69 +42,64 @@
     
     //map of stages to worker threads
     private Map<Stage, Thread> threadMap = new HashMap<Stage,Thread>();
-        
+    
     /**
-     * Default constructor
+     * Default constructor. This sets the thread wait timeout and fault tolerance
+     * values to 500 ms and false, respectively.
      */
     public SingleThreadStageDriver() {
+        this(500, false);
     }
     
     /**
-     *
+     * Creates a new SingleThreadStageDriver with the specified thread wait
+     * timeout and fault tolerance values.
+     * 
+     * @param timeout The amount of time, in milliseconds, that the worker thread
+     * will wait before checking the processing state if no objects are available
+     * in the thread's queue.
+     * @param faultTolerant Flag determining the behavior of the driver when
+     * an error is encountered in execution of {@link Stage#process(Object)}. 
+     * If this is set to false, any exception thrown during {@link Stage#process(Object)}
+     * will cause the worker thread to halt without executing {@link Stage#postprocess()}
+     * ({@link Stage#release()} will be called.)
      */
     public SingleThreadStageDriver(long timeout, boolean faultTolerant) {
         this.timeout = timeout;
         this.faultTolerant = faultTolerant;
     }
-        
+    
     /**
      * Creates and starts a new worker thread to process items in the stage's queue.
      */
-    public final void start(Stage stage) throws IllegalThreadStateException {
-        StageMonitor monitor = stage.getMonitor();
-        synchronized (monitor) {
-            if (monitor.status() != StageMonitor.Status.STOPPED) {
-                throw new IllegalThreadStateException("Processor thread has already been started.");
-            }
-        }
-        
-        monitor.startRequested();
-        
-        log.debug("Starting worker thread.");
+    protected void startInternal(Stage stage) throws StageException {
+        log.debug("Starting worker thread for stage " + stage + ".");
         Thread workerThread = new WorkerThread(stage);
         workerThread.start();
         this.threadMap.put(stage, workerThread);
-        log.debug("Worker thread started.");
+        log.debug("Worker thread for stage " + stage + " started.");
     }
     
-    
     /**
      * This method waits for the queue to empty and the processor thread to exit
      * cleanly and release any resources acquired during processing, if possible.
      */
-    public void finish(Stage stage) throws InterruptedException {
-        StageMonitor monitor = stage.getMonitor();
-        
-        log.debug("Requesting worker thread stop.");
-        monitor.stopRequested();
-        
-        synchronized (monitor) {
-            if (monitor.status() == StageMonitor.Status.STOPPED) return;
+    protected void finishInternal(Stage stage) throws StageException {
+        log.debug("Waiting for worker thread stop for stage " + stage + ".");
+        try {
+            this.threadMap.remove(stage).join();
+        } catch (InterruptedException e){
+            throw new StageException(e);
         }
-        
-        log.debug("Waiting for worker thread stop.");
-        this.threadMap.remove(stage).join();
-        log.debug("Worker thread has finished.");
-        
-        monitor.driverStopped();
+        log.debug("Worker thread for stage " + stage + " has finished.");
+        stage.getMonitor().driverStopped();
     }
     
-    
     /**
      * Sets the failure tolerance flag for the worker thread. If faultTolerant
      * is set to true, {@link StageException StageException}s thrown by
-     * the process() method will not interrupt queue processing, but will simply
-     * be logged with a severity of ERROR.
+     * the {@link Stage#process(Object)} method will not interrupt queue 
+     * processing, but will simply be logged with a severity of ERROR.
      */
     public final void setFaultTolerant(boolean faultTolerant) {
         this.faultTolerant = faultTolerant;
@@ -118,6 +113,72 @@
         return this.faultTolerant;
     }
     
+    /**
+     * Creates a new StageMonitor for the specified stage.
+     */
+    protected StageMonitor createStageMonitor(Stage stage) {
+        return new AbstractStageMonitor(stage) {
+            /**
+             * StageDriver has been requested to start stage processing.
+             * Implementations of this method should change the monitor's state to
+             * {@link State#STARTING}.
+             */
+            public synchronized void startRequested() {
+                if (this.state == State.STOPPED) this.state = State.STARTING;
+            }
+            
+            /**
+             * StageDriver has started execution.
+             * Implementations of this method should change the monitor's state to
+             * {@link State#RUNNING}.
+             */
+            public synchronized void driverStarted() {
+                if (this.state == State.STOPPED || this.state == State.STARTING) this.state = State.RUNNING;
+            }
+            
+            /**
+             * StageDriver has been requested to halt stage processing.
+             * Implementations of this method should change the monitor's state to
+             * {@link State#STOP_REQUESTED}.
+             */
+            public synchronized void stopRequested() {
+                this.state = State.STOP_REQUESTED;
+                this.notifyAll();
+            }
+            
+            /**
+             * StageDriver has finished execution.
+             * Implementations of this method should change the monitor's state to
+             * {@link State#STOPPED}.
+             */
+            public synchronized void driverStopped() {
+                this.state = State.STOPPED;
+            }
+            
+            /**
+             * Monitor for successful enqueue operations on the stage. Implementations
+             * overriding this method must call {@link Object#notifyAll() this.notifyAll()} to
+             * ensure that any threads waiting on this monitor are notified.
+             */
+            public synchronized void enqueueOccurred() {
+                this.notifyAll();
+            }
+            
+            /**
+             * Returns the current state of stage processing.
+             */
+            public synchronized State getState() {
+                return this.state;
+            }
+            
+            /**
+             * Returns a list of errors recorded by this monitor
+             */
+            public List<Throwable> getErrors() {
+                return errors;
+            }
+        };
+    }
     
     /**
      * This worker thread removes and processes data objects from the incoming
@@ -133,7 +194,7 @@
      */
     private class WorkerThread extends Thread {
         /** The Stage this thread will work on */
-        private Stage stage;               
+        private Stage stage;
         
         public WorkerThread(Stage stage) {
             this.stage = stage;
@@ -142,8 +203,13 @@
         public final void run() {
             StageMonitor monitor = stage.getMonitor();
             try {
-                log.debug("preprocessing...");
-                stage.preprocess();
+                log.debug("preprocessing stage " + stage + "...");
+                try {
+                    stage.preprocess();
+                } catch (Exception t) {
+                    monitor.preprocessFailed(t);
+                    return;
+                }
                 
                 monitor.driverStarted();
                 
@@ -153,33 +219,36 @@
                         obj = stage.poll();
                         if (obj == null) {
                             synchronized (monitor) {
-                                log.debug("Monitor status is: " + monitor.status());
-                                if (monitor.status() == StageMonitor.Status.STOP_REQUESTED) break running;
+                                //log.debug("Monitor getState is: " + monitor.getState());
+                                if (monitor.getState() == StageMonitor.State.STOP_REQUESTED
+                                        || monitor.getState() == StageMonitor.State.ERROR) break running;
+                                
                                 monitor.wait(timeout);
                                 continue running;
                             }
-                        }
-                        else {
+                        } else {
                             stage.process(obj);
                         }
-                    }
-                    catch (InterruptedException e) {
+                    } catch (InterruptedException e) {
                         monitor.driverFailed(e);
-                        throw new RuntimeException("Driver thread unexpectedly interrupted.", e);
-                    }
-                    catch (Throwable t) {
+                        throw new RuntimeException("Driver thread unexpectedly interrupted for stage " + stage + ".", e);
+                    } catch (Exception t) {
                         monitor.processingFailed(obj, t);
                         if (!faultTolerant) {
                             log.error("Aborting due to error.", t);
-                            throw new RuntimeException("Stage processing failed, check monitor for details.", t);
+                            throw new RuntimeException("Stage processing for stage " + stage + " failed, check monitor for details.", t);
                         }
                     }
                 }
                 
-                log.debug("postprocessing...");
-                stage.postprocess();
-            }
-            finally {
+                log.debug("postprocessing stage " + stage + "...");
+                try {
+                    stage.postprocess();
+                } catch (Exception t) {
+                    monitor.postprocessFailed(t);
+                    return;
+                }
+            } finally {
                 stage.release();
             }
         }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java Tue Jul 26 23:36:55 2005
@@ -52,20 +52,22 @@
     /** Holds value of property password. */
     private String password;
     
-    
+    /** Holds value of property port.     */
+    private int port;
+
     /**
      * Default constructor - creates work directory in /tmp
      */
     public FtpFileDownloadStage() {
     }
-   
+    
     /**
      * Constructor specifying work directory.
      */
     public FtpFileDownloadStage(String workDir) {
         this.workDir = workDir;
     }
-   
+    
     /**
      * Default constructor - creates work directory in /tmp
      */
@@ -92,7 +94,7 @@
         
         try {
             //connect to the ftp site
-            client.connect(host);
+            client.connect(host, port);
             log.debug(client.getReplyString());
             if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                 throw new IOException("FTP server at host " + host + " refused connection.");
@@ -103,21 +105,15 @@
             if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                 throw new IOException("FTP login failed for user " + user + ": " + client.getReplyString());
             }
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             throw new StageException(e.getMessage(), e);
         }
     }
     
     /**
-     * Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
-     * specified by that URL, and then retrieves the data to a file over an HTTP
-     * connection. The file name for download is the
-     * last element of the URL path for download appended with a timestamp
-     * value, and it is stored in the directory specified by {@link #setWorkDir(String) setWorkDir()}, or to
-     * /tmp if no work directory is set.
+     * Retrieves files that match the specified FileSpec from the FTP server
+     * and stores them in the work directory.
      *
-     * @param obj The URL from which to download data.
      * @throws ClassCastException if the parameter obj is not an instance of java.net.URL
      */
     public void process(Object obj) throws StageException {
@@ -126,65 +122,82 @@
         FileSpec spec = (FileSpec) obj;
         
         try {
+            client.setFileType(spec.type.intValue());
             client.changeWorkingDirectory(spec.path);
-            log.debug(client.getReplyString());
             if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                 throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
             }
             
-            log.debug("FTP connection successfully established to " + host + spec.path);
+            log.debug("FTP connection successfully established to " + host + ":" + spec.path);
             
             //get the list of files
             client.enterLocalPassiveMode();
-            String[] dirs = client.listNames();
-            if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
-                throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
-            }
-            //client.enterLocalActiveMode();
-                       
-            log.debug("FTP file list successfully obtained.");
-            
-            Pattern pattern = Pattern.compile(spec.pattern);
-            
-            log.debug("File pattern is " + spec.pattern);
+            searchCurrentDirectory("", spec);
+        } catch (IOException e) {
+            throw new StageException(e.getMessage(), e);
+        }
+    }
+    
+    
+    /**
+     * Search the current working directory of the FTP client, saving files
+     * to the path specified by workDir + the path to the file on the FTP server.
+     * This method will optionally recursively search directories on the remote server.
+     */
+    private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
+        FTPFile[] files = client.listFiles();
+        if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
+            throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
+        }
+        
+        search: for (FTPFile file : files) {
+            String localPath = path + File.separatorChar + file.getName();
             
-            //create the list of netcdf track files to get
-            for (int i = 0; i < dirs.length; i++){
-                log.debug("Obtaining files in directory " + dirs[i]);
-                String[] files = client.listNames(dirs[i]);
+            if (file.isDirectory() && spec.recursive) {
+                log.debug("Recursing into directory " + file.getName());
+                client.changeWorkingDirectory(file.getName());
+                searchCurrentDirectory(localPath, spec);
+                client.changeToParentDirectory();
+            } else {
+                log.debug("Examining file " + localPath);
+                for (Criterion crit : spec.criteria) {
+                    if (!crit.matches(file)) {
+                        log.info("File " + localPath + " failed criterion check " + crit);
+                        continue search;
+                    }
+                }
                 
-                for (int j = 0; j < files.length; j++) {
-                    if (pattern.matcher(files[j]).matches()) {
-                        log.debug("Matched file name " + files[j] + " against pattern " + spec.pattern);
-                        File f = new File(workDir + File.separatorChar + files[j]);
-                        if (! f.getParentFile().exists()) f.getParentFile().mkdir();
-                        
-                        OutputStream out = new FileOutputStream(f);
-                        client.retrieveFile(files[j], out);
-                        this.exqueue(f);
+                File localFile = new File(workDir + File.separatorChar + localPath);
+                if (localFile.exists() && !spec.overwrite) {
+                    //if (spec.)
+                } else {
+                    if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir();
+                    
+                    OutputStream out = new FileOutputStream(localFile);
+                    try {
+                        client.retrieveFile(file.getName(), out);
+                    } finally {
+                        out.flush();
+                        out.close();
                     }
                 }
+                
+                this.exqueue(localFile);
             }
         }
-        catch (IOException e) {
-            throw new StageException(e.getMessage(), e);
-        }
     }
     
-    
     /**
      * Disconnects from FTP server. Errors are logged.
      */
     public void release() {
         try {
             client.disconnect(); //close ftp connection
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             log.error(e.getMessage(), e);
         }
     }
     
-    
     /**
      * Sets the working directory for the file download. If the directory does
      * not already exist, it will be created during the preprocess() step.
@@ -240,18 +253,57 @@
         this.password = password;
     }
     
+    /**
+     * Getter for property port.
+     * @return Value of property port.
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Setter for property port.
+     * @param port New value of property port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
     
     /**
      * This class is used to specify a path and pattern of file for the FtpFileDownload
      * to retrieve.
      */
     public static class FileSpec {
+        //enumeration of legal file types
+        public enum FileType {
+            ASCII(FTPClient.ASCII_FILE_TYPE),
+                    BINARY(FTPClient.BINARY_FILE_TYPE);
+            
+            private int type;
+            
+            private FileType(int type) {
+                this.type = type;
+            }
+            
+            public int intValue() {
+                return this.type;
+            }
+        }
         
         /** Holds value of property path. */
         private String path = "/";
         
-        /** Holds value of property pattern. */
-        private String pattern = ".*";
+        /** Holds flag that determines whether or not to perform recursive search of the specified path */
+        private boolean recursive;
+        
+        // Holds flag that determines whether or not to overwrite local files
+        private boolean overwrite;
+        
+        // Type of file (ascii or binary)
+        private FileType type = FileType.BINARY;
+        
+        // List of criteria that the retrieved file must satisfy.
+        private Set<Criterion> criteria = new HashSet<Criterion>();
         
         /** Getter for property path.
          * @return Value of property path.
@@ -271,10 +323,10 @@
         
         /** Getter for property pattern.
          * @return Value of property pattern.
-         *
+         * @deprecated The pattern cannot be recovered from the criteria set; this getter always returns null.
          */
         public String getPattern() {
-            return this.pattern;
+            return null;
         }
         
         /** Setter for property pattern.
@@ -282,7 +334,106 @@
          *
          */
         public void setPattern(String pattern) {
-            this.pattern = pattern;
+            this.criteria.add(new FileNameMatchCriterion(pattern));
+        }
+        
+        /**
+         * Add a criterion to the set of criteria that must be matched for files
+         * to be downloaded
+         */
+        public void addCriterion(Criterion crit) {
+            this.criteria.add(crit);
+        }
+        
+        /**
+         * Sets the flag determining whether or not the stage will recursively
+         * traverse the directory tree to find files.
+         */
+        public void setRecursive(boolean recursive) {
+            this.recursive = recursive;
+        }
+        
+        /**
+         * Returns whether or not the stage will recursively
+         * traverse the directory tree to find files.
+         */
+        public boolean isRecursive() {
+            return this.recursive;
+        }
+        
+        /**
+         * Sets the file type for the transfer. Legal values are "ascii" and "binary".
+         * Binary transfers are the default; any value other than "ascii" selects binary.
+         */
+        public void setFileType(String fileType) {
+            if ("ascii".equalsIgnoreCase(fileType)) {
+                this.type = FileType.ASCII;
+            } else {
+                this.type = FileType.BINARY;
+            }
+        }
+        
+        /**
+         * Returns the file type for the transfer.
+         */
+        public String getFileType() {
+            return this.type.toString();
+        }
+    }
+    
+    /**
+     * This interface is used to specify a criterion that the downloaded file
+     * must satisfy.
+     */
+    public interface Criterion {
+        public boolean matches(FTPFile file);
+    }
+    
+    /**
+     * Matches file names based upon the Java regex supplied in the constructor.
+     */
+    public static class FileNameMatchCriterion implements Criterion {
+        // precompiled pattern used to match filenames
+        private Pattern pattern;
+        private String _pattern;
+        
+        public FileNameMatchCriterion(String pattern) {
+            this._pattern = pattern;
+            this.pattern = Pattern.compile(pattern);
+        }
+        
+        public boolean matches(FTPFile file) {
+            return pattern.matcher(file.getName()).matches();
+        }
+        
+        public String toString() {
+            return "filename matches pattern " + _pattern;
+        }
+    }
+    
+    /**
+     * Matches files based upon a set of date constraints
+     */
+    public static class FileDateMatchCriterion implements Criterion {
+        private Date startDate;
+        private Date endDate;
+        
+        public FileDateMatchCriterion(Date startDate, Date endDate) {
+            this.startDate = startDate;
+            this.endDate = endDate;
+        }
+        
+        public boolean matches(FTPFile file) {
+            Calendar cal = file.getTimestamp();
+            if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) {
+                return false;
+            } else {
+                return true;
+            }
+        }
+        
+        public String toString() {
+            return "file date is between " + startDate + " and " + endDate;
         }
     }
 }

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java Tue Jul 26 23:36:55 2005
@@ -60,6 +60,7 @@
     /**
      * Creates a new instance of HttpFileDownload with the specified work directory
      * into which to download files.
+     * @deprecated Let File.createTempFile take care of the working directory issue.
      */
     public HttpFileDownloadStage(Queue<Object> queue, String workDir) {
         super(queue);
@@ -67,15 +68,6 @@
     }
     
     /**
-     * Creates the directory {@link #setWorkDir(String) workDir} if it does
-     * not exist.
-     */
-    public void preprocess() throws StageException {
-        if (fworkDir == null) fworkDir = new File(workDir);
-        if (!this.fworkDir.exists()) fworkDir.mkdirs();
-    }
-    
-    /**
      * Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
      * specified by that URL, and then retrieves the data to a file over an HTTP
      * connection. The file name for download is the
@@ -87,14 +79,14 @@
      * @throws ClassCastException if the parameter obj is not an instance of java.net.URL
      */
     public void process(Object obj) throws StageException {
-        if (!this.fworkDir.exists()) throw new StageException("The work directory for file download " + workDir.toString() + " does not exist.");
         Map params = new HashMap();
         
         URL url;
         try {
             if (obj instanceof String) {
+                /*
                 String loc = (String) obj;
-                /*int paramIndex = loc.indexOf('?');
+                int paramIndex = loc.indexOf('?');
                 if (paramIndex > 0) {
                     url = new URL(loc.substring(0, paramIndex));
                     for (StringTokenizer st = new StringTokenizer(loc.substring(paramIndex + 1), "&"); st.hasMoreTokens();) {
@@ -108,18 +100,16 @@
                         }
                     }
                 }
-                else {*/
-                    url = new URL((String) obj);
+                else {
+                 */
+                url = new URL((String) obj);
                 //}
-            }
-            else if (obj instanceof URL) {
+            } else if (obj instanceof URL) {
                 url = (URL) obj;
-            }
-            else {
+            } else {
                 throw new IllegalArgumentException("Unrecognized parameter class to process() for HttpFileDownload: " + obj.getClass().getName() + "; must be URL or String");
             }
-        }
-        catch (MalformedURLException e) {
+        } catch (MalformedURLException e) {
             throw new StageException("Malformed URL: " + obj.toString(), e);
         }
         
@@ -137,24 +127,22 @@
         java.net.HttpURLConnection con = null;
         try {
             con = (java.net.HttpURLConnection) url.openConnection();
-            /*if (!params.isEmpty()) {
+            /*
+            if (!params.isEmpty()) {
                 con.setRequestMethod("GET");
                 for (Iterator iter = params.entrySet().iterator(); iter.hasNext();) {
                     Map.Entry entry = (Map.Entry) iter.next();
                     con.setRequestProperty((String) entry.getKey(), (String) entry.getValue());
                 }
-            }*/
-        }
-        catch (IOException e) {
+            }
+             */
+        } catch (IOException e) {
             throw new StageException(e.getMessage(), e);
         }
         
-        long time = System.currentTimeMillis();
-        String path = url.getPath();
-        String fileName = path.substring(path.lastIndexOf('/')) + "." + time; //tag the downloaded file with the time of retrieval.
-        File workFile = new File(workDir, fileName);
-        
+        File workFile = null;
         try {
+            workFile = File.createTempFile("http-file-download","tmp");
             //log.debug("About to connect.");
             //con.connect();
             //log.debug("Connection status: " + con.getResponseCode());
@@ -164,21 +152,24 @@
             for (int results = 0; (results = in.read(buffer)) != -1;) {
                 out.write(buffer, 0, results);
             }
-        }
-        catch (IOException e) {
+            out.close();
+            in.close();
+        } catch (IOException e) {
             throw new StageException("An error occurred downloading a data file from " + url.toString() + ": " + e.getMessage(), e);
-        }
-        finally {
+        } finally {
             con.disconnect();
         }
         
         this.exqueue(workFile);
     }
     
-
+    
     /**
      * Sets the working directory for the file download. If the directory does
      * not already exist, it will be created during the preprocess() step.
+     * If you do not set this directory, the work directory will be the
+     * default temporary directory for your machine type.
+     * @deprecated Let File.createTempFile worry about where to create files.
      */
     public void setWorkDir(String workDir) {
         this.workDir = workDir;
@@ -210,11 +201,9 @@
             
             if (location.startsWith("http:")) {
                 url = new URL(location);
-            }
-            else if (location.startsWith("/")) {
+            } else if (location.startsWith("/")) {
                 url = new URL("http://" + url.getHost() + location);
-            }
-            else {
+            } else {
                 url = new URL(con.getURL(), location);
             }
             

Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
 /*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.

Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties Tue Jul 26 23:36:55 2005
@@ -1,6 +1,20 @@
+# Copyright 2005 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 # Resource bundle for test resources
 
 test.DigesterPipelineFactoryTest.configFile=test_conf.xml
 test.DigesterPipelineFactoryTest.logConfig=log4j_conf.xml
-test.DigesterPipelineFactoryTest.stage0.class=org.apache.commons.pipeline.impl.FileFinderStage
-test.DigesterPipelineFactoryTest.stage1.class=org.apache.commons.pipeline.impl.LogStage
\ No newline at end of file
+test.DigesterPipelineFactoryTest.stage0.class=org.apache.commons.pipeline.stage.FileFinderStage
+test.DigesterPipelineFactoryTest.stage1.class=org.apache.commons.pipeline.stage.LogStage
\ No newline at end of file

Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml Tue Jul 26 23:36:55 2005
@@ -1,6 +1,24 @@
 <?xml version="1.0" encoding="UTF-8" ?>
 <!--<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">-->
 
+<!--
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Author     : Kris Nuttycombe, National Geophysical Data Center
+-->
+
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 
     <appender name="log_app" class="org.apache.log4j.FileAppender">
@@ -37,7 +55,7 @@
       <appender-ref ref="external_app" />
     </category>
 
-    <category name="org.apache.commons.pipeline.impl" additivity="false">
+    <category name="org.apache.commons.pipeline.stage" additivity="false">
       <level value="debug" />
       <appender-ref ref="impl_app" />
     </category>



---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org