You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by sk...@apache.org on 2005/07/27 08:37:40 UTC
svn commit: r225468 [1/2] - in /jakarta/commons/sandbox/pipeline/trunk: ./
src/java/org/apache/commons/pipeline/
src/java/org/apache/commons/pipeline/config/
src/java/org/apache/commons/pipeline/driver/
src/java/org/apache/commons/pipeline/stage/ src/t...
Author: skitching
Date: Tue Jul 26 23:36:55 2005
New Revision: 225468
URL: http://svn.apache.org/viewcvs?rev=225468&view=rev
Log:
Refactoring of core package to provide easier StageDriver implementation
Modified:
jakarta/commons/sandbox/pipeline/trunk/project.properties
jakarta/commons/sandbox/pipeline/trunk/project.xml
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java
jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml
jakarta/commons/sandbox/pipeline/trunk/src/test/conf/test_conf.xml
jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/config/DigesterPipelineFactoryTest.java
Modified: jakarta/commons/sandbox/pipeline/trunk/project.properties
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/project.properties?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/project.properties (original)
+++ jakarta/commons/sandbox/pipeline/trunk/project.properties Tue Jul 26 23:36:55 2005
@@ -1,3 +1,4 @@
+maven.repo.remote=http://www.apache.org/dist/java-repository
maven.checkstyle.properties = checkstyle.xml
maven.compile.source=1.5
Modified: jakarta/commons/sandbox/pipeline/trunk/project.xml
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/project.xml?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/project.xml (original)
+++ jakarta/commons/sandbox/pipeline/trunk/project.xml Tue Jul 26 23:36:55 2005
@@ -1,117 +1,103 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
<project>
- <extend>../commons-build/sandbox-project.xml</extend>
- <pomVersion>3</pomVersion>
- <name>Commons Pipeline</name>
- <id>commons-pipeline</id>
- <logo>/images/pipeline-logo-white.png</logo>
-
- <currentVersion>0.1.0-dev</currentVersion>
- <inceptionYear>2004</inceptionYear>
- <package>org.apache.commons.pipeline</package>
-
- <shortDescription>A simple pipeline processing framework.</shortDescription>
- <description>
- A framework for constructing pipelined processing systems that support
- concurrent processing of objects in multiple processing stages.
- </description>
-
- <url>http://jakarta.apache.org/commons/sandbox/pipeline</url>
- <siteAddress>people.apache.org</siteAddress>
- <siteDirectory>/www/jakarta.apache.org/commons/sandbox/pipeline</siteDirectory>
-
- <distributionDirectory></distributionDirectory>
-
- <repository>
- <connection></connection>
- <url></url>
-
- </repository>
-
- <developers>
- <developer>
- <name>Kris Nuttycombe</name>
- <email>kris.nuttycombe@noaa.gov</email>
- <organization>National Geophysical Data Center</organization>
- </developer>
- </developers>
-
- <dependencies>
- <dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.7.0</version>
-
- <url>http://jakarta.apache.org/commons/beanutils</url>
- </dependency>
-
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>3.1</version>
-
- <url>http://jakarta.apache.org/commons/collections</url>
- </dependency>
-
- <dependency>
- <groupId>commons-digester</groupId>
- <artifactId>commons-digester</artifactId>
- <version>1.6</version>
-
- <url>http://jakarta.apache.org/commons/digester</url>
- </dependency>
-
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.0.4</version>
-
- <url>http://jakarta.apache.org/commons/logging</url>
- </dependency>
-
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- <version>1.2.1</version>
-
- <url>http://jakarta.apache.org/commons/net</url>
- </dependency>
-
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.8</version>
-
- <url>http://logging.apache.org</url>
- </dependency>
- </dependencies>
-
- <build>
- <nagEmailAddress>kris.nuttycombe@noaa.gov</nagEmailAddress>
- <sourceDirectory>src/java</sourceDirectory>
-
- <!-- Unit test cases -->
-
- <unitTestSourceDirectory>src/test</unitTestSourceDirectory>
- <unitTest>
- <includes>
- <include>**/*Test.java</include>
- </includes>
- <resources>
- <resource>
- <directory>src/test/resources</directory>
-
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- </resource>
- </resources>
- </unitTest>
- </build>
-
-
+ <extend>${basedir}/../commons-build/sandbox-project.xml</extend>
+ <pomVersion>3</pomVersion>
+ <name>Commons Pipeline</name>
+ <id>commons-pipeline</id>
+ <logo>/images/pipeline-logo-white.png</logo>
+ <currentVersion>0.1.0-dev</currentVersion>
+ <inceptionYear>2004</inceptionYear>
+ <package>org.apache.commons.pipeline</package>
+ <shortDescription>A simple pipeline processing framework.</shortDescription>
+ <description>A framework for constructing pipelined processing systems that support
+ concurrent processing of objects in multiple processing stages.</description>
+ <url>http://jakarta.apache.org/commons/sandbox/pipeline</url>
+ <siteAddress>people.apache.org</siteAddress>
+ <siteDirectory>/www/jakarta.apache.org/commons/sandbox/pipeline</siteDirectory>
+ <distributionDirectory></distributionDirectory>
+ <repository>
+ <connection></connection>
+ <url></url>
+ </repository>
+ <developers>
+ <developer>
+ <name>Kris Nuttycombe</name>
+ <id>kjn</id>
+ <email>kris.nuttycombe@noaa.gov</email>
+ <organization>National Geophysical Data Center</organization>
+ </developer>
+ <developer>
+ <name>Travis Stevens</name>
+ <id>tns</id>
+ <email>Travis.Stevens@noaa.gov</email>
+ <organization>National Geophysical Data Center</organization>
+ </developer>
+ </developers>
+ <dependencies>
+ <!-- core library deps -->
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.0.4</version>
+ <type>jar</type>
+ </dependency>
+ <!-- deps for stage & config implementations -->
+ <dependency>
+ <groupId>commons-digester</groupId>
+ <artifactId>commons-digester</artifactId>
+ <version>1.7</version>
+ <type>jar</type>
+ <url>http://jakarta.apache.org/commons/digester</url>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>1.4.0</version>
+ <type>jar</type>
+ <url>http://jakarta.apache.org/commons/net</url>
+ </dependency>
+ <!-- test deps -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.9</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils</artifactId>
+ <version>1.7.0</version>
+ <type>jar</type>
+ <url>http://jakarta.apache.org/commons/beanutils</url>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.1</version>
+ <type>jar</type>
+ <url>http://jakarta.apache.org/commons/collections</url>
+ </dependency>
+ </dependencies>
+ <build>
+ <nagEmailAddress>kris.nuttycombe@noaa.gov</nagEmailAddress>
+ <sourceDirectory>src/java</sourceDirectory>
+ <!-- Unit test cases -->
+ <unitTestSourceDirectory>src/test/java</unitTestSourceDirectory>
+ <unitTest>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ <resources>
+ <resource>
+ <directory>src/test/conf</directory>
+ <includes>
+ <include>*.properties</include>
+ <include>*.xml</include>
+ <include>*.txt</include>
+ </includes>
+ </resource>
+ </resources>
+ </unitTest>
+ </build>
</project>
-
-
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/BaseStage.java Tue Jul 26 23:36:55 2005
@@ -1,7 +1,24 @@
/*
- * BaseStage.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* Created on October 12, 2004, 11:31 AM
+ *
+ * $Log: BaseStage.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
@@ -10,8 +27,11 @@
import java.util.concurrent.LinkedBlockingQueue;
/**
+ * This is a simple base class for Stages with no-op implementations of the
+ * {@link #preprocess()}, {@link #process()}, {@link #postprocess()},
+ * and {@link #release()} methods.
*
- * @author kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public class BaseStage extends Stage {
@@ -73,7 +93,4 @@
*/
public void release() {
}
-
-
-
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Pipeline.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -12,17 +12,21 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * $Log: Pipeline.java,v $
+ * Revision 1.7 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
-import java.lang.Iterable;
-import java.util.*;
-import org.apache.commons.collections.OrderedMap;
-import org.apache.commons.collections.OrderedMapIterator;
-import org.apache.commons.collections.map.ListOrderedMap;
-
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.pipeline.driver.SimpleStageDriver;
/**
* This class represents a processing system consisting of a number of stages
@@ -34,71 +38,84 @@
* with methods to start and stop processing for all stages, as well as
* a simple framework for asynchronous event-based communication between stages.
*
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public final class Pipeline implements Iterable<Stage>, Runnable {
private List<StageEventListener> listeners = new ArrayList<StageEventListener>();
-
+
/**
- * Ordered map of stages in the pipeline where the keys are stages and the
- * values are the associated StageDrivers.
+ * List of stages in the pipeline
*/
- protected OrderedMap stages = new ListOrderedMap();
- //private OrderedMap<Stage,StageDriver> stages = new ListOrderedMap<Stage,StageDriver>();
+ protected List<Stage> stages = new ArrayList<Stage>();;
/**
* Map of pipeline branches where the keys are branch names.
*/
protected Map<String,Pipeline> branches = new HashMap<String,Pipeline>();
-
/**
* Creates a new Pipeline
*/
- public Pipeline() { }
+ public Pipeline() {
+ stages = new ArrayList<Stage>();
+ }
+ /**
+ * Creates a new Pipeline with the List of Stages
+ */
+ public Pipeline(List<Stage> stages){
+ for (Stage stage: stages){
+ this.addStage(stage);
+ }
+ }
/**
- * Adds a Stage object to the end of this Pipeline. The pipeline will use
- * the specified StageDriver to run the stage.
- *
- * It is critical that all stages added to a pipeline have distinct hash codes
- * to maintain stage ordering integrity. For this reason, it is
- * strongly suggested that Stage implementations <i>do not</i> override
- * the default {@link java.lang.Object#hashCode() hashCode()} implementation
- * in java.lang.Object.
+ * Adds a {@link Stage} object to the end of this Pipeline. The pipeline will use
+ * the specified {@link StageDriver} to run the stage.
*
* @todo throw IllegalStateException if the stage is being used in a different pipeline
*/
public void addStage(Stage stage, StageDriver driver) {
- if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
- if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage() may not be null.");
+ if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
+ if (driver == null) throw new IllegalArgumentException("Argument \"driver\" for call to Pipeline.addStage(Stage, StageDriver) may not be null.");
- stage.setPipeline(this);
- this.stages.put(stage, driver);
+ stage.setStageDriver(driver);
+ this.addStage(stage);
}
+ /**
+ * Adds a {@link Stage} object to the end of this Pipeline.
+ */
+ public void addStage(Stage stage){
+ if (stage == null) throw new IllegalArgumentException("Argument \"stage\" for call to Pipeline.addStage() may not be null.");
+ stage.setPipeline(this);
+ this.stages.add(stage);
+ }
/**
- * Returns the first stage in the pipeline
+ * Returns the first stage in the pipeline, or null if there are no stages
*/
public Stage head() {
- return (Stage) stages.firstKey();
+ if (stages.size() > 0){
+ return (Stage) stages.get(0);
+ } else {
+ return null;
+ }
}
/**
- * Returns the stage after the specified stage in the pipeline
+ * Returns the stage after the specified stage in the pipeline.
*/
public Stage getNextStage(Stage stage) {
- return (Stage) stages.nextKey(stage);
+ int nextIndex = stages.indexOf(stage) + 1;
+ return (stages.size() > nextIndex) ? stages.get(nextIndex) : null;
}
/**
* Returns an Iterator for stages in the pipeline.
*/
public Iterator<Stage> iterator() {
- return (Iterator<Stage>) stages.mapIterator();
+ return stages.iterator();
}
/**
@@ -113,7 +130,6 @@
this.branches.put(key, pipeline);
}
-
/**
* Runs the pipeline from start to finish.
*/
@@ -121,23 +137,26 @@
try {
start();
finish();
- }
- catch (InterruptedException e) {
+ } catch (StageException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
-
/**
- * This method iterates over the stages in the pipeline, looking up a {@link StageDriver}
- * for each stage and using that driver to start the stage. Startups
- * may occur sequentially or in parallel, depending upon the stage driver
- * used.
- */
- public void start() {
- for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
- Stage stage = (Stage) iter.next();
- StageDriver driver = (StageDriver) iter.getValue();
+ * This method iterates over the stages in the pipeline, looking up a
+ * {@link StageDriver} for each stage and using that driver to start the stage.
+ * Startups may occur sequentially or in parallel, depending upon the stage driver
+ * used. If the stage has not been configured with a {@link StageDriver},
+ * we will use the default, non-threaded {@link SimpleStageDriver}.
+ */
+ public void start() throws StageException {
+ for (Stage stage: this.stages){
+ StageDriver driver = stage.getStageDriver();
+ if (driver == null){
+ driver = new SimpleStageDriver();
+ stage.setStageDriver(driver);
+ }
+
driver.start(stage);
}
@@ -158,10 +177,9 @@
* @throws InterruptedException if a worker thread was interrupted at the time
* a stage was asked to finish execution.
*/
- public void finish() throws InterruptedException {
- for (OrderedMapIterator iter = stages.orderedMapIterator(); iter.hasNext();) {
- Stage stage = (Stage) iter.next();
- StageDriver driver = (StageDriver) iter.getValue();
+ public void finish() throws StageException {
+ for (Stage stage: this.stages){
+ StageDriver driver = stage.getStageDriver();
driver.finish(stage);
}
@@ -170,25 +188,22 @@
}
}
-
/**
* Enqueues an object on the first stage if the pipeline is not empty
* @param o the object to enque
*/
public void enqueue(Object o){
- if (!stages.isEmpty()) ((Stage) stages.firstKey()).enqueue(o);
+ if (!stages.isEmpty()) stages.get(0).enqueue(o);
}
-
/**
* This method is used by stages to pass data from one stage to the next.
*/
public void pass(Stage source, Object data) {
- Stage next = (Stage) this.stages.nextKey(source);
+ Stage next = this.getNextStage(source);
if (next != null) next.enqueue(data);
}
-
/**
* Simple method that recursively checks whether the specified
* pipeline is a branch of this pipeline.
@@ -202,7 +217,6 @@
return false;
}
-
/**
* Adds an EventListener to the pipline that will be notified by calls
* to {@link Stage#raise(StageEvent)}.
@@ -210,7 +224,6 @@
public void addEventListener(StageEventListener listener) {
listeners.add(listener);
}
-
/**
* Sequentially notifies each listener in the list of an event, and propagates
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineCreationException.java Tue Jul 26 23:36:55 2005
@@ -1,14 +1,32 @@
/*
- * PipelineCreationException.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* Created on October 4, 2004, 1:35 PM
+ *
+ * $Log: PipelineCreationException.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
/**
+ * This is a wrapper exception for use by {@link PipelineFactory}s.
*
- * @author kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public class PipelineCreationException extends java.lang.Exception {
@@ -16,8 +34,7 @@
* Creates a new instance of <code>PipelineCreationException</code> without detail message.
*/
public PipelineCreationException() {
- }
-
+ }
/**
* Constructs an instance of <code>PipelineCreationException</code> with the specified detail message.
@@ -35,4 +52,11 @@
super(msg, cause);
}
+ /**
+ * Constructs an instance of <code>PipelineCreationException</code> with the specified cause.
+ * @param cause the underlying cause of this exception.
+ */
+ public PipelineCreationException(Throwable cause) {
+ super(cause);
+ }
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/PipelineFactory.java Tue Jul 26 23:36:55 2005
@@ -1,7 +1,24 @@
/*
- * PipelineFactory.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* Created on October 4, 2004, 1:22 PM
+ *
+ * $Log: PipelineFactory.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
@@ -11,7 +28,7 @@
/**
* Simple factory interface for creating pipelines.
*
- * @author kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public interface PipelineFactory {
/** Returns a Pipeline created by the factory. */
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/Stage.java Tue Jul 26 23:36:55 2005
@@ -1,44 +1,70 @@
/*
- * Stage.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* Created on November 18, 2004, 10:34 AM
+ *
+ * $Log: Stage.java,v $
+ * Revision 1.5 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.4 2005/07/22 23:18:35 kjn
+ * Added callback in setStageDriver to set the stage's StageMonitor; documentation & code cleanup
*/
package org.apache.commons.pipeline;
-import java.util.*;
+import java.util.Queue;
/**
- * <P>
- * A Stage represents a set of tasks that can be performed on objects
+ * <P>A Stage represents a set of tasks that can be performed on objects
* in a queue, and methods used to communicate with other stages
- * in a {@link Pipeline}.
- * </P>
- * <P>
- * A Stage must provide a unique {@link StageMonitor} object to allow for
+ * in a {@link Pipeline}.</P>
+ * <P>A Stage must provide a unique {@link StageMonitor} object to allow for
* proper handling of multiple processing threads to the {@link StageDriver}
* that runs the stage. Because Stage does not specify the exact behavior of the
* queue (whether it is capacity-bounded or automatically synchronizes accesses,
- * etc) the monitor is necessary to provide proper synchronization.
- * </P>
- * <P>
- * Stages extending this abstract base class automatically establish a relationship
- * with a pipeline when added to that pipeline.
- * </P>
+ * etc) the monitor is necessary to provide proper synchronization.</P>
+ * <P>Stages extending this abstract base class automatically establish a relationship
+ * with a pipeline when added to that pipeline.</P>
*
- * @author Kris Nuttycombe
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public abstract class Stage {
- private Queue<Object> queue;
+
+ /**
+ * The {@link Pipeline} in which this stage will run.
+ */
protected Pipeline pipeline;
+
+ /**
+ * The {@link StageMonitor} that will monitor stage changes and act
+ * as an intermediary between this stage and its {@link StageDriver}.
+ */
protected StageMonitor monitor;
+ // StageDriver that is used to run this stage
+ private StageDriver stageDriver;
+
+ // Queue for objects this stage is to process
+ private Queue<Object> queue;
+
/**
* Builds a new stage with the specified queue.
*/
public Stage(Queue<Object> queue) {
this.queue = queue;
- this.monitor = new StageMonitor();
}
/**
@@ -52,18 +78,29 @@
}
/**
- * Enqueues an object on the wrapped queue. Classes that override this
- * method must also override {@link #poll()}.
+ * Enqueues an object on the wrapped queue. Classes wishing to override this
+ * method should override {@link #innerEnqueue(Object)} instead.
*/
- public void enqueue(Object obj) {
+ public final void enqueue(Object obj) {
+ this.innerEnqueue(obj);
+ if (this.monitor != null) this.monitor.enqueueOccurred();
+ }
+
+ /**
+ * This protected method is designed to be overridden in cases where
+ * additional processing should be performed when an object is enqueued.
+ * Classes that override this method must also override {@link #poll()} if
+ * the underlying queue supporting the stage is changed.
+ */
+ protected void innerEnqueue(Object obj) {
queue.add(obj);
- this.monitor.enqueueOccurred();
}
/**
* Retrieves an object from the head of the wrapped queue, or null
* if the queue is empty. Classes that override this method must also
- * override {@link #enqueue(Object)}
+ * override {@link #innerEnqueue(Object)} if the underlying queue supporting the
+ * stage is changed.
*/
public Object poll() {
synchronized (queue) {
@@ -72,6 +109,15 @@
}
/**
+ * Setter for property stageDriver.
+ * @param stageDriver New value of property stageDriver.
+ */
+ protected final void setStageDriver(StageDriver stageDriver) {
+ this.stageDriver = stageDriver;
+ this.monitor = stageDriver.createStageMonitor(this);
+ }
+
+ /**
* Enqueues the specified object onto the next stage in the pipeline
* if such a stage exists.
*/
@@ -84,9 +130,9 @@
* branch corresponding to the specified key, if such a brach exists.
*/
public final void exqueue(String key, Object obj) {
- Pipeline branch = (Pipeline) this.pipeline.branches.get(key);
+ Pipeline branch = this.pipeline.branches.get(key);
if (branch != null && !branch.stages.isEmpty()) {
- ((Stage) branch.stages.firstKey()).enqueue(obj);
+ branch.head().enqueue(obj);
}
}
@@ -102,26 +148,37 @@
* Getter for wrapped queue.
* @return Value of property queue.
*/
- public Queue getQueue() {
+ public final Queue getQueue() {
return this.queue;
}
/**
- * Setter for wrapped queue.
- * @param queue New value of property queue.
+ * Getter for property stageDriver.
+ * @return Value of property stageDriver.
*/
- public void setQueue(Queue<Object> queue) {
- this.queue = queue;
+ public final StageDriver getStageDriver() {
+ return this.stageDriver;
}
/**
* Returns the monitor for this stage.
*/
- public StageMonitor getMonitor() {
+ public final StageMonitor getMonitor() {
return this.monitor;
}
/**
+ * Stages may not further override hashCode(). This is necessary to maintain stage
+ * ordering integrity within the pipeline.
+ */
+ public final int hashCode() {
+ int retValue;
+
+ retValue = super.hashCode();
+ return retValue;
+ }
+
+ /**
* Implementations of this method should perform any necessary setup that
* needs to be done before any data is processed from this Stage's queue.
*
@@ -159,14 +216,4 @@
*/
public abstract void release();
- /**
- * Stages may not further override hashCode(). This is necessary to maintain stage
- * ordering integrity within the pipeline.
- */
- public final int hashCode() {
- int retValue;
-
- retValue = super.hashCode();
- return retValue;
- }
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageDriver.java Tue Jul 26 23:36:55 2005
@@ -1,32 +1,90 @@
/*
- * StageRunner.java
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* Created on October 6, 2004, 4:30 PM
+ *
+ * $Log: StageDriver.java,v $
+ * Revision 1.5 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.4 2005/07/22 23:20:27 kjn
+ * Changed from interface to abstract base class to allow better use of callbacks
+ * and consolidation of monitor/threading model code.
*/
package org.apache.commons.pipeline;
-import org.apache.commons.pipeline.Stage;
-
+import java.util.List;
/**
+ * This abstract base class is used to define how processing for a stage is started,
+ * stopped, and run. StageDriver implementations may run stages in one or
+ * more threads, and use the {@link StageMonitor} interface to provide communication
+ * between the stage, the driver, and the enclosing pipeline.
*
- * @author kjn
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
-public interface StageDriver {
+public abstract class StageDriver {
+
/**
* Creates and starts new worker thread(s) to process items in the queue.
- * Implementations of this method must call {@link StageMonitor#driverStarting()}
- * on the specified stage's monitor.
*/
- public void start(Stage stage) throws IllegalThreadStateException;
+ public final void start(Stage stage) throws StageException {
+ synchronized (stage) {
+ if (stage.monitor == null || stage.monitor.getState() == StageMonitor.State.STOPPED) {
+ if (stage.monitor == null) stage.setStageDriver(this); // this will set the monitor on the stage using a callback.
+ stage.monitor.startRequested();
+ this.startInternal(stage);
+ }
+ }
+ }
+ /**
+ * Implementations of this method must guarantee that the
+ * {@link StageMonitor#driverStarted()} method will be called on the
+ * specified stage's {@link Stage#getStageMonitor() monitor} when
+ * preprocessing is complete.
+ */
+ protected abstract void startInternal(Stage stage) throws StageException;
/**
- * This method waits for the queue to empty and any processor thread(s) to exit
+ * This method waits for the stage(s) queue(s) to empty and any processor thread(s) to exit
* cleanly and then calls release() to release any resources acquired during processing, if possible.
- * Implementations of this method must call {@link StageMonitor#driverStopped()}
- * on the specified stage's monitor upon completion.
*/
- public void finish(Stage stage) throws InterruptedException;
+ public final void finish(Stage stage) throws StageException {
+ synchronized (stage) {
+ if (stage.monitor != null && stage.monitor.getState() != StageMonitor.State.STOPPED) {
+ stage.monitor.stopRequested();
+ this.finishInternal(stage);
+ }
+ }
+ }
+
+ /**
+ * Implementations of this method must guarantee that the
+ * {@link StageMonitor#driverStopped()} method will be called on the
+ * specified stage's {@link Stage#getStageMonitor() monitor} when
+ * postprocessing is complete and all stage resources have been released.
+ */
+ protected abstract void finishInternal(Stage stage) throws StageException;
+
+ /**
+ * This factory method must return a new instance of a {@link StageMonitor}
+ * that can be used to monitor processing for the specified stage
+ * in conjunction with this driver.
+ */
+ protected abstract StageMonitor createStageMonitor(Stage stage);
+
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageEventListener.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -12,6 +12,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * $Log: StageEventListener.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
@@ -22,8 +27,7 @@
/**
* Listener interface for {@link StageEvent}s
*
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public interface StageEventListener extends EventListener {
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageException.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
* limitations under the License.
*
* Created on December 9, 2003, 4:24 PM
+ *
+ * $Log: StageException.java,v $
+ * Revision 1.3 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline;
@@ -21,10 +26,11 @@
/**
* Exception wrapper class for exceptions that occur while processing a stage.
*
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
-public class StageException extends java.lang.RuntimeException {
+public class StageException extends java.lang.Exception {
+ //Stage within which the error occurred
+ private Stage source;
/**
* Creates a new instance of <code>StageException</code> without detail message.
@@ -41,6 +47,15 @@
super(msg);
}
+ /**
+ * Constructs an instance of <code>StageException</code> with the specified cause.
+ * @param thr the Throwable that caused this exception.
+ */
+ public StageException(Throwable thr) {
+ super(thr);
+ }
+
+
/**
* Constructs an instance of <code>StageException</code> with the specified detail message and cause
@@ -49,5 +64,42 @@
*/
public StageException(String msg, Throwable cause) {
super(msg, cause);
+ }
+
+
+ /**
+ * Creates a new instance of <code>StageException</code> for the specified source stage, without detail message.
+ */
+ public StageException(Stage source) {
+ this.source = source;
+ }
+
+
+ /**
+ * Constructs an instance of <code>StageException</code> with the specified detail message.
+ * @param msg the detail message.
+ */
+ public StageException(Stage source, String msg) {
+ super(msg);
+ this.source = source;
+ }
+
+
+ /**
+ * Constructs an instance of <code>StageException</code> with the specified detail message and cause
+ * @param msg the detail message.
+ * @param cause Throwable that caused this exception.
+ */
+ public StageException(Stage source, String msg, Throwable cause) {
+ super(msg, cause);
+ this.source = source;
+ }
+
+
+ /**
+ * Returns a reference to the Stage object where the exception occurred.
+ */
+ public Stage getSource() {
+ return this.source;
}
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/StageMonitor.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,79 +13,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
+ * $Log: StageMonitor.java,v $
+ * Revision 1.4 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
+ * Revision 1.3 2005/07/21 17:02:43 kjn
+ * Corrected dependencies
+ *
+ * Revision 1.2 2005/07/19 21:07:13 kjn
+ * Changed from class to interface.
*/
package org.apache.commons.pipeline;
-
-import java.util.*;
+import java.util.List;
/**
* A monitor used to control concurrent processing of data in a stage.
*
- * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
- * @version $Rev$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
-public class StageMonitor {
- /** Enumeration of possible states for the stage. */
- public enum Status { STARTING, RUNNING, STOP_REQUESTED, STOPPED }
- private Status status = Status.STOPPED;
- private List<Throwable> errors = new ArrayList<Throwable>();
+public interface StageMonitor {
+ public enum State { STARTING, RUNNING, STOP_REQUESTED, STOPPED, ERROR }
/**
* StageDriver has been requested to start stage processing.
- * Implementations of this method should change the monitor's status to
- * {@link Status#STARTING}.
+ * Implementations of this method should change the driver's state to
+ * {@link State#STARTING}.
*/
- public synchronized void startRequested() {
- if (this.status == Status.STOPPED) this.status = Status.STARTING;
- }
+ public void startRequested();
/**
* StageDriver has started execution.
- * Implementations of this method should change the monitor's status to
- * {@link Status#RUNNING}.
+ * Implementations of this method should change the driver's state to
+ * {@link State#RUNNING}.
*/
- public synchronized void driverStarted() {
- if (this.status == Status.STOPPED || this.status == Status.STARTING) this.status = Status.RUNNING;
- }
+ public void driverStarted();
/**
* StageDriver has been requested to halt stage processing.
- * Implementations of this method should change the monitor's status to
- * {@link Status#STOPPING}.
+ * Implementations of this method should change the driver's state to
+ * {@link State#STOP_REQUESTED}.
*/
- public synchronized void stopRequested() {
- this.status = Status.STOP_REQUESTED;
- this.notifyAll();
- }
+ public void stopRequested();
/**
* StageDriver has finished execution.
- * Implementations of this method should change the monitor's status to
- * {@link Status#STOPPED}.
+ * Implementations of this method should change the driver's state to
+ * {@link State#STOPPED}.
*/
- public synchronized void driverStopped() {
- this.status = Status.STOPPED;
- }
+ public void driverStopped();
/**
- * Monitor for successful enqueue operations on the stage. Implementations
- * overriding this method must call {@link Object#notifyAll() this.notifyAll()} to
- * ensure that any threads waiting on this monitor are notified.
+ * Notify the driver of successful enqueue operations on any stage managed
+ * by the driver.
*/
- public synchronized void enqueueOccurred() {
- this.notifyAll();
- }
+ public void enqueueOccurred();
/**
* Monitors driver thread interruption failures.
*
* @param fault the faulting exception
*/
- public void driverFailed( InterruptedException fault ) {
- this.errors.add(fault);
- }
+ public void driverFailed( InterruptedException fault );
+
+ /**
+ * Monitors preprocessing failures.
+ *
+ * @param fault the faulting exception
+ */
+ public void preprocessFailed(Throwable fault);
/**
* Monitors handler failures.
@@ -93,22 +90,23 @@
* @param data the data that was being processed as the fault occurred
* @param fault the faulting exception
*/
- public void processingFailed( Object data, Throwable fault ) {
- this.errors.add(fault);
- }
+ public void processingFailed( Object data, Throwable fault);
+
+ /**
+ * Monitors postprocessing failures.
+ *
+ * @param fault the faulting exception
+ */
+ public void postprocessFailed(Throwable fault);
/**
- * Returns the current status of stage processing.
+ * Returns the current state of stage processing.
*/
- public synchronized Status status() {
- return this.status;
- }
+ public State getState();
/**
- * Returns a list of errors recorded by this monitor
+ * Returns a list of errors recorded by this StageDriver
*/
- public List<Throwable> getErrors() {
- return errors;
- }
+ public List<Throwable> getErrors();
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/DigesterPipelineFactory.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
* limitations under the License.
*
* Created on February 12, 2004, 3:42 PM
+ *
+ * $Log: DigesterPipelineFactory.java,v $
+ * Revision 1.2 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline.config;
@@ -32,8 +37,7 @@
/**
* This factory is designed to simplify creating a pipeline using Digester.
*
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public class DigesterPipelineFactory implements org.apache.commons.pipeline.PipelineFactory {
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/config/PipelineRuleSet.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
* limitations under the License.
*
* Created on February 12, 2004, 1:48 PM
+ *
+ * $Log: PipelineRuleSet.java,v $
+ * Revision 1.3 2005/07/25 22:04:54 kjn
+ * Corrected Apache licensing, documentation.
+ *
*/
package org.apache.commons.pipeline.config;
@@ -23,8 +28,6 @@
import org.apache.commons.pipeline.*;
import org.xml.sax.Attributes;
-
-
/**
* <P>This is a Digester RuleSet that provides rules for parsing a process pipeline
* XML file.</P>
@@ -42,13 +45,7 @@
* created in this manner are added to the pipeline in the order that they
* occur in the configuration file. The class of the stage is specified by the
* <i>className</i> attribute; all other attributes are used by Digester to set bean
- * properties on the newly created Stage object. At present, Stages configured using
- * this tag must provide a one-argument constructor that takes a StageQueue instance.
- * By default, the stage will be constructed with a
- * {@link org.apache.commons.pipeline.impl.SingleThreadStageQueue SingleThreadStageQueue}
- * instance if the queueClass attribute is not set; otherwise the stage will be
- * constructed with a new instance of the specified class, which should provide
- * a no-arguments constructor.</li>
+ * properties on the newly created Stage object. </li>
*
* <li><enqueue/> - Enqueue an object onto the first stage in the pipeline.</li>
* <li><branch/> - Add a branch to a pipeline. The contents of this tag should
@@ -60,10 +57,7 @@
* attribute.
* </ul>
*
- * @author Kris Nuttycombe, National Geophysical Data Center
- * @version $Revision$
- * @todo Add support for more complicated StageQueue construction and configuration as part of the Stage
- * tag processing.
+ * @author <a href="mailto:Kris.Nuttycombe@noaa.gov">Kris Nuttycombe</a>, National Geophysical Data Center, NOAA
*/
public class PipelineRuleSet extends RuleSetBase {
private static Class[] addBranchTypes = { String.class, Pipeline.class };
@@ -103,7 +97,7 @@
digester.addCallParam("*/pipeline/stage", 0, true);
//this rule is used to create a stage driver for a specific stage
- digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.impl.SingleThreadStageDriver", "className");
+ digester.addObjectCreate("*/pipeline/stage/stageDriver", "org.apache.commons.pipeline.driver.SingleThreadStageDriver", "className");
digester.addSetProperties("*/pipeline/stage/stageDriver");
digester.addCallParam("*/pipeline/stage/stageDriver", 1, true);
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/driver/SingleThreadStageDriver.java Tue Jul 26 23:36:55 2005
@@ -42,69 +42,64 @@
//map of stages to worker threads
private Map<Stage, Thread> threadMap = new HashMap<Stage,Thread>();
-
+
/**
- * Default constructor
+ * Default constructor. This sets the thread wait timeout and fault tolerance
+ * values to 500 ms and false, respectively.
*/
public SingleThreadStageDriver() {
+ this(500, false);
}
/**
- *
+ * Creates a new SingleThreadStageDriver with the specified thread wait
+ * timeout and fault tolerance values.
+ *
+ * @param timeout The amount of time, in milliseconds, that the worker thread
+ * will wait before checking the processing state if no objects are available
+ * in the thread's queue.
+ * @param faultTolerant Flag determining the behavior of the driver when
+ * an error is encountered in execution of {@link Stage#process(Object)}.
+ * If this is set to false, any exception thrown during {@link Stage#process(Object)}
+ * will cause the worker thread to halt without executing {@link Stage#postprocess()}
+ * ({@link Stage#release()} will be called.)
*/
public SingleThreadStageDriver(long timeout, boolean faultTolerant) {
this.timeout = timeout;
this.faultTolerant = faultTolerant;
}
-
+
/**
* Creates and starts a new worker thread to process items in the stage's queue.
*/
- public final void start(Stage stage) throws IllegalThreadStateException {
- StageMonitor monitor = stage.getMonitor();
- synchronized (monitor) {
- if (monitor.status() != StageMonitor.Status.STOPPED) {
- throw new IllegalThreadStateException("Processor thread has already been started.");
- }
- }
-
- monitor.startRequested();
-
- log.debug("Starting worker thread.");
+ protected void startInternal(Stage stage) throws StageException {
+ log.debug("Starting worker thread for stage " + stage + ".");
Thread workerThread = new WorkerThread(stage);
workerThread.start();
this.threadMap.put(stage, workerThread);
- log.debug("Worker thread started.");
+ log.debug("Worker thread for stage " + stage + " started.");
}
-
/**
* This method waits for the queue to empty and the processor thread to exit
* cleanly and release any resources acquired during processing, if possible.
*/
- public void finish(Stage stage) throws InterruptedException {
- StageMonitor monitor = stage.getMonitor();
-
- log.debug("Requesting worker thread stop.");
- monitor.stopRequested();
-
- synchronized (monitor) {
- if (monitor.status() == StageMonitor.Status.STOPPED) return;
+ protected void finishInternal(Stage stage) throws StageException {
+ log.debug("Waiting for worker thread stop for stage " + stage + ".");
+ try {
+ this.threadMap.remove(stage).join();
+ } catch (InterruptedException e){
+ throw new StageException(e);
}
-
- log.debug("Waiting for worker thread stop.");
- this.threadMap.remove(stage).join();
- log.debug("Worker thread has finished.");
-
- monitor.driverStopped();
+ log.debug("Worker thread for stage " + stage + " has finished.");
+ stage.getMonitor().driverStopped();
}
-
/**
* Sets the failure tolerance flag for the worker thread. If faultTolerant
* is set to true, {@link StageException StageException}s thrown by
- * the process() method will not interrupt queue processing, but will simply
- * be logged with a severity of ERROR.
+ * the {@link Stage#process(Object)} method will not interrupt queue
+ * processing, but will simply be logged with a severity of ERROR.
*/
public final void setFaultTolerant(boolean faultTolerant) {
this.faultTolerant = faultTolerant;
@@ -118,6 +113,72 @@
return this.faultTolerant;
}
+ /**
+ * Creates a new StageMonitor for the specified stage.
+ */
+ protected StageMonitor createStageMonitor(Stage stage) {
+ return new AbstractStageMonitor(stage) {
+ /**
+ * StageDriver has been requested to start stage processing.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STARTING}.
+ */
+ public synchronized void startRequested() {
+ if (this.state == State.STOPPED) this.state = State.STARTING;
+ }
+
+ /**
+ * StageDriver has started execution.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#RUNNING}.
+ */
+ public synchronized void driverStarted() {
+ if (this.state == State.STOPPED || this.state == State.STARTING) this.state = State.RUNNING;
+ }
+
+ /**
+ * StageDriver has been requested to halt stage processing.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STOP_REQUESTED}.
+ */
+ public synchronized void stopRequested() {
+ this.state = State.STOP_REQUESTED;
+ this.notifyAll();
+ }
+
+ /**
+ * StageDriver has finished execution.
+ * Implementations of this method should change the monitor's state to
+ * {@link State#STOPPED}.
+ */
+ public synchronized void driverStopped() {
+ this.state = State.STOPPED;
+ }
+
+ /**
+ * Monitor for successful enqueue operations on the stage. Implementations
+ * overriding this method must call {@link Object#notifyAll() this.notifyAll()} to
+ * ensure that any threads waiting on this monitor are notified.
+ */
+ public synchronized void enqueueOccurred() {
+ this.notifyAll();
+ }
+
+ /**
+ * Returns the current state of stage processing.
+ */
+ public synchronized State getState() {
+ return this.state;
+ }
+
+ /**
+ * Returns a list of errors recorded by this monitor
+ */
+ public List<Throwable> getErrors() {
+ return errors;
+ }
+ };
+ }
/**
* This worker thread removes and processes data objects from the incoming
@@ -133,7 +194,7 @@
*/
private class WorkerThread extends Thread {
/** The Stage this thread will work on */
- private Stage stage;
+ private Stage stage;
public WorkerThread(Stage stage) {
this.stage = stage;
@@ -142,8 +203,13 @@
public final void run() {
StageMonitor monitor = stage.getMonitor();
try {
- log.debug("preprocessing...");
- stage.preprocess();
+ log.debug("preprocessing stage " + stage + "...");
+ try {
+ stage.preprocess();
+ } catch (Exception t) {
+ monitor.preprocessFailed(t);
+ return;
+ }
monitor.driverStarted();
@@ -153,33 +219,36 @@
obj = stage.poll();
if (obj == null) {
synchronized (monitor) {
- log.debug("Monitor status is: " + monitor.status());
- if (monitor.status() == StageMonitor.Status.STOP_REQUESTED) break running;
+ //log.debug("Monitor getState is: " + monitor.getState());
+ if (monitor.getState() == StageMonitor.State.STOP_REQUESTED
+ || monitor.getState() == StageMonitor.State.ERROR) break running;
+
monitor.wait(timeout);
continue running;
}
- }
- else {
+ } else {
stage.process(obj);
}
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
monitor.driverFailed(e);
- throw new RuntimeException("Driver thread unexpectedly interrupted.", e);
- }
- catch (Throwable t) {
+ throw new RuntimeException("Driver thread unexpectedly interrupted for stage " + stage + ".", e);
+ } catch (Exception t) {
monitor.processingFailed(obj, t);
if (!faultTolerant) {
log.error("Aborting due to error.", t);
- throw new RuntimeException("Stage processing failed, check monitor for details.", t);
+ throw new RuntimeException("Stage processing for stage " + stage + " failed, check monitor for details.", t);
}
}
}
- log.debug("postprocessing...");
- stage.postprocess();
- }
- finally {
+ log.debug("postprocessing stage " + stage + "...");
+ try {
+ stage.postprocess();
+ } catch (Exception t) {
+ monitor.postprocessFailed(t);
+ return;
+ }
+ } finally {
stage.release();
}
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/FtpFileDownloadStage.java Tue Jul 26 23:36:55 2005
@@ -52,20 +52,22 @@
/** Holds value of property password. */
private String password;
-
+ /** Holds value of property port. */
+ private int port;
+
/**
* Default constructor - creates work directory in /tmp
*/
public FtpFileDownloadStage() {
}
-
+
/**
* Constructor specifying work directory.
*/
public FtpFileDownloadStage(String workDir) {
this.workDir = workDir;
}
-
+
/**
* Default constructor - creates work directory in /tmp
*/
@@ -92,7 +94,7 @@
try {
//connect to the ftp site
- client.connect(host);
+ client.connect(host, port);
log.debug(client.getReplyString());
if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
throw new IOException("FTP server at host " + host + " refused connection.");
@@ -103,21 +105,15 @@
if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
throw new IOException("FTP login failed for user " + user + ": " + client.getReplyString());
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new StageException(e.getMessage(), e);
}
}
/**
- * Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
- * specified by that URL, and then retrieves the data to a file over an HTTP
- * connection. The file name for download is the
- * last element of the URL path for download appended with a timestamp
- * value, and it is stored in the directory specified by {@link #setWorkDir(String) setWorkDir()}, or to
- * /tmp if no work directory is set.
+ * Retrieves files that match the specified FileSpec from the FTP server
+ * and stores them in the work directory.
*
- * @param obj The URL from which to download data.
* @throws ClassCastException if the parameter obj is not an instance of FileSpec
*/
public void process(Object obj) throws StageException {
@@ -126,65 +122,82 @@
FileSpec spec = (FileSpec) obj;
try {
+ client.setFileType(spec.type.intValue());
client.changeWorkingDirectory(spec.path);
- log.debug(client.getReplyString());
if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
}
- log.debug("FTP connection successfully established to " + host + spec.path);
+ log.debug("FTP connection successfully established to " + host + ":" + spec.path);
//get the list of files
client.enterLocalPassiveMode();
- String[] dirs = client.listNames();
- if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
- throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
- }
- //client.enterLocalActiveMode();
-
- log.debug("FTP file list successfully obtained.");
-
- Pattern pattern = Pattern.compile(spec.pattern);
-
- log.debug("File pattern is " + spec.pattern);
+ searchCurrentDirectory("", spec);
+ } catch (IOException e) {
+ throw new StageException(e.getMessage(), e);
+ }
+ }
+
+
+ /**
+ * Search the current working directory of the FTP client, saving files
+ * to the path specified by workDir + the path to the file on the FTP server.
+ * This method will optionally recursively search directories on the remote server.
+ */
+ private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
+ FTPFile[] files = client.listFiles();
+ if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
+ throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
+ }
+
+ search: for (FTPFile file : files) {
+ String localPath = path + File.separatorChar + file.getName();
- //create the list of netcdf track files to get
- for (int i = 0; i < dirs.length; i++){
- log.debug("Obtaining files in directory " + dirs[i]);
- String[] files = client.listNames(dirs[i]);
+ if (file.isDirectory() && spec.recursive) {
+ log.debug("Recursing into directory " + file.getName());
+ client.changeWorkingDirectory(file.getName());
+ searchCurrentDirectory(localPath, spec);
+ client.changeToParentDirectory();
+ } else {
+ log.debug("Examining file " + localPath);
+ for (Criterion crit : spec.criteria) {
+ if (!crit.matches(file)) {
+ log.info("File " + localPath + " failed criterion check " + crit);
+ continue search;
+ }
+ }
- for (int j = 0; j < files.length; j++) {
- if (pattern.matcher(files[j]).matches()) {
- log.debug("Matched file name " + files[j] + " against pattern " + spec.pattern);
- File f = new File(workDir + File.separatorChar + files[j]);
- if (! f.getParentFile().exists()) f.getParentFile().mkdir();
-
- OutputStream out = new FileOutputStream(f);
- client.retrieveFile(files[j], out);
- this.exqueue(f);
+ File localFile = new File(workDir + File.separatorChar + localPath);
+ if (localFile.exists() && !spec.overwrite) {
+ //if (spec.)
+ } else {
+ if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir();
+
+ OutputStream out = new FileOutputStream(localFile);
+ try {
+ client.retrieveFile(file.getName(), out);
+ } finally {
+ out.flush();
+ out.close();
}
}
+
+ this.exqueue(localFile);
}
}
- catch (IOException e) {
- throw new StageException(e.getMessage(), e);
- }
}
-
/**
* Disconnects from FTP server. Errors are logged.
*/
public void release() {
try {
client.disconnect(); //close ftp connection
- }
- catch (IOException e) {
+ } catch (IOException e) {
log.error(e.getMessage(), e);
}
}
-
/**
* Sets the working directory for the file download. If the directory does
* not already exist, it will be created during the preprocess() step.
@@ -240,18 +253,57 @@
this.password = password;
}
+ /**
+ * Getter for property port.
+ * @return Value of property port.
+ */
+ public int getPort() {
+ return this.port;
+ }
+
+ /**
+ * Setter for property port.
+ * @param port New value of property port.
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
/**
* This class is used to specify a path and pattern of file for the FtpFileDownload
* to retrieve.
*/
public static class FileSpec {
+ //enumeration of legal file types
+ public enum FileType {
+ ASCII(FTPClient.ASCII_FILE_TYPE),
+ BINARY(FTPClient.BINARY_FILE_TYPE);
+
+ private int type;
+
+ private FileType(int type) {
+ this.type = type;
+ }
+
+ public int intValue() {
+ return this.type;
+ }
+ }
/** Holds value of property path. */
private String path = "/";
- /** Holds value of property pattern. */
- private String pattern = ".*";
+ /** Holds flag that determines whether or not to perform recursive search of the specified path */
+ private boolean recursive;
+
+ // Holds flag that determines whether or not to overwrite local files
+ private boolean overwrite;
+
+ // Type of file (ascii or binary)
+ private FileType type = FileType.BINARY;
+
+ // List of criteria that the retrieved file must satisfy.
+ private Set<Criterion> criteria = new HashSet<Criterion>();
/** Getter for property path.
* @return Value of property path.
@@ -271,10 +323,10 @@
/** Getter for property pattern.
* @return Value of property pattern.
- *
+ * @deprecated - not retrievable from criterion
*/
public String getPattern() {
- return this.pattern;
+ return null;
}
/** Setter for property pattern.
@@ -282,7 +334,106 @@
*
*/
public void setPattern(String pattern) {
- this.pattern = pattern;
+ this.criteria.add(new FileNameMatchCriterion(pattern));
+ }
+
+ /**
+ * Add a criterion to the set of criteria that must be matched for files
+ * to be downloaded
+ */
+ public void addCriterion(Criterion crit) {
+ this.criteria.add(crit);
+ }
+
+ /**
+ * Sets the flag determining whether or not the stage will recursively
+ * traverse the directory tree to find files.
+ */
+ public void setRecursive(boolean recursive) {
+ this.recursive = recursive;
+ }
+
+ /**
+ * Returns whether or not the stage will recursively
+ * traverse the directory tree to find files.
+ */
+ public boolean isRecursive() {
+ return this.recursive;
+ }
+
+ /**
+ * Sets the file type for the transfer. Legal values are "ascii" and "binary".
+ * Binary transfers are the default.
+ */
+ public void setFileType(String fileType) {
+ if ("ascii".equalsIgnoreCase(fileType)) {
+ this.type = FileType.ASCII;
+ } else {
+ this.type = FileType.BINARY;
+ }
+ }
+
+ /**
+ * Returns the file type for the transfer.
+ */
+ public String getFileType() {
+ return this.type.toString();
+ }
+ }
+
+ /**
+ * This class is used to specify a criterion that the downloaded file
+ * must satisfy.
+ */
+ public interface Criterion {
+ public boolean matches(FTPFile file);
+ }
+
+ /**
+ * Matches file names based upon the Java regex supplied in the constructor.
+ */
+ public static class FileNameMatchCriterion implements Criterion {
+ // precompiled pattern used to match filenames
+ private Pattern pattern;
+ private String _pattern;
+
+ public FileNameMatchCriterion(String pattern) {
+ this._pattern = pattern;
+ this.pattern = Pattern.compile(pattern);
+ }
+
+ public boolean matches(FTPFile file) {
+ return pattern.matcher(file.getName()).matches();
+ }
+
+ public String toString() {
+ return "filename matches pattern " + _pattern;
+ }
+ }
+
+ /**
+ * Matches files based upon a set of date constraints
+ */
+ public static class FileDateMatchCriterion implements Criterion {
+ private Date startDate;
+ private Date endDate;
+
+ public FileDateMatchCriterion(Date startDate, Date endDate) {
+ this.startDate = startDate;
+ this.endDate = endDate;
+ }
+
+ public boolean matches(FTPFile file) {
+ Calendar cal = file.getTimestamp();
+ if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public String toString() {
+ return "file date is between " + startDate + " and " + endDate;
}
}
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/HttpFileDownloadStage.java Tue Jul 26 23:36:55 2005
@@ -60,6 +60,7 @@
/**
* Creates a new instance of HttpFileDownload with the specified work directory
* into which to download files.
+ * @deprecated Let File.createTempFile take care of the working directory issue.
*/
public HttpFileDownloadStage(Queue<Object> queue, String workDir) {
super(queue);
@@ -67,15 +68,6 @@
}
/**
- * Creates the directory {@link #setWorkDir(String) workDir} if it does
- * not exist.
- */
- public void preprocess() throws StageException {
- if (fworkDir == null) fworkDir = new File(workDir);
- if (!this.fworkDir.exists()) fworkDir.mkdirs();
- }
-
- /**
* Removes a java.net.URL (an HTTP URL) from the input queue, follows any redirects
* specified by that URL, and then retrieves the data to a file over an HTTP
* connection. The file name for download is the
@@ -87,14 +79,14 @@
* @throws ClassCastException if the parameter obj is not an instance of java.net.URL
*/
public void process(Object obj) throws StageException {
- if (!this.fworkDir.exists()) throw new StageException("The work directory for file download " + workDir.toString() + " does not exist.");
Map params = new HashMap();
URL url;
try {
if (obj instanceof String) {
+ /*
String loc = (String) obj;
- /*int paramIndex = loc.indexOf('?');
+ int paramIndex = loc.indexOf('?');
if (paramIndex > 0) {
url = new URL(loc.substring(0, paramIndex));
for (StringTokenizer st = new StringTokenizer(loc.substring(paramIndex + 1), "&"); st.hasMoreTokens();) {
@@ -108,18 +100,16 @@
}
}
}
- else {*/
- url = new URL((String) obj);
+ else {
+ */
+ url = new URL((String) obj);
//}
- }
- else if (obj instanceof URL) {
+ } else if (obj instanceof URL) {
url = (URL) obj;
- }
- else {
+ } else {
throw new IllegalArgumentException("Unrecognized parameter class to process() for HttpFileDownload: " + obj.getClass().getName() + "; must be URL or String");
}
- }
- catch (MalformedURLException e) {
+ } catch (MalformedURLException e) {
throw new StageException("Malformed URL: " + obj.toString(), e);
}
@@ -137,24 +127,22 @@
java.net.HttpURLConnection con = null;
try {
con = (java.net.HttpURLConnection) url.openConnection();
- /*if (!params.isEmpty()) {
+ /*
+ if (!params.isEmpty()) {
con.setRequestMethod("GET");
for (Iterator iter = params.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
con.setRequestProperty((String) entry.getKey(), (String) entry.getValue());
}
- }*/
- }
- catch (IOException e) {
+ }
+ */
+ } catch (IOException e) {
throw new StageException(e.getMessage(), e);
}
- long time = System.currentTimeMillis();
- String path = url.getPath();
- String fileName = path.substring(path.lastIndexOf('/')) + "." + time; //tag the downloaded file with the time of retrieval.
- File workFile = new File(workDir, fileName);
-
+ File workFile = null;
try {
+ workFile = File.createTempFile("http-file-download","tmp");
//log.debug("About to connect.");
//con.connect();
//log.debug("Connection status: " + con.getResponseCode());
@@ -164,21 +152,24 @@
for (int results = 0; (results = in.read(buffer)) != -1;) {
out.write(buffer, 0, results);
}
- }
- catch (IOException e) {
+ out.close();
+ in.close();
+ } catch (IOException e) {
throw new StageException("An error occurred downloading a data file from " + url.toString() + ": " + e.getMessage(), e);
- }
- finally {
+ } finally {
con.disconnect();
}
this.exqueue(workFile);
}
-
+
/**
* Sets the working directory for the file download. If the directory does
* not already exist, it will be created during the preprocess() step.
+ * If you do not set this directory, the work directory will be the
+ * default temporary directory for your machine type.
+ * @deprecated Let File.createTempFile worry about where to create files.
*/
public void setWorkDir(String workDir) {
this.workDir = workDir;
@@ -210,11 +201,9 @@
if (location.startsWith("http:")) {
url = new URL(location);
- }
- else if (location.startsWith("/")) {
+ } else if (location.startsWith("/")) {
url = new URL("http://" + url.getHost() + location);
- }
- else {
+ } else {
url = new URL(con.getURL(), location);
}
Modified: jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/java/org/apache/commons/pipeline/stage/LogStage.java Tue Jul 26 23:36:55 2005
@@ -1,5 +1,5 @@
/*
- * Copyright 2004 The Apache Software Foundation
+ * Copyright 2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/TestResources.properties Tue Jul 26 23:36:55 2005
@@ -1,6 +1,20 @@
+# Copyright 2005 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
# Resource bundle for test resources
test.DigesterPipelineFactoryTest.configFile=test_conf.xml
test.DigesterPipelineFactoryTest.logConfig=log4j_conf.xml
-test.DigesterPipelineFactoryTest.stage0.class=org.apache.commons.pipeline.impl.FileFinderStage
-test.DigesterPipelineFactoryTest.stage1.class=org.apache.commons.pipeline.impl.LogStage
\ No newline at end of file
+test.DigesterPipelineFactoryTest.stage0.class=org.apache.commons.pipeline.stage.FileFinderStage
+test.DigesterPipelineFactoryTest.stage1.class=org.apache.commons.pipeline.stage.LogStage
\ No newline at end of file
Modified: jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml
URL: http://svn.apache.org/viewcvs/jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml?rev=225468&r1=225467&r2=225468&view=diff
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml (original)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/conf/log4j_conf.xml Tue Jul 26 23:36:55 2005
@@ -1,6 +1,24 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">-->
+<!--
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Author : Kris Nuttycombe, National Geophysical Data Center
+-->
+
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="log_app" class="org.apache.log4j.FileAppender">
@@ -37,7 +55,7 @@
<appender-ref ref="external_app" />
</category>
- <category name="org.apache.commons.pipeline.impl" additivity="false">
+ <category name="org.apache.commons.pipeline.stage" additivity="false">
<level value="debug" />
<appender-ref ref="impl_app" />
</category>
---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org