You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/27 16:26:48 UTC

[1/6] incubator-taverna-engine git commit: simpler names in pom.xml

Repository: incubator-taverna-engine
Updated Branches:
  refs/heads/master 44a6bb386 -> 315a829c3


simpler names in pom.xml


Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/46cc1539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/46cc1539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/46cc1539

Branch: refs/heads/master
Commit: 46cc1539242308e966eb6c9d381922edc226127f
Parents: 44a6bb3
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Mon Feb 23 15:45:41 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Mon Feb 23 15:45:41 2015 +0000

----------------------------------------------------------------------
 taverna-capability-impl/pom.xml               | 2 +-
 taverna-credential-manager-impl/pom.xml       | 2 +-
 taverna-database-configuration-impl/pom.xml   | 2 +-
 taverna-execution-impl/pom.xml                | 2 +-
 taverna-observer/pom.xml                      | 2 +-
 taverna-reference-impl/pom.xml                | 2 +-
 taverna-reference-testhelpers/pom.xml         | 2 +-
 taverna-run-impl/pom.xml                      | 2 +-
 taverna-services-impl/pom.xml                 | 2 +-
 taverna-workflowmodel-core-extensions/pom.xml | 2 +-
 taverna-workflowmodel-impl/pom.xml            | 2 +-
 11 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-capability-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-capability-impl/pom.xml b/taverna-capability-impl/pom.xml
index 14c6212..c4fa489 100644
--- a/taverna-capability-impl/pom.xml
+++ b/taverna-capability-impl/pom.xml
@@ -7,7 +7,7 @@
 	</parent>
 	<artifactId>taverna-capability-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Platform Capability Implementation</name>
+	<name>Apache Taverna Platform Capability impl</name>
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-credential-manager-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-credential-manager-impl/pom.xml b/taverna-credential-manager-impl/pom.xml
index ab1536c..ddd5409 100644
--- a/taverna-credential-manager-impl/pom.xml
+++ b/taverna-credential-manager-impl/pom.xml
@@ -8,7 +8,7 @@
 	</parent>
 	<artifactId>taverna-credential-manager-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Credential Manager Implementation</name>
+	<name>Apache Taverna Credential Manager impl</name>
 
   <profiles>
     <profile>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-database-configuration-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-database-configuration-impl/pom.xml b/taverna-database-configuration-impl/pom.xml
index 07bd9b7..39034f4 100644
--- a/taverna-database-configuration-impl/pom.xml
+++ b/taverna-database-configuration-impl/pom.xml
@@ -7,7 +7,7 @@
 	</parent>
 	<artifactId>taverna-database-configuration-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Database Configuration implementation</name>
+	<name>Apache Taverna Database Configuration impl</name>
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-execution-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-execution-impl/pom.xml b/taverna-execution-impl/pom.xml
index ec7a6b0..3c1f0e8 100644
--- a/taverna-execution-impl/pom.xml
+++ b/taverna-execution-impl/pom.xml
@@ -7,7 +7,7 @@
 	</parent>
 	<artifactId>taverna-execution-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Platform Execution Service Implementation</name>
+	<name>Apache Taverna Platform Execution Service impl</name>
 	<description>A Service for executing Taverna workflows</description>
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-observer/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-observer/pom.xml b/taverna-observer/pom.xml
index 1ec7d62..4b94a2e 100644
--- a/taverna-observer/pom.xml
+++ b/taverna-observer/pom.xml
@@ -9,7 +9,7 @@
 	<artifactId>taverna-observer</artifactId>
   <packaging>bundle</packaging>
 	<name>Apache Taverna Observer pattern</name>
-	<description>Implementation of the Observer pattern</description>
+	<description>impl of the Observer pattern</description>
 	<dependencies>
 		<dependency>
 			<groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-reference-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-reference-impl/pom.xml b/taverna-reference-impl/pom.xml
index 0eb9e8c..98f3b4d 100644
--- a/taverna-reference-impl/pom.xml
+++ b/taverna-reference-impl/pom.xml
@@ -8,7 +8,7 @@
 	</parent>
 	<artifactId>taverna-reference-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Reference Manager Implementation</name>
+	<name>Apache Taverna Reference Manager impl</name>
 	<description>
 		Implementations of the core APIs, not including extension point
 		implementations.

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-reference-testhelpers/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-reference-testhelpers/pom.xml b/taverna-reference-testhelpers/pom.xml
index 0f99adf..a9730e3 100644
--- a/taverna-reference-testhelpers/pom.xml
+++ b/taverna-reference-testhelpers/pom.xml
@@ -15,7 +15,7 @@
 		we need a module that is entire external
 		to t2reference and to the test cases. If the test
 		implementations are included in either the api, core
-		implementation or test modules they will be loaded by the root
+		implementations or test modules they will be loaded by the root
 		classloader of the test runner - by putting them in an
 		independent artifact we allow them to be loaded through
 		various SPI discovery mechanisms as they would be in a 'real'

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-run-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-run-impl/pom.xml b/taverna-run-impl/pom.xml
index a451f6c..80429a1 100644
--- a/taverna-run-impl/pom.xml
+++ b/taverna-run-impl/pom.xml
@@ -7,7 +7,7 @@
 	</parent>
 	<artifactId>taverna-run-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Platform Run Service Implementation</name>
+	<name>Apache Taverna Platform Run Service impl</name>
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-services-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-services-impl/pom.xml b/taverna-services-impl/pom.xml
index c975a97..a588e4d 100644
--- a/taverna-services-impl/pom.xml
+++ b/taverna-services-impl/pom.xml
@@ -7,7 +7,7 @@
 	</parent>
 	<artifactId>taverna-services-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Platform Services Implementation</name>
+	<name>Apache Taverna Platform Services impl</name>
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-workflowmodel-core-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/pom.xml b/taverna-workflowmodel-core-extensions/pom.xml
index b19714f..700e56d 100644
--- a/taverna-workflowmodel-core-extensions/pom.xml
+++ b/taverna-workflowmodel-core-extensions/pom.xml
@@ -7,7 +7,7 @@
     </parent>
 	<artifactId>taverna-workflowmodel-core-extensions</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Workflow Model Core Extension Points</name>
+	<name>Apache Taverna Workflow Model Extension Points</name>
 	<description>Implementation of core extension points to the workflow model</description>
 	<dependencies>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 7231e9d..93dfea1 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -8,7 +8,7 @@
 	</parent>
 	<artifactId>taverna-workflowmodel-impl</artifactId>
 	<packaging>bundle</packaging>
-	<name>Apache Taverna Workflow Model implementation</name>
+	<name>Apache Taverna Workflow Model impl</name>
 	<description> Implementation of the core workflow object model for
 		Taverna workflows including
 		concrete instances of the workflow


[2/6] incubator-taverna-engine git commit: taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
new file mode 100644
index 0000000..3169f8c
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
@@ -0,0 +1,163 @@
+/**
+ *
+ */
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
+import net.sf.taverna.t2.workflowmodel.ConfigurationException;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * This layer allows for the cancellation, pausing and resuming of workflow
+ * runs. It does so by intercepting jobs sent to the layer.
+ * <p>
+ * Jobs belonging to a cancelled run are dropped in
+ * {@link #receiveJob(DispatchJobEvent)}; jobs belonging to a paused run are
+ * held by the receiving layer until {@link #resumeWorkflow(String)} feeds
+ * them back into {@link #receiveJob(DispatchJobEvent)}.
+ * <p>
+ * The cancelled/paused bookkeeping is held in static fields and is therefore
+ * shared by every Stop instance, whereas the held jobs are kept per
+ * instance. The static {@code cancelWorkflow}, {@code pauseWorkflow} and
+ * {@code resumeWorkflow} methods never modify the current top-level
+ * set/map in place: they build a modified copy and re-assign the field
+ * while holding the {@code Stop.class} monitor.
+ * <p>
+ * NOTE(review): {@code receiveJob} reads both static fields before taking
+ * any lock, and neither field is declared {@code volatile}, so nothing in
+ * this class guarantees that a thread delivering jobs promptly sees a
+ * newly assigned set/map - confirm whether that is acceptable.
+ *
+ * @author alanrw
+ */
+public class Stop extends AbstractDispatchLayer<JsonNode> {
+	// NOTE(review): presumably the identifier under which this layer type is
+	// registered/looked up; no use of the constant is visible in this file.
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
+	/**
+	 * The set of ids of workflow runs that have been cancelled. Ids are only
+	 * ever added (see {@link #cancelWorkflow(String)}); nothing in this class
+	 * removes them again.
+	 */
+	private static Set<String> cancelledWorkflowRuns = new HashSet<>();
+	/**
+	 * A map from workflow run ids to the set of Stop layers where jobs have
+	 * been intercepted for that run. A run is considered paused exactly while
+	 * its id is a key of this map.
+	 */
+	private static Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
+	/**
+	 * A map for a given Stop from ids of suspended workflow runs to the jobs
+	 * that have been intercepted. Only accessed while holding the
+	 * {@code Stop.class} monitor.
+	 */
+	private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
+
+	/**
+	 * Accepts and ignores the supplied configuration; this layer keeps no
+	 * configuration state.
+	 */
+	@Override
+	public void configure(JsonNode conf) throws ConfigurationException {
+		// nothing
+	}
+
+	/**
+	 * @return always <code>null</code>, as this layer has no configuration
+	 */
+	@Override
+	public JsonNode getConfiguration() {
+		return null;
+	}
+
+	/**
+	 * Intercept a job on its way down the dispatch stack.
+	 * <p>
+	 * The workflow run id is taken from the first {@link WorkflowRunIdEntity}
+	 * in the job's context. If that run is cancelled the job is silently
+	 * dropped; if it is paused the job is stored in this layer until the run
+	 * is resumed; otherwise (including when the context carries no run id
+	 * entity) the job is passed to the superclass implementation.
+	 *
+	 * @param jobEvent
+	 *            The job to drop, hold or pass on
+	 */
+	@Override
+	public void receiveJob(final DispatchJobEvent jobEvent) {
+		List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
+				WorkflowRunIdEntity.class);
+		if (entities != null && !entities.isEmpty()) {
+			// Only the first run id entity is consulted
+			final String wfRunId = entities.get(0).getWorkflowRunId();
+			// If the workflow run is cancelled then simply "eat" the jobEvent.
+			// This does a hard-cancel.
+			if (cancelledWorkflowRuns.contains(wfRunId))
+				return;
+			// If the workflow run is paused (first check is made without the
+			// lock, so that jobs of unpaused runs do not contend for it)
+			if (pausedLayerMap.containsKey(wfRunId))
+				synchronized (Stop.class) {
+					// double check as pausedLayerMap may have been changed
+					// waiting for the lock
+					if (pausedLayerMap.containsKey(wfRunId)) {
+						// Remember that this Stop layer was affected by the
+						// workflow pause
+						pausedLayerMap.get(wfRunId).add(this);
+						if (!suspendedJobEventMap.containsKey(wfRunId))
+							suspendedJobEventMap.put(wfRunId,
+									new HashSet<DispatchJobEvent>());
+						// Remember the suspended jobEvent
+						suspendedJobEventMap.get(wfRunId).add(jobEvent);
+						return;
+					}
+				}
+		}
+		// By default pass the jobEvent down to the next layer
+		super.receiveJob(jobEvent);
+	}
+
+	/**
+	 * Job queue events are not intercepted; they are handed straight to the
+	 * superclass implementation, whatever the state of the workflow run.
+	 */
+	@Override
+	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+		super.receiveJobQueue(jobQueueEvent);
+	}
+
+	/**
+	 * Cancel the workflow run with the specified id
+	 * <p>
+	 * Only the set of cancelled ids is updated. Jobs already held for the run
+	 * because it was paused are left in place, and since
+	 * {@link #resumeWorkflow(String)} refuses cancelled runs they are not
+	 * released by this class afterwards.
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to cancel
+	 * @return If the workflow run was cancelled then true. If it was already
+	 *         cancelled then false.
+	 */
+	public static synchronized boolean cancelWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		// Add to a copy and swap the field, rather than mutating the set
+		// that receiveJob may be reading without the lock
+		Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
+				cancelledWorkflowRuns);
+		cancelledWorkflowRunsCopy.add(workflowRunId);
+		cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
+		return true;
+	}
+
+	/**
+	 * Pause the workflow run with the specified id
+	 * <p>
+	 * The run is registered with an initially empty set of affected layers;
+	 * each Stop layer adds itself to that set when it first intercepts a job
+	 * for the run.
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to pause
+	 * @return If the workflow run was paused then true. If it was already
+	 *         paused or cancelled then false.
+	 */
+	public static synchronized boolean pauseWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		if (pausedLayerMap.containsKey(workflowRunId))
+			return false;
+		// Copy-and-swap, as in cancelWorkflow
+		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
+		pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
+		pausedLayerMap = pausedLayerMapCopy;
+		return true;
+	}
+
+	/**
+	 * Resume the workflow run with the specified id
+	 * <p>
+	 * The run is first removed from the paused map, and then every Stop layer
+	 * that intercepted jobs for it is asked to re-submit them. This happens
+	 * on the calling thread while the {@code Stop.class} monitor is held.
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to resume
+	 * @return If the workflow run was resumed then true. If the workflow run
+	 *         was not paused or it was cancelled, then false.
+	 */
+	public static synchronized boolean resumeWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		if (!pausedLayerMap.containsKey(workflowRunId))
+			return false;
+		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>();
+		pausedLayerMapCopy.putAll(pausedLayerMap);
+		Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
+		// Publish the map without the run before replaying, so that the
+		// replayed jobs are no longer treated as paused by receiveJob
+		pausedLayerMap = pausedLayerMapCopy;
+		for (Stop s : stops)
+			s.resumeLayerWorkflow(workflowRunId);
+		return true;
+	}
+
+	/**
+	 * Resume the workflow run with the specified id on this Stop layer. This
+	 * method processes any suspended job events.
+	 * <p>
+	 * The held jobs for the run are removed from this layer and each is
+	 * passed to {@link #receiveJob(DispatchJobEvent)} again.
+	 * <p>
+	 * NOTE(review): if this layer holds no entry for the run, the
+	 * <code>remove</code> call yields <code>null</code> and the loop would
+	 * throw a NullPointerException. The only visible caller passes layers
+	 * that registered themselves together with a held job, so this looks
+	 * unreachable - confirm.
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to resume.
+	 */
+	private void resumeLayerWorkflow(String workflowRunId) {
+		synchronized (Stop.class) {
+			for (DispatchJobEvent dje : suspendedJobEventMap
+					.remove(workflowRunId))
+				receiveJob(dje);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
new file mode 100644
index 0000000..fe6e73f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
@@ -0,0 +1,4 @@
+<body>
+Contains implementations of DispatchLayer defined by the core Taverna 2
+specification.
+</body>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
new file mode 100644
index 0000000..dc55fc7
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:beans="http://www.springframework.org/schema/beans"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                                 http://www.springframework.org/schema/beans/spring-beans.xsd
+                                 http://www.springframework.org/schema/osgi 
+                                 http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
+
+	<service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
+
+</beans:beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
new file mode 100644
index 0000000..90ed75f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                           http://www.springframework.org/schema/beans/spring-beans.xsd">
+                           
+	<bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
new file mode 100644
index 0000000..e202df1
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
@@ -0,0 +1,110 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests of how the {@link Retry} dispatch layer reports and accepts its JSON
+ * configuration (<code>maxRetries</code>, <code>initialDelay</code>,
+ * <code>maxDelay</code> and <code>backoffFactor</code>). The retry behaviour
+ * itself is not exercised here (see the TODO at the end of the class).
+ */
+public class TestRetry {
+    
+    /**
+     * A Retry made with the no-argument constructor should report
+     * maxRetries=0, initialDelay=1000, maxDelay=5000 and backoffFactor=1.0.
+     */
+    @Test
+    public void defaultConfig() throws Exception {
+        Retry retry = new Retry();
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    /**
+     * Values given to the four-argument constructor should be reported back
+     * by getConfiguration().
+     */
+    @Test
+    public void customConfig() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+    
+    /**
+     * Configuring with an empty JSON object should reset every value to its
+     * default, not keep the values given to the constructor.
+     */
+    @Test
+    public void configureEmpty() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode empty = JsonNodeFactory.instance.objectNode();
+        retry.configure(empty);
+        // We would expect missing values to be replaced with the
+        // DEFAULT values rather than the previous values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    /**
+     * Configuring with only some of the keys should apply those keys and
+     * reset the others to their defaults. Note that the two keys supplied
+     * carry the same values as the constructor arguments, so only the
+     * assertions on the omitted keys can tell "reset" from "kept".
+     */
+    @Test
+    public void configurePartly() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", 15);
+        conf.put("backoffFactor", 1.2);
+        retry.configure(conf);
+        // We would expect to see the new values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+        // And the default values (not the previous values!)
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+    }    
+    
+    /** A negative maxRetries should be rejected with IllegalArgumentException. */
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxRetries() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        retry.configure(conf);
+    }
+
+    /** A negative initialDelay should be rejected with IllegalArgumentException. */
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidInitialDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("initialDelay", -15);
+        retry.configure(conf);
+    }
+    
+    /**
+     * A maxDelay below the (default) initialDelay should be rejected with
+     * IllegalArgumentException, even though the value is valid on its own.
+     */
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxDelay", 150);
+        // Valid on its own, but less than the default initialDelay of 1000!
+        retry.configure(conf);
+    }
+
+    
+    /**
+     * After a configure() call with an invalid value, the configuration in
+     * force before the call should still be reported.
+     * <p>
+     * NOTE(review): there is no fail() after configure() inside the try
+     * block, so this test does not itself check that the exception is
+     * thrown; if the invalid value were accepted, the maxRetries assertion
+     * below would be what fails.
+     */
+    @Test
+    public void invalidConfigureRecovers() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        try { 
+            retry.configure(conf);
+        } catch (IllegalArgumentException ex) {
+            // As expected
+        }
+        // We would expect the earlier values to persist
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+    
+    // TODO: Testing the Retry layer without making a big dispatch stack and job context
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 93dfea1..97367e5 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -50,7 +50,7 @@
 		</dependency>
 		<dependency>
 			<groupId>${project.parent.groupId}</groupId>
-			<artifactId>taverna-workflowmodel-core-extensions</artifactId>
+			<artifactId>taverna-workflowmodel-extensions</artifactId>
 			<version>${project.parent.version}</version>
 			<!--<scope>test</scope> -->
 		</dependency>


[5/6] incubator-taverna-engine git commit: taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions

Posted by st...@apache.org.
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions


Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/4252fa90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/4252fa90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/4252fa90

Branch: refs/heads/master
Commit: 4252fa90112703905bed89dc4b46f59a77b87e9c
Parents: 46cc153
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Mon Feb 23 15:48:10 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Mon Feb 23 15:48:10 2015 +0000

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 taverna-platform-integration-tests/pom.xml      |   2 +-
 .../.gitignore                                  |   1 -
 taverna-workflowmodel-core-extensions/pom.xml   |  30 --
 .../layers/CoreDispatchLayerFactory.java        | 103 ----
 .../processor/dispatch/layers/ErrorBounce.java  | 324 ------------
 .../processor/dispatch/layers/Failover.java     | 111 ----
 .../dispatch/layers/IntermediateProvenance.java | 508 -------------------
 .../processor/dispatch/layers/Invoke.java       | 369 --------------
 .../processor/dispatch/layers/Loop.java         | 424 ----------------
 .../dispatch/layers/LoopConfiguration.java      |  75 ---
 .../processor/dispatch/layers/Parallelize.java  | 463 -----------------
 .../dispatch/layers/ParallelizeConfig.java      |  50 --
 .../processor/dispatch/layers/Retry.java        | 180 -------
 .../processor/dispatch/layers/RetryConfig.java  |  97 ----
 .../processor/dispatch/layers/Stop.java         | 163 ------
 .../processor/dispatch/layers/package.html      |   4 -
 ...rkflowmodel-core-extensions-context-osgi.xml |  11 -
 .../workflowmodel-core-extensions-context.xml   |   9 -
 .../processor/dispatch/layers/TestRetry.java    | 110 ----
 taverna-workflowmodel-extensions/.gitignore     |   1 +
 taverna-workflowmodel-extensions/pom.xml        |  30 ++
 .../layers/CoreDispatchLayerFactory.java        | 103 ++++
 .../processor/dispatch/layers/ErrorBounce.java  | 324 ++++++++++++
 .../processor/dispatch/layers/Failover.java     | 111 ++++
 .../dispatch/layers/IntermediateProvenance.java | 508 +++++++++++++++++++
 .../processor/dispatch/layers/Invoke.java       | 369 ++++++++++++++
 .../processor/dispatch/layers/Loop.java         | 424 ++++++++++++++++
 .../dispatch/layers/LoopConfiguration.java      |  75 +++
 .../processor/dispatch/layers/Parallelize.java  | 463 +++++++++++++++++
 .../dispatch/layers/ParallelizeConfig.java      |  50 ++
 .../processor/dispatch/layers/Retry.java        | 180 +++++++
 .../processor/dispatch/layers/RetryConfig.java  |  97 ++++
 .../processor/dispatch/layers/Stop.java         | 163 ++++++
 .../processor/dispatch/layers/package.html      |   4 +
 ...rkflowmodel-core-extensions-context-osgi.xml |  11 +
 .../workflowmodel-core-extensions-context.xml   |   9 +
 .../processor/dispatch/layers/TestRetry.java    | 110 ++++
 taverna-workflowmodel-impl/pom.xml              |   2 +-
 39 files changed, 3035 insertions(+), 3035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index daef4fd..001d17c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
     <module>taverna-services-impl</module>
     <module>taverna-stringconstant-activity</module>
     <module>taverna-workflowmodel-api</module>
-    <module>taverna-workflowmodel-core-extensions</module>
+    <module>taverna-workflowmodel-extensions</module>
     <module>taverna-workflowmodel-impl</module>
   </modules>
 	<dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-platform-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-platform-integration-tests/pom.xml b/taverna-platform-integration-tests/pom.xml
index 362fbdc..a958d1c 100644
--- a/taverna-platform-integration-tests/pom.xml
+++ b/taverna-platform-integration-tests/pom.xml
@@ -246,7 +246,7 @@
 		</dependency>
 		<dependency>
 			<groupId>${project.parent.groupId}</groupId>
-			<artifactId>taverna-workflowmodel-core-extensions</artifactId>
+			<artifactId>taverna-workflowmodel-extensions</artifactId>
 			<version>${project.parent.version}</version>
 		</dependency>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/.gitignore
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/.gitignore b/taverna-workflowmodel-core-extensions/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/taverna-workflowmodel-core-extensions/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/pom.xml b/taverna-workflowmodel-core-extensions/pom.xml
deleted file mode 100644
index 700e56d..0000000
--- a/taverna-workflowmodel-core-extensions/pom.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-    <parent>
-			<groupId>org.apache.taverna.engine</groupId>
-			<artifactId>taverna-engine</artifactId>
-			<version>3.1.0-incubating-SNAPSHOT</version>
-    </parent>
-	<artifactId>taverna-workflowmodel-core-extensions</artifactId>
-	<packaging>bundle</packaging>
-	<name>Apache Taverna Workflow Model Extension Points</name>
-	<description>Implementation of core extension points to the workflow model</description>
-	<dependencies>
-    <dependency>
-        <groupId>${project.parent.groupId}</groupId>
-        <artifactId>taverna-workflowmodel-api</artifactId>
-        <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>com.fasterxml.jackson.core</groupId>
-        <artifactId>jackson-databind</artifactId>
-        <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-        <groupId>junit</groupId>
-        <artifactId>junit</artifactId>
-        <version>${junit.version}</version>
-        <scope>test</scope>
-    </dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
deleted file mode 100644
index 0b11627..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Factory for creating core dispatch layers.
- *
- * The core dispatch layers are :
- * <ul>
- * <li>ErrorBounce</li>
- * <li>Parallelize</li>
- * <li>Failover</li>
- * <li>Retry</li>
- * <li>Stop</li>
- * <li>Invoke</li>
- * <li>Loop</li>
- * <li>IntermediateProvenance</li>
- * </ul>
- *
- * @author David Withers
- */
-public class CoreDispatchLayerFactory implements DispatchLayerFactory {
-	private static final URI parallelizeLayer = URI.create(Parallelize.URI);
-	private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
-	private static final URI failoverLayer = URI.create(Failover.URI);
-	private static final URI retryLayer = URI.create(Retry.URI);
-	private static final URI invokeLayer = URI.create(Invoke.URI);
-	private static final URI loopLayer = URI.create(Loop.URI);
-	private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
-	private static final URI stopLayer = URI.create(Stop.URI);
-
-	private final static Set<URI> dispatchLayerURIs = new HashSet<URI>();
-
-	static {
-		dispatchLayerURIs.add(parallelizeLayer);
-		dispatchLayerURIs.add(errorBounceLayer);
-		dispatchLayerURIs.add(failoverLayer);
-		dispatchLayerURIs.add(retryLayer);
-		dispatchLayerURIs.add(invokeLayer);
-		dispatchLayerURIs.add(loopLayer);
-		dispatchLayerURIs.add(intermediateProvenanceLayer);
-		dispatchLayerURIs.add(stopLayer);
-	}
-
-	@Override
-	public DispatchLayer<?> createDispatchLayer(URI uri) {
-		if (parallelizeLayer.equals(uri))
-			return new Parallelize();
-		else if (errorBounceLayer.equals(uri))
-			return new ErrorBounce();
-		else if (failoverLayer.equals(uri))
-			return new Failover();
-		else if (retryLayer.equals(uri))
-			return new Retry();
-		else if (invokeLayer.equals(uri))
-			return new Invoke();
-		else if (loopLayer.equals(uri))
-			return new Loop();
-		else if (intermediateProvenanceLayer.equals(uri))
-			return new IntermediateProvenance();
-		else if (stopLayer.equals(uri))
-			return new Stop();
-		return null;
-	}
-
-	@Override
-	public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
-		// TODO
-		return null;
-	}
-
-	@Override
-	public Set<URI> getDispatchLayerTypes() {
-		return dispatchLayerURIs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
deleted file mode 100644
index dfde240..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.reference.ErrorDocument;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-/**
- * Receives job events, checks to see whether any parameters in the job are
- * error tokens or collections which contain errors. If so then sends a
- * corresponding result message back where all outputs are error tokens having
- * registered such with the invocation context's data manager. It also re-writes
- * any failure messages as result messages containing error tokens at the
- * appropriate depth - this means that it must be placed above any error
- * handling layers in order for those to have an effect at all. In general this
- * layer should be placed immediately below the parallelize layer in most
- * default cases (this will guarantee the processor never sees a failure message
- * though, which may or may not be desirable)
- * 
- * @author Tom Oinn
- * 
- */
-@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
-		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
-@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
-		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@SupportsStreamedResult
-public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
-		PropertyContributingDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
-
-	/**
-	 * Track the number of reflected and translated errors handled by this error
-	 * bounce instance
-	 */
-	private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
-
-	private int totalTranslatedErrors = 0;
-	private int totalReflectedErrors = 0;
-
-	private synchronized ErrorBounceState getState(String owningProcess) {
-		if (state.containsKey(owningProcess))
-			return state.get(owningProcess);
-		ErrorBounceState ebs = new ErrorBounceState();
-		state.put(owningProcess, ebs);
-		return ebs;
-	}
-
-	/**
-	 * If the job contains errors, or collections which contain errors
-	 * themselves then bounce a result message with error documents in back up
-	 * to the layer above
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		Set<T2Reference> errorReferences = new HashSet<>();
-		for (T2Reference ei : jobEvent.getData().values())
-			if (ei.containsErrors())
-				errorReferences.add(ei);
-		if (errorReferences.isEmpty())
-			// relay the message down...
-			getBelow().receiveJob(jobEvent);
-		else {
-			getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
-			sendErrorOutput(jobEvent, null, errorReferences);
-		}
-	}
-
-	/**
-	 * Always send the error document job result on receiving a failure, at
-	 * least for now! This should be configurable, in effect this is the part
-	 * that ensures the processor never sees a top level failure.
-	 */
-	@Override
-	public void receiveError(DispatchErrorEvent errorEvent) {
-		getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
-		sendErrorOutput(errorEvent, errorEvent.getCause(), null);
-	}
-
-	/**
-	 * Construct and send a new result message with error documents in place of
-	 * all outputs at the appropriate depth
-	 * 
-	 * @param event
-	 * @param cause
-	 * @param errorReferences
-	 */
-	private void sendErrorOutput(Event<?> event, Throwable cause,
-			Set<T2Reference> errorReferences) {
-		ReferenceService rs = event.getContext().getReferenceService();
-
-		Processor p = dispatchStack.getProcessor();
-		Map<String, T2Reference> outputDataMap = new HashMap<>();
-		String[] owningProcessArray = event.getOwningProcess().split(":");
-		String processor = owningProcessArray[owningProcessArray.length - 1];
-		for (OutputPort op : p.getOutputPorts()) {
-			String message = "Processor '" + processor + "' - Port '"
-					+ op.getName() + "'";
-			if (event instanceof DispatchErrorEvent)
-				message += ": " + ((DispatchErrorEvent) event).getMessage();
-			ErrorDocument ed;
-			if (cause != null)
-				ed = rs.getErrorDocumentService().registerError(message, cause,
-						op.getDepth(), event.getContext());
-			else
-				ed = rs.getErrorDocumentService().registerError(message,
-						errorReferences, op.getDepth(), event.getContext());
-			outputDataMap.put(op.getName(), ed.getId());
-		}
-		DispatchResultEvent dre = new DispatchResultEvent(
-				event.getOwningProcess(), event.getIndex(), event.getContext(),
-				outputDataMap, false);
-		getAbove().receiveResult(dre);
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// Do nothing - no configuration required
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		// Layer has no configuration associated
-		return null;
-	}
-
-	@Override
-	public void finishedWith(final String owningProcess) {
-		/*
-		 * Delay the removal of the state to give the monitor a chance to poll
-		 */
-		cleanupTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				state.remove(owningProcess);
-			}
-		}, CLEANUP_DELAY_MS);
-	}
-
-	/**
-	 * Two properties, dispatch.errorbounce.reflected(integer) is the number of
-	 * incoming jobs which have been bounced back as results with errors,
-	 * dispatch.errorbounce.translated(integer) is the number of failures from
-	 * downstream in the stack that have been re-written as complete results
-	 * containing error documents.
-	 */
-	@Override
-	public void injectPropertiesFor(final String owningProcess) {
-		MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce", "reflected" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				ErrorBounceState ebs = state.get(owningProcess);
-				if (ebs == null)
-					return 0;
-				return ebs.getErrorsReflected();
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce", "translated" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				ErrorBounceState ebs = state.get(owningProcess);
-				if (ebs == null)
-					return 0;
-				return ebs.getErrorsTranslated();
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce",
-						"totalTranslated" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return totalTranslatedErrors;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(
-				totalTranslatedTranslatedProperty, owningProcess);
-
-		MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce",
-						"totalReflected" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return totalReflectedErrors;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(
-				totalReflectedTranslatedProperty, owningProcess);
-	}
-
-	public class ErrorBounceState {
-		private int errorsReflected = 0;
-		private int errorsTranslated = 0;
-
-		/**
-		 * Number of times the bounce layer has converted an incoming job event
-		 * where the input data contained error tokens into a result event
-		 * containing all errors.
-		 */
-		int getErrorsReflected() {
-			return this.errorsReflected;
-		}
-
-		/**
-		 * Number of times the bounce layer has converted an incoming failure
-		 * event into a result containing error tokens
-		 */
-		int getErrorsTranslated() {
-			return errorsTranslated;
-		}
-
-		void incrementErrorsReflected() {
-			synchronized (this) {
-				errorsReflected++;
-			}
-			synchronized (ErrorBounce.this) {
-				totalReflectedErrors++;
-			}
-		}
-
-		void incrementErrorsTranslated() {
-			synchronized (this) {
-				errorsTranslated++;
-			}
-			synchronized (ErrorBounce.this) {
-				totalTranslatedErrors++;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
deleted file mode 100644
index 1c5ef03..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Failure handling dispatch layer, consumes job events with multiple activities
- * and emits the same job but with only the first activity. On failures the job
- * is resent to the layer below with a new activity list containing the second
- * in the original list and so on. If a failure is received and there are no
- * further activities to use the job fails and the failure is sent back up to
- * the layer above.
- * 
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
-		UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
-
-	@Override
-	protected JobState getStateObject(DispatchJobEvent jobEvent) {
-		return new FailoverState(jobEvent);
-	}
-
-	/**
-	 * Receive a job from the layer above, store it in the state map then relay
-	 * it to the layer below with a modified activity list containing only the
-	 * activity at index 0
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		addJobToStateList(jobEvent);
-		List<Activity<?>> newActivityList = new ArrayList<>();
-		newActivityList.add(jobEvent.getActivities().get(0));
-		getBelow().receiveJob(
-				new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
-						.getIndex(), jobEvent.getContext(), jobEvent.getData(),
-						newActivityList));
-	}
-
-	class FailoverState extends JobState {
-		int currentActivityIndex = 0;
-
-		public FailoverState(DispatchJobEvent jobEvent) {
-			super(jobEvent);
-		}
-
-		@Override
-		public boolean handleError() {
-			currentActivityIndex++;
-			if (currentActivityIndex == jobEvent.getActivities().size())
-				return false;
-			List<Activity<?>> newActivityList = new ArrayList<>();
-			newActivityList.add(jobEvent.getActivities().get(
-					currentActivityIndex));
-			getBelow().receiveJob(
-					new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
-							.getIndex(), jobEvent.getContext(), jobEvent
-							.getData(), newActivityList));
-			return true;
-		}
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// Do nothing - there is no configuration to do
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
deleted file mode 100644
index 718079a..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static java.lang.System.currentTimeMillis;
-
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-/**
- * Sits above the {@link Invoke} layer and collects information about the
- * current workflow run to be stored by the {@link ProvenanceConnector}.
- * 
- * @author Ian Dunlop
- * @author Stian Soiland-Reyes
- * 
- */
-public class IntermediateProvenance extends AbstractDispatchLayer<String> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
-	private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
-
-	private ProvenanceReporter reporter;
-	private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
-	private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
-	private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
-
-	// private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
-	// private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
-
-	private WorkflowProvenanceItem workflowItem;
-
-	@Override
-	public void configure(String o) {
-	}
-
-	/**
-	 * A set of provenance events for a particular owning process has been
-	 * finished with so you can remove all the {@link IterationProvenanceItem}s
-	 * from the map
-	 */
-	@Override
-	public void finishedWith(String owningProcess) {
-		processToIndexes.remove(owningProcess);
-	}
-
-	@Override
-	public String getConfiguration() {
-		return null;
-	}
-
-	protected Map<String, IterationProvenanceItem> getIndexesByProcess(
-			String owningProcess) {
-		synchronized (processToIndexes) {
-			Map<String, IterationProvenanceItem> indexes = processToIndexes
-					.get(owningProcess);
-			if (indexes == null) {
-				indexes = new HashMap<>();
-				processToIndexes.put(owningProcess, indexes);
-			}
-			return indexes;
-		}
-	}
-
-	protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
-		String owningProcess = event.getOwningProcess();
-		int[] originalIndex = event.getIndex();
-		int[] index = event.getIndex();
-		String indexStr = indexStr(index);
-		Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
-		IterationProvenanceItem iterationProvenanceItem = null;
-		synchronized (indexes) {
-			// find the iteration item for the int index eg [1]
-			iterationProvenanceItem = indexes.get(indexStr);
-			// if it is null then strip the index and look again
-
-			while (iterationProvenanceItem == null) {
-				try {
-					index = removeLastIndex(index);
-					iterationProvenanceItem = indexes.get(indexStr(index));
-					/*
-					 * if we have a 'parent' iteration then create a new
-					 * iteration for the original index and link it to the
-					 * activity and the input data
-					 * 
-					 * FIXME should this be linked to the parent iteration
-					 * instead?
-					 */
-					if (iterationProvenanceItem != null) {
-						// set the index to the one from the event
-						IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
-						iterationProvenanceItem1.setIteration(originalIndex);
-						iterationProvenanceItem1.setProcessId(owningProcess);
-						iterationProvenanceItem1.setIdentifier(UUID
-								.randomUUID().toString());
-						iterationProvenanceItem1.setWorkflowId(workflowItem.getParentId());
-						iterationProvenanceItem1.setParentIterationItem(iterationProvenanceItem);
-						iterationProvenanceItem1.setParentId(iterationProvenanceItem.getParentId());
-						iterationProvenanceItem1.setInputDataItem(iterationProvenanceItem.getInputDataItem());
-
-//						for (Entry<ActivityProvenanceItem, List<Object>> entrySet : activityProvenanceItemMap
-//								.entrySet()) {
-//							List<Object> value = entrySet.getValue();
-//							int[] newIndex = (int[]) value.get(0);
-//							String owner = (String) value.get(1);
-//							String indexString = indexStr(newIndex);
-//							String indexString2 = indexStr(index);
-//
-//							if (owningProcess.equalsIgnoreCase(owner)
-//									&& indexString
-//											.equalsIgnoreCase(indexString2))
-//								iterationProvenanceItem1.setParentId(entrySet
-//										.getKey().getIdentifier());
-//						}
-//						for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
-//								.entrySet()) {
-//							List<Object> value = entrySet.getValue();
-//							int[] newIndex = (int[]) value.get(0);
-//							String owner = (String) value.get(1);
-//							String indexString = indexStr(newIndex);
-//							String indexString2 = indexStr(index);
-//							if (owningProcess.equalsIgnoreCase(owner)
-//									&& indexString
-//											.equalsIgnoreCase(indexString2))
-//								iterationProvenanceItem1
-//										.setInputDataItem(entrySet.getKey());
-//						}
-
-						// for (ActivityProvenanceItem item :
-						// activityProvenanceItemList) {
-						// if (owningProcess.equalsIgnoreCase(item
-						// .getProcessId())) {
-						// iterationProvenanceItem1.setParentId(item
-						// .getIdentifier());
-						// }
-						// }
-						// for (InputDataProvenanceItem item :
-						// inputDataProvenanceItemList) {
-						// if (owningProcess.equalsIgnoreCase(item
-						// .getProcessId())) {
-						// iterationProvenanceItem1.setInputDataItem(item);
-						// }
-						// indexes.put(indexStr, iterationProvenanceItem1);
-						// return iterationProvenanceItem1;
-						// // }
-						// }
-
-						// add this new iteration item to the map
-						getIndexesByProcess(event.getOwningProcess()).put(
-								indexStr(event.getIndex()),
-								iterationProvenanceItem1);
-						return iterationProvenanceItem1;
-					}
-					/*
-					 * if we have not found an iteration item and the index is
-					 * [], then something is wrong; otherwise remove the last index
-					 * in the int array before we go back through the while loop
-					 */
-				} catch (IllegalStateException e) {
-					logger
-							.warn("Cannot find a parent iteration with index [] for owning process: "
-									+ owningProcess
-									+ "Workflow invocation is in an illegal state");
-					throw e;
-				}
-			}
-
-			// if (iterationProvenanceItem == null) {
-			// logger.info("Iteration item was null for: "
-			// + event.getOwningProcess() + " " + event.getIndex());
-			// System.out.println("Iteration item was null for: "
-			// + event.getOwningProcess() + " " + event.getIndex());
-			// iterationProvenanceItem = new IterationProvenanceItem(index);
-			// iterationProvenanceItem.setProcessId(owningProcess);
-			// iterationProvenanceItem.setIdentifier(UUID.randomUUID()
-			// .toString());
-			// // for (ActivityProvenanceItem
-			// item:activityProvenanceItemList)
-			// // {
-			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
-			// // iterationProvenanceItem.setParentId(item.getIdentifier());
-			// // }
-			// // }
-			// // for (InputDataProvenanceItem item:
-			// // inputDataProvenanceItemList) {
-			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
-			// // iterationProvenanceItem.setInputDataItem(item);
-			// // }
-			// // }
-			// indexes.put(indexStr, iterationProvenanceItem);
-
-		}
-		return iterationProvenanceItem;
-	}
-
-	private String indexStr(int[] index) {
-		StringBuilder indexStr = new StringBuilder();
-		for (int ind : index)
-			indexStr.append(":").append(ind);
-		return indexStr.toString();
-	}
-
-	/**
-	 * Split the index, rendered in the form :1:2:3, on ":" (nothing is actually removed)
-	 * 
-	 * @param index
-	 * @return
-	 */
-	@SuppressWarnings("unused")
-	private String[] stripLastIndex(int[] index) {
-		// will be in form :1:2:3
-		return indexStr(index).split(":");
-	}
-
-	/**
-	 * Remove the last value in the int array
-	 * 
-	 * @param index
-	 * @return
-	 */
-	private int[] removeLastIndex(int[] index) {
-		if (index.length == 0)
-			throw new IllegalStateException(
-					"There is no parent iteration of index [] for this result");
-		int[] newIntArray = new int[index.length - 1];
-		for (int i = 0; i < index.length - 1; i++)
-			newIntArray[i] = index[i];
-		return newIntArray;
-	}
-
-	private static String uuid() {
-		return UUID.randomUUID().toString();
-	}
-
-	/**
-	 * Create an {@link ErrorProvenanceItem} and send across to the
-	 * {@link ProvenanceConnector}
-	 */
-	@Override
-	public void receiveError(DispatchErrorEvent errorEvent) {
-		IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
-		// get using errorEvent.getOwningProcess();
-		
-		ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
-		errorItem.setCause(errorEvent
-				.getCause());
-		errorItem.setErrorType(errorEvent
-				.getFailureType().toString());
-		errorItem.setMessage(errorEvent.getMessage());
-		
-		errorItem.setProcessId(errorEvent.getOwningProcess());
-		errorItem.setIdentifier(uuid());
-		errorItem.setParentId(iterationProvItem.getIdentifier());
-		// iterationProvItem.setErrorItem(errorItem);
-		// FIXME don't need to add to the processor item earlier
-		getReporter().addProvenanceItem(errorItem);
-		super.receiveError(errorEvent);
-	}
-
-	/**
-	 * Create the {@link ProvenanceItem}s and send them all across to the
-	 * {@link ProvenanceConnector} except for the
-	 * {@link IterationProvenanceItem}, this one is told what its inputs are
-	 * but has to wait until the results are received before being sent across.
-	 * Each item has a unique identifier and also knows who its parent item is
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-			try {
-			// FIXME do we need this ProcessProvenanceItem?
-			ProcessProvenanceItem provenanceItem;
-			String[] split = jobEvent.getOwningProcess().split(":");
-			provenanceItem = new ProcessProvenanceItem();
-			String parentDataflowId = workflowItem.getParentId();
-			provenanceItem.setWorkflowId(parentDataflowId);
-			provenanceItem.setFacadeID(split[0]);
-			provenanceItem.setDataflowID(split[1]);
-			provenanceItem.setProcessId(jobEvent.getOwningProcess());
-			provenanceItem.setIdentifier(uuid());
-			provenanceItem.setParentId(workflowItem.getIdentifier());
-			ProcessorProvenanceItem processorProvItem;
-			processorProvItem = new ProcessorProvenanceItem();
-			processorProvItem.setWorkflowId(parentDataflowId);
-			processorProvItem.setProcessId(jobEvent
-					.getOwningProcess());
-			processorProvItem.setIdentifier(uuid());
-			processorProvItem.setParentId(provenanceItem.getIdentifier());
-			provenanceItem.setProcessId(jobEvent.getOwningProcess());
-			getReporter().addProvenanceItem(provenanceItem);
-			getReporter().addProvenanceItem(processorProvItem);
-	
-			IterationProvenanceItem iterationProvItem = null;
-			iterationProvItem = new IterationProvenanceItem();
-			iterationProvItem.setWorkflowId(parentDataflowId);
-			iterationProvItem.setIteration(jobEvent.getIndex());
-			iterationProvItem.setIdentifier(uuid());
-			
-			ReferenceService referenceService = jobEvent.getContext()
-					.getReferenceService();
-	
-			InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
-			inputDataItem.setDataMap(jobEvent.getData());
-			inputDataItem.setReferenceService(referenceService);
-			inputDataItem.setIdentifier(uuid());
-			inputDataItem.setParentId(iterationProvItem.getIdentifier());
-			inputDataItem.setProcessId(jobEvent.getOwningProcess());
-	
-			List<Object> inputIndexOwnerList = new ArrayList<>();
-			inputIndexOwnerList.add(jobEvent.getIndex());
-			inputIndexOwnerList.add(jobEvent.getOwningProcess());
-			inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
-	
-			// inputDataProvenanceItemList.add(inputDataItem);
-			iterationProvItem.setInputDataItem(inputDataItem);
-			iterationProvItem.setIteration(jobEvent.getIndex());
-			iterationProvItem.setProcessId(jobEvent.getOwningProcess());
-	
-			for (Activity<?> activity : jobEvent.getActivities())
-				if (activity instanceof AsynchronousActivity) {
-					ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
-					activityProvItem.setWorkflowId(parentDataflowId);
-					activityProvItem.setIdentifier(uuid());
-					iterationProvItem.setParentId(activityProvItem.getIdentifier());
-					// getConnector().addProvenanceItem(iterationProvItem);
-					activityProvItem.setParentId(processorProvItem.getIdentifier());
-					// processorProvItem.setActivityProvenanceItem(activityProvItem);
-					activityProvItem.setProcessId(jobEvent.getOwningProcess());
-					List<Object> activityIndexOwnerList = new ArrayList<>();
-					activityIndexOwnerList.add(jobEvent.getOwningProcess());
-					activityIndexOwnerList.add(jobEvent.getIndex());
-					activityProvenanceItemMap.put(activityProvItem,
-							inputIndexOwnerList);
-					// activityProvenanceItemList.add(activityProvItem);
-					// activityProvItem.setIterationProvenanceItem(iterationProvItem);
-					getReporter().addProvenanceItem(activityProvItem);
-					break;
-				}
-			getIndexesByProcess(jobEvent.getOwningProcess()).put(
-					indexStr(jobEvent.getIndex()), iterationProvItem);
-			iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
-			getReporter().addProvenanceItem(iterationProvItem);
-		} catch (RuntimeException ex) {
-			logger.error("Could not store provenance for " + jobEvent, ex);
-		}
-		
-		super.receiveJob(jobEvent);
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-		super.receiveJobQueue(jobQueueEvent);
-	}
-
-	/**
-	 * Populate an {@link OutputDataProvenanceItem} with the results and attach
-	 * it to the appropriate {@link IterationProvenanceItem}. Then send the
-	 * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
-	 */
-	@Override
-	public void receiveResult(DispatchResultEvent resultEvent) {
-		try {
-			// FIXME use the connector from the result event context
-			IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
-			iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
-			
-			ReferenceService referenceService = resultEvent.getContext()
-					.getReferenceService();
-
-			OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
-			outputDataItem.setDataMap(resultEvent.getData());
-			outputDataItem.setReferenceService(referenceService);
-			outputDataItem.setIdentifier(uuid());
-			outputDataItem.setProcessId(resultEvent.getOwningProcess());
-			outputDataItem.setParentId(iterationProvItem.getIdentifier());
-			iterationProvItem.setOutputDataItem(outputDataItem);
-			
-			getReporter().addProvenanceItem(iterationProvItem);
-			// getConnector().addProvenanceItem(outputDataItem);
-	
-			// PM -- testing
-			// add xencoding of data value here??
-	//		Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
-	//		for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
-	//			// create a simpler bean that we can serialize?
-	//			
-	//			T2Reference ref = entry.getValue();
-	//			
-	//			SimplerT2Reference t2RefBean = new SimplerT2Reference();
-	//			t2RefBean.setReferenceType(ref.getReferenceType());
-	//			t2RefBean.setDepth(ref.getDepth());
-	//			t2RefBean.setLocalPart(ref.getLocalPart());
-	//			t2RefBean.setNamespacePart(ref.getNamespacePart());
-	//						
-	//			System.out.println("data ref: "+ref);
-	//			String serializedInput = SerializeParam(t2RefBean);
-	//			System.out.println("serialized reference:" + serializedInput);
-	//			
-	//			System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
-//		}
-		} catch (Exception ex) {
-			logger.error("Could not store provenance for "
-					+ resultEvent.getOwningProcess() + " "
-					+ Arrays.toString(resultEvent.getIndex()), ex);
-			// But don't break super.receiveResult() !!
-		}
-		super.receiveResult(resultEvent);
-	}
-
-	@Override
-	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
-		super.receiveResultCompletion(completionEvent);
-	}
-
-	/**
-	 * Tell this layer what {@link ProvenanceConnector} implementation is being
-	 * used to capture the {@link ProvenanceItem}s. NOTE: should probably use
-	 * the connector from the result events context where possible
-	 * 
-	 * @param connector
-	 */
-	public void setReporter(ProvenanceReporter connector) {
-		this.reporter = connector;
-	}
-
-	public ProvenanceReporter getReporter() {
-		return reporter;
-	}
-
-	/**
-	 * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
-	 * enacted this layer has to know about the {@link WorkflowProvenanceItem}
-	 * 
-	 * @param workflowItem
-	 */
-	public void setWorkflow(WorkflowProvenanceItem workflowItem) {
-		this.workflowItem = workflowItem;
-	}
-
-	// TODO is this unused?
-	public static String SerializeParam(Object ParamValue) {
-		ByteArrayOutputStream BStream = new ByteArrayOutputStream();
-		XMLEncoder encoder = new XMLEncoder(BStream);
-		encoder.writeObject(ParamValue);
-		encoder.close();
-		return BStream.toString();
-	}
-
-	// TODO is this unused?
-	public static Object DeserializeParam(String SerializedParam) {
-		InputStream IStream = new ByteArrayInputStream(
-				SerializedParam.getBytes());
-		XMLDecoder decoder = new XMLDecoder(IStream);
-		Object output = decoder.readObject();
-		decoder.close();
-		return output;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
deleted file mode 100644
index f8403df..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.sql.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.ControlBoundary;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Context free invoker layer, does not pass index arrays of jobs into activity
- * instances.
- * <p>
- * This layer will invoke the first invokable activity in the activity list, so
- * any sane dispatch stack will have narrowed this down to a single item list by
- * this point, i.e. by the insertion of a failover layer.
- * <p>
- * Currently only handles activities implementing {@link AsynchronousActivity}.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- *
- */
-@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
-@ControlBoundary
-public class Invoke extends AbstractDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
-	private static Logger logger = Logger.getLogger(Invoke.class);
-	private static Long invocationCount = 0L;
-
-	private MonitorManager monMan;
-
-	private static String getNextProcessID() {
-		long count;
-		synchronized (invocationCount) {
-			count = ++invocationCount;
-		}
-		return "invocation" + count;
-	}
-
-	public Invoke() {
-		super();
-		monMan = MonitorManager.getInstance();
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// No configuration, do nothing
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return null;
-	}
-
-	/**
-	 * Receive a job from the layer above and pick the first concrete activity
-	 * from the list to invoke. Invoke this activity, creating a callback which
-	 * will wrap up the result messages in the appropriate collection depth
-	 * before sending them on (in general activities are not aware of their
-	 * invocation context and should not be responsible for providing correct
-	 * index arrays for results)
-	 * <p>
-	 * This layer will invoke the first invokable activity in the activity list,
-	 * so any sane dispatch stack will have narrowed this down to a single item
-	 * list by this point, i.e. by the insertion of a failover layer.
-	 */
-	@Override
-	public void receiveJob(final DispatchJobEvent jobEvent) {
-		for (Activity<?> activity : jobEvent.getActivities())
-			if (activity instanceof AsynchronousActivity) {
-				invoke(jobEvent, (AsynchronousActivity<?>) activity);
-				break;
-			}
-	}
-
-	protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
-		// Register with the monitor
-		final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
-				getNextProcessID()).getOwningProcess();
-		monMan.registerNode(activity, invocationProcessIdentifier,
-				new HashSet<MonitorableProperty<?>>());
-		monMan.registerNode(jobEvent, invocationProcessIdentifier,
-				new HashSet<MonitorableProperty<?>>());
-
-		/*
-		 * The activity is an AsynchronousActivity so we invoke it with an
-		 * AsynchronousActivityCallback object containing appropriate callback
-		 * methods to push results, completions and failures back to the
-		 * invocation layer.
-		 * 
-		 * Get the registered DataManager for this process. In most cases this
-		 * will just be a single DataManager for the entire workflow system but
-		 * it never hurts to generalize
-		 */
-
-		InvocationContext context = jobEvent.getContext();
-		final ReferenceService refService = context.getReferenceService();
-
-		InvocationStartedProvenanceItem invocationItem = null;
-		ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
-		if (provenanceReporter != null) {
-			IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
-			if (intermediateProvenance != null) {
-				invocationItem = new InvocationStartedProvenanceItem();
-				IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
-				invocationItem.setIdentifier(UUID.randomUUID().toString());
-				invocationItem.setActivity(activity);
-				invocationItem.setProcessId(jobEvent.getOwningProcess());
-				invocationItem.setInvocationProcessId(invocationProcessIdentifier);
-				invocationItem.setParentId(parentItem.getIdentifier());
-				invocationItem.setWorkflowId(parentItem.getWorkflowId());
-				invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
-				provenanceReporter.addProvenanceItem(invocationItem);
-			}
-		}
-
-		/*
-		 * Create a Map of EntityIdentifiers named appropriately given the
-		 * activity mapping
-		 */
-		Map<String, T2Reference> inputData = new HashMap<>();
-		for (String inputName : jobEvent.getData().keySet()) {
-			String activityInputName = activity
-					.getInputPortMapping().get(inputName);
-			if (activityInputName != null)
-				inputData.put(activityInputName, jobEvent.getData()
-						.get(inputName));
-		}
-
-		/*
-		 * Create a callback object to receive events, completions and failure
-		 * notifications from the activity
-		 */
-		AsynchronousActivityCallback callback = new InvokeCallBack(
-				jobEvent, refService, invocationProcessIdentifier,
-				activity);
-
-		if (activity instanceof MonitorableAsynchronousActivity<?>) {
-			/*
-			 * Monitorable activity so get the monitorable properties and push
-			 * them into the state tree after launching the job
-			 */
-			MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) activity;
-			Set<MonitorableProperty<?>> props = maa
-					.executeAsynchWithMonitoring(inputData, callback);
-			monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
-		} else {
-			/*
-			 * Run the job, passing in the callback we've just created along
-			 * with the (possibly renamed) input data map
-			 */
-			activity.executeAsynch(inputData, callback);
-		}
-	}
-
-	protected IntermediateProvenance findIntermediateProvenance() {
-		for (DispatchLayer<?> layer : getProcessor().getDispatchStack()
-				.getLayers())
-			if (layer instanceof IntermediateProvenance)
-				return (IntermediateProvenance) layer;
-		return null;
-	}
-
-	protected class InvokeCallBack implements AsynchronousActivityCallback {
-		protected final AsynchronousActivity<?> activity;
-		protected final String invocationProcessIdentifier;
-		protected final DispatchJobEvent jobEvent;
-		protected final ReferenceService refService;
-		protected boolean sentJob = false;
-
-		protected InvokeCallBack(DispatchJobEvent jobEvent,
-				ReferenceService refService,
-				String invocationProcessIdentifier,
-				AsynchronousActivity<?> asyncActivity) {
-			this.jobEvent = jobEvent;
-			this.refService = refService;
-			this.invocationProcessIdentifier = invocationProcessIdentifier;
-			this.activity = asyncActivity;
-		}
-
-		@Override
-		public void fail(String message) {
-			fail(message, null);
-		}
-
-		@Override
-		public void fail(String message, Throwable t) {
-			fail(message, t, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t,
-				DispatchErrorType errorType) {
-			logger.warn("Failed (" + errorType + ") invoking " + activity
-					+ " for job " + jobEvent + ": " + message, t);
-			monMan.deregisterNode(
-					invocationProcessIdentifier);
-			getAbove().receiveError(
-					new DispatchErrorEvent(jobEvent.getOwningProcess(),
-							jobEvent.getIndex(), jobEvent.getContext(),
-							message, t, errorType, activity));
-		}
-
-		@Override
-		public InvocationContext getContext() {
-			return jobEvent.getContext();
-		}
-
-		@Override
-		public String getParentProcessIdentifier() {
-			return invocationProcessIdentifier;
-		}
-
-		@Override
-		public void receiveCompletion(int[] completionIndex) {
-			if (completionIndex.length == 0)
-				// Final result, clean up monitor state
-				monMan.deregisterNode(invocationProcessIdentifier);
-			if (sentJob) {
-				int[] newIndex;
-				if (completionIndex.length == 0)
-					newIndex = jobEvent.getIndex();
-				else {
-					newIndex = new int[jobEvent.getIndex().length
-							+ completionIndex.length];
-					int i = 0;
-					for (int indexValue : jobEvent.getIndex())
-						newIndex[i++] = indexValue;
-					for (int indexValue : completionIndex)
-						newIndex[i++] = indexValue;
-				}
-				DispatchCompletionEvent c = new DispatchCompletionEvent(
-						jobEvent.getOwningProcess(), newIndex, jobEvent
-								.getContext());
-				getAbove().receiveResultCompletion(c);
-			} else {
-				/*
-				 * We haven't sent any 'real' data prior to completing a stream.
-				 * This in effect means we're sending an empty top level
-				 * collection so we need to register empty collections for each
-				 * output port with appropriate depth (by definition if we're
-				 * streaming all outputs are collection types of some kind)
-				 */
-				Map<String, T2Reference> emptyListMap = new HashMap<>();
-				for (OutputPort op : activity.getOutputPorts()) {
-					String portName = op.getName();
-					int portDepth = op.getDepth();
-					emptyListMap.put(portName, refService.getListService()
-							.registerEmptyList(portDepth, jobEvent.getContext()).getId());
-				}
-				receiveResult(emptyListMap, new int[0]);
-			}
-		}
-
-		@Override
-		public void receiveResult(Map<String, T2Reference> data, int[] index) {
-			/*
-			 * Construct a new result map using the activity mapping (activity
-			 * output name to processor output name)
-			 */
-			Map<String, T2Reference> resultMap = new HashMap<>();
-			for (String outputName : data.keySet()) {
-				String processorOutputName = activity
-						.getOutputPortMapping().get(outputName);
-				if (processorOutputName != null)
-					resultMap.put(processorOutputName, data.get(outputName));
-			}
-			/*
-			 * Construct a new index array if the specified index is non zero
-			 * length, otherwise just use the original job's index array (means
-			 * we're not streaming)
-			 */
-			int[] newIndex;
-			boolean streaming = false;
-			if (index.length == 0)
-				newIndex = jobEvent.getIndex();
-			else {
-				streaming = true;
-				newIndex = new int[jobEvent.getIndex().length + index.length];
-				int i = 0;
-				for (int indexValue : jobEvent.getIndex())
-					newIndex[i++] = indexValue;
-				for (int indexValue : index)
-					newIndex[i++] = indexValue;
-			}
-			DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
-					.getOwningProcess(), newIndex, jobEvent.getContext(),
-					resultMap, streaming);
-			if (!streaming) {
-				monMan.registerNode(resultEvent, invocationProcessIdentifier,
-						new HashSet<MonitorableProperty<?>>());
-				// Final result, clean up monitor state
-				monMan.deregisterNode(invocationProcessIdentifier);
-			}
-			// Push the modified data to the layer above in the dispatch stack
-			getAbove().receiveResult(resultEvent);
-
-			sentJob = true;
-		}
-
-		@Override
-		public void requestRun(Runnable runMe) {
-			String newThreadName = jobEvent.toString();
-			Thread thread = new Thread(runMe, newThreadName);
-			thread.setContextClassLoader(activity.getClass()
-					.getClassLoader());
-			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-				@Override
-				public void uncaughtException(Thread t, Throwable e) {
-					fail("Uncaught exception while invoking " + activity, e);
-				}
-			});
-			thread.start();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
deleted file mode 100644
index d5077a4..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2008 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-
-/**
- * A layer that allows while-style loops.
- * <p>
- * The layer is configured with a {@link LoopConfiguration}, where an activity
- * has been set as the
- * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
- * condition}.
- * <p>
- * After a job has been successful further down the dispatch stack, the loop
- * layer will invoke the conditional activity to determine if the job will be
- * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
- * will be performed even before the first invocation. (The default
- * runFirst=true is equivalent to a do..while construct, while runFirst=false is
- * equivalent to a while.. construct.)
- * <p>
- * A job will be resent down the dispatch stack only if the conditional activity
- * returns a reference to a string equal to "true" on its output port "loop".
- * <p>
- * If a job or the conditional activity fails, the while-loop is interrupted and
- * the error is sent further up.
- * <p>
- * Note that the LoopLayer will be invoked for each item in an iteration, if you
- * want to do the loop for the whole collection (ie. re-iterating if the
- * loop-condition fails after processing the full list) - create a nested
- * workflow with the desired depths on it's input ports and insert this
- * LoopLayer in the stack of the nested workflow's processor in parent workflow.
- * <p>
- * It is recommended that the LoopLayer is to be inserted after the
- * {@link ErrorBounce} layer, as this layer is needed for registering errors
- * produced by the LoopLayer. If the user requires {@link Retry retries} and
- * {@link Failover failovers} before checking the while condition, such layers
- * should be below LoopLayer.
- *
- * @author Stian Soiland-Reyes
- */
-// FIXME Doesn't work
-@SuppressWarnings({"unchecked","rawtypes"})
-public class Loop extends AbstractDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
-	private static Logger logger = Logger.getLogger(Loop.class);
-
-	private JsonNode config = JsonNodeFactory.instance.objectNode();
-
-	protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
-	protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
-
-	@Override
-	public void configure(JsonNode config) {
-		this.config = config;
-	}
-
-	@Override
-	public void finishedWith(String owningProcess) {
-		String prefix = owningProcess + "[";
-		synchronized (outgoingJobs) {
-			for (String key : new ArrayList<>(outgoingJobs.keySet()))
-				if (key.startsWith(prefix))
-					outgoingJobs.remove(key);
-		}
-		synchronized (incomingJobs) {
-			for (String key : new ArrayList<>(incomingJobs.keySet()))
-				if (key.startsWith(prefix))
-					incomingJobs.remove(key);
-		}
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return config;
-	}
-
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		synchronized (incomingJobs) {
-			incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
-		}
-		if (config.get("runFirst").asBoolean()) {
-			// We'll do the conditional in receiveResult instead
-			super.receiveJob(jobEvent);
-			return;
-		}
-		checkCondition(jobEvent);
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-		synchronized (incomingJobs) {
-			incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
-		}
-		if (config.get("runFirst").asBoolean()) {
-			// We'll do the conditional in receiveResult instead
-			super.receiveJobQueue(jobQueueEvent);
-			return;
-		}
-		checkCondition(jobQueueEvent);
-	}
-	
-	private Activity<?> getCondition() {
-		//return config.getCondition();
-		return null;
-	}
-
-	@Override
-	public void receiveResult(DispatchResultEvent resultEvent) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveResult(resultEvent);
-			return;
-		}
-		synchronized (outgoingJobs) {
-			outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
-		}
-		checkCondition(resultEvent);
-	}
-
-	@Override
-	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveResultCompletion(completionEvent);
-			return;
-		}
-		synchronized (outgoingJobs) {
-			outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
-		}
-		checkCondition(completionEvent);
-	}
-
-	private void checkCondition(AbstractDispatchEvent event) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
-					event.getIndex(), event.getContext(),
-					"Can't invoke condition service: null", null,
-					DispatchErrorType.INVOCATION, condition));
-			return;
-		}
-		if (!(condition instanceof AbstractAsynchronousActivity)) {
-			DispatchErrorEvent errorEvent = new DispatchErrorEvent(
-					event.getOwningProcess(),
-					event.getIndex(),
-					event.getContext(),
-					"Can't invoke condition service "
-							+ condition
-							+ " is not an instance of AbstractAsynchronousActivity",
-					null, DispatchErrorType.INVOCATION, condition);
-			super.receiveError(errorEvent);
-			return;
-		}
-		AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
-		String jobIdentifier = jobIdentifier(event);
-		Map<String, T2Reference> inputs = prepareInputs(asyncCondition,
-				jobIdentifier);
-		AsynchronousActivityCallback callback = new ConditionCallBack(
-				jobIdentifier);
-		asyncCondition.executeAsynch(inputs, callback);
-	}
-
-	private Map<String, T2Reference> prepareInputs(
-			AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
-		Map<String, T2Reference> inputs = new HashMap<>();
-		Map<String, T2Reference> inData = getInData(jobIdentifier);
-		Map<String, T2Reference> outData = getOutData(jobIdentifier);
-
-		Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
-		for (ActivityInputPort conditionIn : inputPorts) {
-			String conditionPort = conditionIn.getName();
-			if (outData.containsKey(conditionPort))
-				// Copy from previous output
-				inputs.put(conditionPort, outData.get(conditionPort));
-			else if (inData.containsKey(conditionPort))
-				// Copy from original input
-				inputs.put(conditionPort, inData.get(conditionPort));
-		}
-		return inputs;
-	}
-
-	private Map<String, T2Reference> getInData(String jobIdentifier) {
-		AbstractDispatchEvent inEvent;
-		synchronized (incomingJobs) {
-			inEvent = incomingJobs.get(jobIdentifier);
-		}
-		Map<String, T2Reference> inData = new HashMap<>();
-		if (inEvent instanceof DispatchJobEvent)
-			inData = ((DispatchJobEvent) inEvent).getData();
-		return inData;
-	}
-
-	private Map<String, T2Reference> getOutData(String jobIdentifier) {
-		AbstractDispatchEvent outEvent;
-		synchronized (outgoingJobs) {
-			outEvent = outgoingJobs.get(jobIdentifier);
-		}
-		Map<String, T2Reference> outData = new HashMap<>();
-		if (outEvent instanceof DispatchResultEvent)
-			outData = ((DispatchResultEvent) outEvent).getData();
-		return outData;
-	}
-
-	private String jobIdentifier(AbstractDispatchEvent event) {
-		String jobId = event.getOwningProcess()
-				+ Arrays.toString(event.getIndex());
-		return jobId;
-	}
-
-	public static final String LOOP_PORT = "loop";
-
-	public class ConditionCallBack implements AsynchronousActivityCallback {
-		private InvocationContext context;
-		private final String jobIdentifier;
-		private String processId;
-
-		public ConditionCallBack(String jobIdentifier) {
-			this.jobIdentifier = jobIdentifier;
-			AbstractDispatchEvent originalEvent;
-			synchronized (incomingJobs) {
-				originalEvent = incomingJobs.get(jobIdentifier);
-			}
-			context = originalEvent.getContext();
-			processId = originalEvent.getOwningProcess() + ":condition";
-		}
-
-		@Override
-		public void fail(String message) {
-			fail(message, null, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t) {
-			fail(message, t, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t,
-				DispatchErrorType errorType) {
-			logger.warn("Failed (" + errorType + ") invoking condition service "
-					+ jobIdentifier + ":" + message, t);
-
-			AbstractDispatchEvent originalEvent;
-			synchronized (incomingJobs) {
-				originalEvent = incomingJobs.get(jobIdentifier);
-			}
-			receiveError(new DispatchErrorEvent(originalEvent
-					.getOwningProcess(), originalEvent.getIndex(),
-					originalEvent.getContext(),
-					"Can't invoke condition service ", t,
-					DispatchErrorType.INVOCATION, null));
-		}
-
-		@Override
-		public InvocationContext getContext() {
-			return context;
-		}
-
-		@Override
-		public String getParentProcessIdentifier() {
-			return processId;
-		}
-
-		@Override
-		public void receiveCompletion(int[] completionIndex) {
-			// Ignore streaming
-		}
-
-		@Override
-		public void receiveResult(Map<String, T2Reference> data, int[] index) {
-			if (index.length > 0) {
-				// Ignore streaming
-				return;
-			}
-			T2Reference loopRef = data.get(LOOP_PORT);
-			if (loopRef == null) {
-				fail("Condition service didn't contain output port " + LOOP_PORT);
-				return;
-			}
-			if (loopRef.containsErrors()) {
-				fail("Condition service failed: " + loopRef);
-				return;
-			}
-			if (loopRef.getDepth() != 0) {
-				fail("Condition service output " + LOOP_PORT
-						+ " depth is not 0, but " + loopRef.getDepth());
-			}
-			ReferenceService referenceService = context.getReferenceService();
-			String loop = (String) referenceService.renderIdentifier(loopRef,
-					String.class, context);
-
-			if (Boolean.parseBoolean(loop)) {
-				// Push it down again
-				AbstractDispatchEvent dispatchEvent;
-				synchronized (incomingJobs) {
-					dispatchEvent = incomingJobs.get(jobIdentifier);
-				}
-				if (dispatchEvent == null) {
-					fail("Unknown job identifier " + jobIdentifier);
-				}
-				if (dispatchEvent instanceof DispatchJobEvent) {
-					DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
-							dispatchEvent);
-					getBelow().receiveJob(newJobEvent);
-				} else if (dispatchEvent instanceof DispatchJobQueueEvent) {
-					getBelow().receiveJobQueue(
-							(DispatchJobQueueEvent) dispatchEvent);
-				} else {
-					fail("Unknown type of incoming event " + dispatchEvent);
-				}
-				return;
-
-			} else {
-				// We'll push it up, end of loop for now
-
-				AbstractDispatchEvent outgoingEvent;
-				synchronized (outgoingJobs) {
-					outgoingEvent = outgoingJobs.get(jobIdentifier);
-				}
-				if (outgoingEvent == null && !config.get("runFirst").asBoolean()) {
-					fail("Initial loop condition failed");
-				}
-				if (outgoingEvent instanceof DispatchCompletionEvent) {
-					getAbove().receiveResultCompletion(
-							(DispatchCompletionEvent) outgoingEvent);
-				} else if (outgoingEvent instanceof DispatchResultEvent) {
-					getAbove().receiveResult(
-							(DispatchResultEvent) outgoingEvent);
-				} else {
-					fail("Unknown type of outgoing event " + outgoingEvent);
-				}
-			}
-
-		}
-
-		private DispatchJobEvent prepareNewJobEvent(
-				Map<String, T2Reference> data,
-				AbstractDispatchEvent dispatchEvent) {
-			DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
-			Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
-					dispatchJobEvent.getData());
-			newInputs.putAll(data);
-			DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
-					.getOwningProcess(), dispatchEvent.getIndex(),
-					dispatchEvent.getContext(), newInputs,
-					((DispatchJobEvent) dispatchEvent).getActivities());
-			/*
-			 * TODO: Should this be registered as an incomingJobs? If so the
-			 * conditional could even feed to itself, and we should also keep a
-			 * list of originalJobs.
-			 */
-			return newJobEvent;
-		}
-
-		@Override
-		public void requestRun(Runnable runMe) {
-			String newThreadName = "Condition service "
-					+ getParentProcessIdentifier();
-			Thread thread = new Thread(runMe, newThreadName);
-			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-				@Override
-				public void uncaughtException(Thread t, Throwable e) {
-					fail("Uncaught exception while invoking " + jobIdentifier,
-							e);
-				}
-			});
-			thread.start();
-		}
-	}
-
-	@Override
-	public Processor getProcessor() {
-		if (dispatchStack == null)
-			return null;
-		return dispatchStack.getProcessor();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
deleted file mode 100644
index 7cfa2a5..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.Properties;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Configuration bean for the {@link Loop}.
- * <p>
- * Set the {@link #setCondition(Activity)} for an activity with an output port
- * called "loop". The LoopLayer will re-send a job only if this port exist and
- * it's output can be dereferenced to a string equal to "true".
- * </p>
- * <p>
- * If {@link #isRunFirst()} is false, the loop layer will check the condition
- * before invoking the job for the first time, otherwise the condition will be
- * invoked after the job has come back with successful results.
- * </p>
- * 
- * @author Stian Soiland-Reyes
- * 
- */
-@ConfigurationBean(uri = Loop.URI + "#Config")
-public class LoopConfiguration implements Cloneable {
-	private Activity<?> condition = null;
-	private Boolean runFirst;
-	private Properties properties;
-
-	public Properties getProperties() {
-		synchronized (this) {
-			if (properties == null)
-				properties = new Properties();
-		}
-		return properties;
-	}
-
-	public void setProperties(Properties properties) {
-		this.properties = properties;
-	}
-
-	@Override
-	public LoopConfiguration clone() {
-		LoopConfiguration clone;
-		try {
-			clone = (LoopConfiguration) super.clone();
-			clone.condition = null;
-		} catch (CloneNotSupportedException e) {
-			throw new RuntimeException("Unexpected CloneNotSupportedException",
-					e);
-		}
-		return clone;
-	}
-
-	public Activity<?> getCondition() {
-		return condition;
-	}
-
-	public boolean isRunFirst() {
-		if (runFirst == null)
-			return true;
-		return runFirst;
-	}
-
-	@ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
-	public void setCondition(Activity<?> activity) {
-		this.condition = activity;
-	}
-
-	@ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
-	public void setRunFirst(boolean runFirst) {
-		this.runFirst = runFirst;
-	}
-}


[6/6] incubator-taverna-engine git commit: updated scufl2 dependencies

Posted by st...@apache.org.
updated scufl2 dependencies


Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/315a829c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/315a829c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/315a829c

Branch: refs/heads/master
Commit: 315a829c3d7740e51726d2477273196338dca58e
Parents: 4252fa9
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Fri Feb 27 15:26:24 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Fri Feb 27 15:26:24 2015 +0000

----------------------------------------------------------------------
 taverna-platform-integration-tests/pom.xml | 17 +----------------
 taverna-prov/pom.xml                       |  4 +++-
 taverna-report-api/pom.xml                 |  2 +-
 3 files changed, 5 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-platform-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-platform-integration-tests/pom.xml b/taverna-platform-integration-tests/pom.xml
index a958d1c..e99c469 100644
--- a/taverna-platform-integration-tests/pom.xml
+++ b/taverna-platform-integration-tests/pom.xml
@@ -362,7 +362,7 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.taverna.language</groupId>
-			<artifactId>taverna-scufl2-rdfxml</artifactId>
+			<artifactId>taverna-scufl2-wfbundle</artifactId>
 			<version>${taverna.language.version}</version>
 		</dependency>
 		<dependency>
@@ -370,21 +370,6 @@
 			<artifactId>taverna-scufl2-ucfpackage</artifactId>
 			<version>${taverna.language.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.taverna.language</groupId>
-			<artifactId>taverna-scufl2-validation</artifactId>
-			<version>${taverna.language.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.taverna.language</groupId>
-			<artifactId>taverna-scufl2-validation-correctness</artifactId>
-			<version>${taverna.language.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.taverna.language</groupId>
-			<artifactId>taverna-scufl2-validation-structural</artifactId>
-			<version>${taverna.language.version}</version>
-		</dependency>
 
 		<!-- Taverna CommandLine Tool -->
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-prov/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-prov/pom.xml b/taverna-prov/pom.xml
index 2eebd56..79e38ff 100644
--- a/taverna-prov/pom.xml
+++ b/taverna-prov/pom.xml
@@ -40,14 +40,16 @@
         </dependency>
         <dependency>
             <groupId>org.apache.taverna.language</groupId>
-            <artifactId>taverna-scufl2-rdfxml</artifactId>
+            <artifactId>taverna-scufl2-wfbundle</artifactId>
             <version>${taverna.language.version}</version>
         </dependency>
+        <!--
         <dependency>
             <groupId>org.apache.taverna.language</groupId>
             <artifactId>taverna-scufl2-wfdesc</artifactId>
             <version>${taverna.language.version}</version>
         </dependency>
+        -->
 
         <dependency>
             <groupId>org.apache.taverna.language</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-report-api/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-report-api/pom.xml b/taverna-report-api/pom.xml
index 159f06f..117cf86 100644
--- a/taverna-report-api/pom.xml
+++ b/taverna-report-api/pom.xml
@@ -16,7 +16,7 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.taverna.language</groupId>
-			<artifactId>taverna-scufl2-rdfxml</artifactId>
+			<artifactId>taverna-scufl2-wfbundle</artifactId>
 			<version>${taverna.language.version}</version>
 		</dependency>
 		<dependency>


[3/6] incubator-taverna-engine git commit: taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
new file mode 100644
index 0000000..718079a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
@@ -0,0 +1,508 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static java.lang.System.currentTimeMillis;
+
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import net.sf.taverna.t2.invocation.Event;
+import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
+import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
+import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.workflowmodel.Dataflow;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Sits above the {@link Invoke} layer and collects information about the
+ * current workflow run to be stored by the {@link ProvenanceReporter}.
+ * 
+ * @author Ian Dunlop
+ * @author Stian Soiland-Reyes
+ * 
+ */
+public class IntermediateProvenance extends AbstractDispatchLayer<String> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
+	private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
+
+	private ProvenanceReporter reporter;
+	private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
+	private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
+	private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
+
+	// private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
+	// private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
+
+	private WorkflowProvenanceItem workflowItem;
+
+	/**
+	 * This layer takes no configuration; the supplied value is ignored.
+	 */
+	@Override
+	public void configure(String o) {
+	}
+
+	/**
+	 * A set of provenance events for a particular owning process has been
+	 * finished with, so all the {@link IterationProvenanceItem}s recorded for
+	 * that process are removed from the map.
+	 * <p>
+	 * The removal takes the same lock as {@link #getIndexesByProcess(String)}
+	 * so that the two never modify the underlying {@link HashMap} at the same
+	 * time.
+	 * 
+	 * @param owningProcess
+	 *            identifier of the process whose iteration items are discarded
+	 */
+	@Override
+	public void finishedWith(String owningProcess) {
+		synchronized (processToIndexes) {
+			processToIndexes.remove(owningProcess);
+		}
+	}
+
+	/**
+	 * This layer holds no configuration.
+	 * 
+	 * @return always <code>null</code>
+	 */
+	@Override
+	public String getConfiguration() {
+		return null;
+	}
+
+	/**
+	 * Return the map from index string to {@link IterationProvenanceItem} kept
+	 * for the given owning process, creating and registering an empty map the
+	 * first time the process is seen. Lookup and creation happen under the
+	 * lock of the process map.
+	 * 
+	 * @param owningProcess
+	 *            identifier of the process whose iteration items are wanted
+	 * @return the map registered for that process, never <code>null</code>
+	 */
+	protected Map<String, IterationProvenanceItem> getIndexesByProcess(
+			String owningProcess) {
+		synchronized (processToIndexes) {
+			Map<String, IterationProvenanceItem> byIndex = processToIndexes
+					.get(owningProcess);
+			if (byIndex != null)
+				return byIndex;
+			// first request for this process: register a fresh map
+			byIndex = new HashMap<>();
+			processToIndexes.put(owningProcess, byIndex);
+			return byIndex;
+		}
+	}
+
+	/**
+	 * Look up the {@link IterationProvenanceItem} recorded for the owning
+	 * process and index of the given event.
+	 * <p>
+	 * If no item is recorded for the exact index, trailing index values are
+	 * stripped one at a time until an item for a shorter ('parent') index is
+	 * found. A new item is then created for the event's original index, linked
+	 * to that parent, stored in the per-process map and returned.
+	 * 
+	 * @param event
+	 *            the event whose owning process and index identify the
+	 *            iteration
+	 * @return the existing item for the event's index, or a newly created item
+	 *         linked to the closest parent iteration
+	 * @throws IllegalStateException
+	 *             if no item is recorded for the index or for any of its
+	 *             prefixes, down to and including the empty index
+	 */
+	protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
+		String owningProcess = event.getOwningProcess();
+		int[] originalIndex = event.getIndex();
+		Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
+		synchronized (indexes) {
+			// find the iteration item for the exact index, e.g. [1]
+			IterationProvenanceItem existing = indexes.get(indexStr(originalIndex));
+			if (existing != null)
+				return existing;
+
+			// otherwise strip the index until a 'parent' iteration is found
+			int[] index = originalIndex;
+			IterationProvenanceItem parent = null;
+			while (parent == null) {
+				try {
+					index = removeLastIndex(index);
+				} catch (IllegalStateException e) {
+					// even the empty index [] had no iteration item
+					logger.warn("Cannot find a parent iteration with index [] for owning process: "
+							+ owningProcess
+							+ ". Workflow invocation is in an illegal state");
+					throw e;
+				}
+				parent = indexes.get(indexStr(index));
+			}
+
+			/*
+			 * We have a 'parent' iteration, so create a new iteration for the
+			 * original index and link it to the parent's activity and input
+			 * data.
+			 * 
+			 * FIXME should this be linked to the parent iteration instead?
+			 */
+			IterationProvenanceItem child = new IterationProvenanceItem();
+			child.setIteration(originalIndex);
+			child.setProcessId(owningProcess);
+			child.setIdentifier(UUID.randomUUID().toString());
+			child.setWorkflowId(workflowItem.getParentId());
+			child.setParentIterationItem(parent);
+			child.setParentId(parent.getParentId());
+			child.setInputDataItem(parent.getInputDataItem());
+
+			// add this new iteration item to the map
+			getIndexesByProcess(event.getOwningProcess()).put(
+					indexStr(event.getIndex()), child);
+			return child;
+		}
+	}
+
+	/**
+	 * Render an index array as a string in which every value is preceded by a
+	 * colon, e.g. <code>:1:2:3</code>; the empty index renders as the empty
+	 * string.
+	 * 
+	 * @param index
+	 *            the index values to render
+	 * @return the colon-prefixed rendering of the index
+	 */
+	private String indexStr(int[] index) {
+		StringBuilder rendered = new StringBuilder();
+		for (int position = 0; position < index.length; position++) {
+			rendered.append(":");
+			rendered.append(index[position]);
+		}
+		return rendered.toString();
+	}
+
+	/**
+	 * Render the index with {@link #indexStr(int[])} and split the result on
+	 * ':'.
+	 * <p>
+	 * NOTE(review): despite its name this method does not remove anything. As
+	 * the rendered string starts with ':', a non-empty index yields an array
+	 * whose first element is the empty string. Nothing in this class calls the
+	 * method (hence the "unused" suppression); see
+	 * {@link #removeLastIndex(int[])} for the method that actually drops the
+	 * last value.
+	 * 
+	 * @param index
+	 *            the index values to render and split
+	 * @return the pieces of the rendered index string separated by ':'
+	 */
+	@SuppressWarnings("unused")
+	private String[] stripLastIndex(int[] index) {
+		// will be in form :1:2:3
+		return indexStr(index).split(":");
+	}
+
+	/**
+	 * Return a copy of the index with its final value dropped; the argument
+	 * itself is left untouched.
+	 * 
+	 * @param index
+	 *            the index to shorten
+	 * @return a new array holding all but the last value of the index
+	 * @throws IllegalStateException
+	 *             if the index is already empty
+	 */
+	private int[] removeLastIndex(int[] index) {
+		int parentLength = index.length - 1;
+		if (parentLength < 0)
+			throw new IllegalStateException(
+					"There is no parent iteration of index [] for this result");
+		return Arrays.copyOf(index, parentLength);
+	}
+
+	/** Generate a random {@link UUID} rendered as a string. */
+	private static String uuid() {
+		return UUID.randomUUID().toString();
+	}
+
+	/**
+	 * Create an {@link ErrorProvenanceItem} and send across to the
+	 * {@link ProvenanceConnector}
+	 */
+	@Override
+	public void receiveError(DispatchErrorEvent errorEvent) {
+		IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
+		// get using errorEvent.getOwningProcess();
+		
+		ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
+		errorItem.setCause(errorEvent
+				.getCause());
+		errorItem.setErrorType(errorEvent
+				.getFailureType().toString());
+		errorItem.setMessage(errorEvent.getMessage());
+		
+		errorItem.setProcessId(errorEvent.getOwningProcess());
+		errorItem.setIdentifier(uuid());
+		errorItem.setParentId(iterationProvItem.getIdentifier());
+		// iterationProvItem.setErrorItem(errorItem);
+		// FIXME don't need to add to the processor item earlier
+		getReporter().addProvenanceItem(errorItem);
+		super.receiveError(errorEvent);
+	}
+
+	/**
+	 * Create the {@link ProvenanceItem}s for a job and send them to the
+	 * {@link ProvenanceReporter}: a process item, a processor item, an activity
+	 * item for the first {@link AsynchronousActivity} of the job, and an
+	 * {@link IterationProvenanceItem} carrying the job's input data. The
+	 * iteration item is also remembered under the job's owning process and
+	 * index so that {@link #getIterationProvItem(Event)} can find it again when
+	 * the result or error arrives. Each item has a unique identifier and knows
+	 * who its parent item is.
+	 * <p>
+	 * Any {@link RuntimeException} raised while building or reporting the items
+	 * is logged and swallowed, so the job is always passed on to the superclass
+	 * implementation.
+	 */
+	@Override
+	public void receiveJob(DispatchJobEvent jobEvent) {
+		try {
+			String owningProcess = jobEvent.getOwningProcess();
+
+			// FIXME do we need this ProcessProvenanceItem?
+			String[] split = owningProcess.split(":");
+			ProcessProvenanceItem provenanceItem = new ProcessProvenanceItem();
+			String parentDataflowId = workflowItem.getParentId();
+			provenanceItem.setWorkflowId(parentDataflowId);
+			provenanceItem.setFacadeID(split[0]);
+			provenanceItem.setDataflowID(split[1]);
+			provenanceItem.setProcessId(owningProcess);
+			provenanceItem.setIdentifier(uuid());
+			provenanceItem.setParentId(workflowItem.getIdentifier());
+
+			ProcessorProvenanceItem processorProvItem = new ProcessorProvenanceItem();
+			processorProvItem.setWorkflowId(parentDataflowId);
+			processorProvItem.setProcessId(owningProcess);
+			processorProvItem.setIdentifier(uuid());
+			processorProvItem.setParentId(provenanceItem.getIdentifier());
+			getReporter().addProvenanceItem(provenanceItem);
+			getReporter().addProvenanceItem(processorProvItem);
+
+			IterationProvenanceItem iterationProvItem = new IterationProvenanceItem();
+			iterationProvItem.setWorkflowId(parentDataflowId);
+			iterationProvItem.setIteration(jobEvent.getIndex());
+			iterationProvItem.setIdentifier(uuid());
+
+			ReferenceService referenceService = jobEvent.getContext()
+					.getReferenceService();
+
+			InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
+			inputDataItem.setDataMap(jobEvent.getData());
+			inputDataItem.setReferenceService(referenceService);
+			inputDataItem.setIdentifier(uuid());
+			inputDataItem.setParentId(iterationProvItem.getIdentifier());
+			inputDataItem.setProcessId(owningProcess);
+
+			// (index, owning process) pair describing where this job belongs
+			List<Object> inputIndexOwnerList = new ArrayList<>();
+			inputIndexOwnerList.add(jobEvent.getIndex());
+			inputIndexOwnerList.add(owningProcess);
+			// NOTE(review): nothing in this class reads or clears these two
+			// item maps any more; confirm whether they are still needed
+			inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
+
+			iterationProvItem.setInputDataItem(inputDataItem);
+			iterationProvItem.setProcessId(owningProcess);
+
+			// only the first asynchronous activity gets an activity item
+			for (Activity<?> activity : jobEvent.getActivities())
+				if (activity instanceof AsynchronousActivity) {
+					ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
+					activityProvItem.setWorkflowId(parentDataflowId);
+					activityProvItem.setIdentifier(uuid());
+					iterationProvItem.setParentId(activityProvItem.getIdentifier());
+					activityProvItem.setParentId(processorProvItem.getIdentifier());
+					activityProvItem.setProcessId(owningProcess);
+					// same (index, owning process) pair as for the input data
+					activityProvenanceItemMap.put(activityProvItem,
+							inputIndexOwnerList);
+					getReporter().addProvenanceItem(activityProvItem);
+					break;
+				}
+			getIndexesByProcess(owningProcess).put(
+					indexStr(jobEvent.getIndex()), iterationProvItem);
+			iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
+			getReporter().addProvenanceItem(iterationProvItem);
+		} catch (RuntimeException ex) {
+			logger.error("Could not store provenance for " + jobEvent, ex);
+		}
+
+		super.receiveJob(jobEvent);
+	}
+
+	/**
+	 * Delegates straight to the superclass implementation; no provenance is
+	 * recorded for job queue events.
+	 */
+	@Override
+	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+		super.receiveJobQueue(jobQueueEvent);
+	}
+
+	/**
+	 * Populate an {@link OutputDataProvenanceItem} with the results and attach
+	 * it to the appropriate {@link IterationProvenanceItem}. Then send the
+	 * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
+	 */
+	@Override
+	public void receiveResult(DispatchResultEvent resultEvent) {
+		try {
+			// FIXME use the connector from the result event context
+			IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
+			iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
+			
+			ReferenceService referenceService = resultEvent.getContext()
+					.getReferenceService();
+
+			OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
+			outputDataItem.setDataMap(resultEvent.getData());
+			outputDataItem.setReferenceService(referenceService);
+			outputDataItem.setIdentifier(uuid());
+			outputDataItem.setProcessId(resultEvent.getOwningProcess());
+			outputDataItem.setParentId(iterationProvItem.getIdentifier());
+			iterationProvItem.setOutputDataItem(outputDataItem);
+			
+			getReporter().addProvenanceItem(iterationProvItem);
+			// getConnector().addProvenanceItem(outputDataItem);
+	
+			// PM -- testing
+			// add xencoding of data value here??
+	//		Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
+	//		for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
+	//			// create a simpler bean that we can serialize?
+	//			
+	//			T2Reference ref = entry.getValue();
+	//			
+	//			SimplerT2Reference t2RefBean = new SimplerT2Reference();
+	//			t2RefBean.setReferenceType(ref.getReferenceType());
+	//			t2RefBean.setDepth(ref.getDepth());
+	//			t2RefBean.setLocalPart(ref.getLocalPart());
+	//			t2RefBean.setNamespacePart(ref.getNamespacePart());
+	//						
+	//			System.out.println("data ref: "+ref);
+	//			String serializedInput = SerializeParam(t2RefBean);
+	//			System.out.println("serialized reference:" + serializedInput);
+	//			
+	//			System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
+//		}
+		} catch (Exception ex) {
+			logger.error("Could not store provenance for "
+					+ resultEvent.getOwningProcess() + " "
+					+ Arrays.toString(resultEvent.getIndex()), ex);
+			// But don't break super.receiveResult() !!
+		}
+		super.receiveResult(resultEvent);
+	}
+
+	/**
+	 * Delegates straight to the superclass implementation; no provenance is
+	 * recorded for completion events.
+	 */
+	@Override
+	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+		super.receiveResultCompletion(completionEvent);
+	}
+
+	/**
+	 * Tell this layer what {@link ProvenanceReporter} implementation is being
+	 * used to capture the {@link ProvenanceItem}s. NOTE: should probably use
+	 * the reporter from the result events context where possible
+	 * 
+	 * @param connector
+	 *            the reporter that provenance items created by this layer are
+	 *            added to
+	 */
+	public void setReporter(ProvenanceReporter connector) {
+		this.reporter = connector;
+	}
+
+	/**
+	 * @return the reporter set with {@link #setReporter(ProvenanceReporter)},
+	 *         or <code>null</code> if none has been set
+	 */
+	public ProvenanceReporter getReporter() {
+		return reporter;
+	}
+
+	/**
+	 * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
+	 * enacted this layer has to know about the {@link WorkflowProvenanceItem}.
+	 * Its parent id is used as the workflow id, and its identifier as the
+	 * parent id, of items created by this layer.
+	 * 
+	 * @param workflowItem
+	 *            the provenance item describing the enacted workflow
+	 */
+	public void setWorkflow(WorkflowProvenanceItem workflowItem) {
+		this.workflowItem = workflowItem;
+	}
+
+	// TODO is this unused?
+	/**
+	 * Serialize a bean to its {@link XMLEncoder} XML representation.
+	 * <p>
+	 * An {@link XMLEncoder} created from just an output stream writes UTF-8,
+	 * so the bytes are decoded as UTF-8 explicitly rather than with the
+	 * platform default charset, which would corrupt non-ASCII characters on
+	 * platforms whose default is not UTF-8.
+	 * 
+	 * @param ParamValue
+	 *            the bean to serialize
+	 * @return the XML document describing the bean
+	 */
+	public static String SerializeParam(Object ParamValue) {
+		ByteArrayOutputStream BStream = new ByteArrayOutputStream();
+		XMLEncoder encoder = new XMLEncoder(BStream);
+		try {
+			encoder.writeObject(ParamValue);
+		} finally {
+			// closing flushes the document into the byte stream
+			encoder.close();
+		}
+		return new String(BStream.toByteArray(),
+				java.nio.charset.StandardCharsets.UTF_8);
+	}
+
+	// TODO is this unused?
+	/**
+	 * Rebuild a bean from the XML produced by {@link #SerializeParam(Object)}.
+	 * <p>
+	 * The text is encoded as UTF-8, matching what
+	 * {@link #SerializeParam(Object)} produces, rather than with the platform
+	 * default charset.
+	 * <p>
+	 * NOTE(review): {@link XMLDecoder} instantiates whatever classes and calls
+	 * whatever methods the document describes, so this must only ever be given
+	 * trusted input.
+	 * 
+	 * @param SerializedParam
+	 *            XML document in {@link XMLDecoder} format
+	 * @return the first object described by the document
+	 */
+	public static Object DeserializeParam(String SerializedParam) {
+		InputStream IStream = new ByteArrayInputStream(
+				SerializedParam.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+		XMLDecoder decoder = new XMLDecoder(IStream);
+		try {
+			return decoder.readObject();
+		} finally {
+			decoder.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
new file mode 100644
index 0000000..f8403df
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
@@ -0,0 +1,369 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import net.sf.taverna.t2.invocation.InvocationContext;
+import net.sf.taverna.t2.monitor.MonitorManager;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
+import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.ControlBoundary;
+import net.sf.taverna.t2.workflowmodel.OutputPort;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
+import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Context free invoker layer, does not pass index arrays of jobs into activity
+ * instances.
+ * <p>
+ * This layer will invoke the first invokable activity in the activity list, so
+ * any sane dispatch stack will have narrowed this down to a single item list by
+ * this point, i.e. by the insertion of a failover layer.
+ * <p>
+ * Currently only handles activities implementing {@link AsynchronousActivity}.
+ *
+ * @author Tom Oinn
+ * @author Stian Soiland-Reyes
+ *
+ */
+@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
+@ControlBoundary
+public class Invoke extends AbstractDispatchLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
+	private static Logger logger = Logger.getLogger(Invoke.class);
+	private static Long invocationCount = 0L;
+
+	private MonitorManager monMan;
+
+	private static String getNextProcessID() {
+		long count;
+		synchronized (invocationCount) {
+			count = ++invocationCount;
+		}
+		return "invocation" + count;
+	}
+
+	/**
+	 * Create the layer and look up the {@link MonitorManager} through
+	 * {@link MonitorManager#getInstance()}.
+	 */
+	public Invoke() {
+		super();
+		monMan = MonitorManager.getInstance();
+	}
+
+	/**
+	 * This layer takes no configuration; the supplied value is ignored.
+	 */
+	@Override
+	public void configure(JsonNode config) {
+		// No configuration, do nothing
+	}
+
+	/**
+	 * This layer holds no configuration.
+	 * 
+	 * @return always <code>null</code>
+	 */
+	@Override
+	public JsonNode getConfiguration() {
+		return null;
+	}
+
+	/**
+	 * Take a job from the layer above and invoke the first activity in its
+	 * list that implements {@link AsynchronousActivity}; any later activities
+	 * are ignored. The invocation supplies a callback that wraps result
+	 * messages in the appropriate collection depth before sending them on (in
+	 * general activities are not aware of their invocation context and should
+	 * not be responsible for providing correct index arrays for results).
+	 * <p>
+	 * Because only the first invokable activity is used, any sane dispatch
+	 * stack will have narrowed the list down to a single item by this point,
+	 * i.e. by the insertion of a failover layer.
+	 */
+	@Override
+	public void receiveJob(final DispatchJobEvent jobEvent) {
+		for (Activity<?> candidate : jobEvent.getActivities()) {
+			if (!(candidate instanceof AsynchronousActivity))
+				continue;
+			invoke(jobEvent, (AsynchronousActivity<?>) candidate);
+			// only the first asynchronous activity is ever invoked
+			return;
+		}
+	}
+
+	/**
+	 * Invoke one asynchronous activity for the given job.
+	 * <p>
+	 * The invocation is registered with the monitor under a new process
+	 * identifier, its start is reported to the context's
+	 * {@link ProvenanceReporter} (when there is one and the dispatch stack
+	 * contains an {@link IntermediateProvenance} layer), the job's data is
+	 * renamed according to the activity's input port mapping, and the activity
+	 * is then run with an {@link InvokeCallBack} that receives its results,
+	 * completions and failures.
+	 * <p>
+	 * Reporting provenance is best effort: a {@link RuntimeException} raised
+	 * while doing so is logged and does not stop the activity from being run.
+	 * 
+	 * @param jobEvent
+	 *            the job to run
+	 * @param activity
+	 *            the activity to run the job with
+	 */
+	protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
+		// Register with the monitor
+		final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
+				getNextProcessID()).getOwningProcess();
+		monMan.registerNode(activity, invocationProcessIdentifier,
+				new HashSet<MonitorableProperty<?>>());
+		monMan.registerNode(jobEvent, invocationProcessIdentifier,
+				new HashSet<MonitorableProperty<?>>());
+
+		/*
+		 * The activity is an AsynchronousActivity so we invoke it with an
+		 * AsynchronousActivityCallback object containing appropriate callback
+		 * methods to push results, completions and failures back to the
+		 * invocation layer.
+		 * 
+		 * Get the registered DataManager for this process. In most cases this
+		 * will just be a single DataManager for the entire workflow system but
+		 * it never hurts to generalize
+		 */
+
+		InvocationContext context = jobEvent.getContext();
+		final ReferenceService refService = context.getReferenceService();
+
+		ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
+		if (provenanceReporter != null) {
+			try {
+				IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
+				if (intermediateProvenance != null) {
+					InvocationStartedProvenanceItem invocationItem = new InvocationStartedProvenanceItem();
+					IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
+					invocationItem.setIdentifier(UUID.randomUUID().toString());
+					invocationItem.setActivity(activity);
+					invocationItem.setProcessId(jobEvent.getOwningProcess());
+					invocationItem.setInvocationProcessId(invocationProcessIdentifier);
+					invocationItem.setParentId(parentItem.getIdentifier());
+					invocationItem.setWorkflowId(parentItem.getWorkflowId());
+					invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
+					provenanceReporter.addProvenanceItem(invocationItem);
+				}
+			} catch (RuntimeException ex) {
+				/*
+				 * e.g. no iteration item was recorded for this job. Losing a
+				 * provenance record must not prevent the activity from being
+				 * invoked.
+				 */
+				logger.error("Could not store provenance for " + jobEvent, ex);
+			}
+		}
+
+		/*
+		 * Create a Map of EntityIdentifiers named appropriately given the
+		 * activity mapping; inputs without a mapping are left out
+		 */
+		Map<String, T2Reference> inputData = new HashMap<>();
+		for (String inputName : jobEvent.getData().keySet()) {
+			String activityInputName = activity
+					.getInputPortMapping().get(inputName);
+			if (activityInputName != null)
+				inputData.put(activityInputName, jobEvent.getData()
+						.get(inputName));
+		}
+
+		/*
+		 * Create a callback object to receive events, completions and failure
+		 * notifications from the activity
+		 */
+		AsynchronousActivityCallback callback = new InvokeCallBack(
+				jobEvent, refService, invocationProcessIdentifier,
+				activity);
+
+		if (activity instanceof MonitorableAsynchronousActivity<?>) {
+			/*
+			 * Monitorable activity so get the monitorable properties and push
+			 * them into the state tree after launching the job
+			 */
+			MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) activity;
+			Set<MonitorableProperty<?>> props = maa
+					.executeAsynchWithMonitoring(inputData, callback);
+			monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
+		} else {
+			/*
+			 * Run the job, passing in the callback we've just created along
+			 * with the (possibly renamed) input data map
+			 */
+			activity.executeAsynch(inputData, callback);
+		}
+	}
+
+	/**
+	 * Search the dispatch stack of this layer's processor for an
+	 * {@link IntermediateProvenance} layer.
+	 * 
+	 * @return the first such layer in the stack, or <code>null</code> if the
+	 *         stack has none
+	 */
+	protected IntermediateProvenance findIntermediateProvenance() {
+		IntermediateProvenance found = null;
+		for (DispatchLayer<?> candidate : getProcessor().getDispatchStack()
+				.getLayers()) {
+			if (candidate instanceof IntermediateProvenance) {
+				found = (IntermediateProvenance) candidate;
+				break;
+			}
+		}
+		return found;
+	}
+
+	protected class InvokeCallBack implements AsynchronousActivityCallback {
+		protected final AsynchronousActivity<?> activity;
+		protected final String invocationProcessIdentifier;
+		protected final DispatchJobEvent jobEvent;
+		protected final ReferenceService refService;
+		protected boolean sentJob = false;
+
+		/**
+		 * Store the job event, reference service, invocation process
+		 * identifier and activity for use by the callback methods.
+		 * 
+		 * @param jobEvent
+		 *            the job the activity was invoked for
+		 * @param refService
+		 *            reference service used when empty result lists have to be
+		 *            registered
+		 * @param invocationProcessIdentifier
+		 *            process identifier the invocation is registered under
+		 *            with the monitor
+		 * @param asyncActivity
+		 *            the activity being invoked
+		 */
+		protected InvokeCallBack(DispatchJobEvent jobEvent,
+				ReferenceService refService,
+				String invocationProcessIdentifier,
+				AsynchronousActivity<?> asyncActivity) {
+			this.jobEvent = jobEvent;
+			this.refService = refService;
+			this.invocationProcessIdentifier = invocationProcessIdentifier;
+			this.activity = asyncActivity;
+		}
+
+		/**
+		 * Report a failure that has no associated exception; delegates to
+		 * {@link #fail(String, Throwable)} with a <code>null</code> cause.
+		 */
+		@Override
+		public void fail(String message) {
+			fail(message, null);
+		}
+
+		/**
+		 * Report a failure of type {@link DispatchErrorType#INVOCATION};
+		 * delegates to {@link #fail(String, Throwable, DispatchErrorType)}.
+		 */
+		@Override
+		public void fail(String message, Throwable t) {
+			fail(message, t, DispatchErrorType.INVOCATION);
+		}
+
+		/**
+		 * Report a failed invocation: log a warning, deregister the invocation
+		 * from the monitor and send a {@link DispatchErrorEvent} carrying the
+		 * job's owning process, index and context to the layer above.
+		 * 
+		 * @param message
+		 *            description of the failure
+		 * @param t
+		 *            cause of the failure; the shorter overloads pass
+		 *            <code>null</code> when there is none
+		 * @param errorType
+		 *            kind of failure to report
+		 */
+		@Override
+		public void fail(String message, Throwable t,
+				DispatchErrorType errorType) {
+			logger.warn("Failed (" + errorType + ") invoking " + activity
+					+ " for job " + jobEvent + ": " + message, t);
+			monMan.deregisterNode(
+					invocationProcessIdentifier);
+			getAbove().receiveError(
+					new DispatchErrorEvent(jobEvent.getOwningProcess(),
+							jobEvent.getIndex(), jobEvent.getContext(),
+							message, t, errorType, activity));
+		}
+
+		/**
+		 * @return the invocation context of the job this callback was created
+		 *         for
+		 */
+		@Override
+		public InvocationContext getContext() {
+			return jobEvent.getContext();
+		}
+
+		/**
+		 * @return the process identifier this invocation was given when the
+		 *         callback was created
+		 */
+		@Override
+		public String getParentProcessIdentifier() {
+			return invocationProcessIdentifier;
+		}
+
+		@Override
+		public void receiveCompletion(int[] completionIndex) {
+			if (completionIndex.length == 0)
+				// Final result, clean up monitor state
+				monMan.deregisterNode(invocationProcessIdentifier);
+			if (sentJob) {
+				int[] newIndex;
+				if (completionIndex.length == 0)
+					newIndex = jobEvent.getIndex();
+				else {
+					newIndex = new int[jobEvent.getIndex().length
+							+ completionIndex.length];
+					int i = 0;
+					for (int indexValue : jobEvent.getIndex())
+						newIndex[i++] = indexValue;
+					for (int indexValue : completionIndex)
+						newIndex[i++] = indexValue;
+				}
+				DispatchCompletionEvent c = new DispatchCompletionEvent(
+						jobEvent.getOwningProcess(), newIndex, jobEvent
+								.getContext());
+				getAbove().receiveResultCompletion(c);
+			} else {
+				/*
+				 * We haven't sent any 'real' data prior to completing a stream.
+				 * This in effect means we're sending an empty top level
+				 * collection so we need to register empty collections for each
+				 * output port with appropriate depth (by definition if we're
+				 * streaming all outputs are collection types of some kind)
+				 */
+				Map<String, T2Reference> emptyListMap = new HashMap<>();
+				for (OutputPort op : activity.getOutputPorts()) {
+					String portName = op.getName();
+					int portDepth = op.getDepth();
+					emptyListMap.put(portName, refService.getListService()
+							.registerEmptyList(portDepth, jobEvent.getContext()).getId());
+				}
+				receiveResult(emptyListMap, new int[0]);
+			}
+		}
+
+		@Override
+		public void receiveResult(Map<String, T2Reference> data, int[] index) {
+			/*
+			 * Construct a new result map using the activity mapping (activity
+			 * output name to processor output name)
+			 */
+			Map<String, T2Reference> resultMap = new HashMap<>();
+			for (String outputName : data.keySet()) {
+				String processorOutputName = activity
+						.getOutputPortMapping().get(outputName);
+				if (processorOutputName != null)
+					resultMap.put(processorOutputName, data.get(outputName));
+			}
+			/*
+			 * Construct a new index array if the specified index is non zero
+			 * length, otherwise just use the original job's index array (means
+			 * we're not streaming)
+			 */
+			int[] newIndex;
+			boolean streaming = false;
+			if (index.length == 0)
+				newIndex = jobEvent.getIndex();
+			else {
+				streaming = true;
+				newIndex = new int[jobEvent.getIndex().length + index.length];
+				int i = 0;
+				for (int indexValue : jobEvent.getIndex())
+					newIndex[i++] = indexValue;
+				for (int indexValue : index)
+					newIndex[i++] = indexValue;
+			}
+			DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
+					.getOwningProcess(), newIndex, jobEvent.getContext(),
+					resultMap, streaming);
+			if (!streaming) {
+				monMan.registerNode(resultEvent, invocationProcessIdentifier,
+						new HashSet<MonitorableProperty<?>>());
+				// Final result, clean up monitor state
+				monMan.deregisterNode(invocationProcessIdentifier);
+			}
+			// Push the modified data to the layer above in the dispatch stack
+			getAbove().receiveResult(resultEvent);
+
+			sentJob = true;
+		}
+
+		@Override
+		public void requestRun(Runnable runMe) {
+			String newThreadName = jobEvent.toString();
+			Thread thread = new Thread(runMe, newThreadName);
+			thread.setContextClassLoader(activity.getClass()
+					.getClassLoader());
+			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+				@Override
+				public void uncaughtException(Thread t, Throwable e) {
+					fail("Uncaught exception while invoking " + activity, e);
+				}
+			});
+			thread.start();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
new file mode 100644
index 0000000..d5077a4
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
@@ -0,0 +1,424 @@
+/*******************************************************************************
+ * Copyright (C) 2008 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.invocation.InvocationContext;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.Processor;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+
+/**
+ * A layer that allows while-style loops.
+ * <p>
+ * The layer is configured with a {@link LoopConfiguration}, where an activity
+ * has been set as the
+ * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
+ * condition}.
+ * <p>
+ * After a job has been successful further down the dispatch stack, the loop
+ * layer will invoke the conditional activity to determine if the job will be
+ * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
+ * will be performed even before the first invocation. (The default
+ * runFirst=true is equivalent to a do..while construct, while runFirst=false is
+ * equivalent to a while.. construct.)
+ * <p>
+ * A job will be resent down the dispatch stack only if the conditional activity
+ * returns a reference to a string equal to "true" on its output port "loop".
+ * <p>
+ * If a job or the conditional activity fails, the while-loop is interrupted and
+ * the error is sent further up.
+ * <p>
+ * Note that the LoopLayer will be invoked for each item in an iteration, if you
+ * want to do the loop for the whole collection (ie. re-iterating if the
+ * loop-condition fails after processing the full list) - create a nested
+ * workflow with the desired depths on its input ports and insert this
+ * LoopLayer in the stack of the nested workflow's processor in parent workflow.
+ * <p>
+ * It is recommended that the LoopLayer is to be inserted after the
+ * {@link ErrorBounce} layer, as this layer is needed for registering errors
+ * produced by the LoopLayer. If the user requires {@link Retry retries} and
+ * {@link Failover failovers} before checking the while condition, such layers
+ * should be below LoopLayer.
+ *
+ * @author Stian Soiland-Reyes
+ */
+// FIXME Doesn't work
+@SuppressWarnings({"unchecked","rawtypes"})
+public class Loop extends AbstractDispatchLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
+	private static Logger logger = Logger.getLogger(Loop.class);
+
+	private JsonNode config = JsonNodeFactory.instance.objectNode();
+
+	/** Job and job queue events received from above, keyed by job identifier. */
+	protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
+	/** Result and completion events received from below, keyed by job identifier. */
+	protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
+
+	@Override
+	public void configure(JsonNode config) {
+		this.config = config;
+	}
+
+	/**
+	 * Forgets every stored incoming and outgoing event that belongs to the
+	 * given owning process.
+	 */
+	@Override
+	public void finishedWith(String owningProcess) {
+		/*
+		 * Job identifiers are the owning process followed by
+		 * Arrays.toString(index), which always starts with "["
+		 */
+		String prefix = owningProcess + "[";
+		synchronized (outgoingJobs) {
+			for (String key : new ArrayList<>(outgoingJobs.keySet()))
+				if (key.startsWith(prefix))
+					outgoingJobs.remove(key);
+		}
+		synchronized (incomingJobs) {
+			for (String key : new ArrayList<>(incomingJobs.keySet()))
+				if (key.startsWith(prefix))
+					incomingJobs.remove(key);
+		}
+	}
+
+	@Override
+	public JsonNode getConfiguration() {
+		return config;
+	}
+
+	/**
+	 * Reads the "runFirst" flag from the configuration.
+	 *
+	 * @return the configured value, or <code>true</code> (do..while
+	 *         behaviour) if there is no configuration or it has no "runFirst"
+	 *         entry
+	 */
+	private boolean isRunFirst() {
+		JsonNode runFirst = (config == null) ? null : config.get("runFirst");
+		return runFirst == null || runFirst.asBoolean();
+	}
+
+	/**
+	 * Remembers the job, then either sends it straight down (runFirst) or
+	 * evaluates the condition before the first invocation.
+	 */
+	@Override
+	public void receiveJob(DispatchJobEvent jobEvent) {
+		synchronized (incomingJobs) {
+			incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
+		}
+		if (isRunFirst()) {
+			// We'll do the conditional in receiveResult instead
+			super.receiveJob(jobEvent);
+			return;
+		}
+		checkCondition(jobEvent);
+	}
+
+	/**
+	 * Remembers the job queue, then either sends it straight down (runFirst)
+	 * or evaluates the condition before the first invocation.
+	 */
+	@Override
+	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+		synchronized (incomingJobs) {
+			incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
+		}
+		if (isRunFirst()) {
+			// We'll do the conditional in receiveResult instead
+			super.receiveJobQueue(jobQueueEvent);
+			return;
+		}
+		checkCondition(jobQueueEvent);
+	}
+	
+	/**
+	 * Returns the condition activity. Currently always <code>null</code>, as
+	 * the JSON configuration does not carry an activity; results and
+	 * completions are therefore passed straight through.
+	 */
+	private Activity<?> getCondition() {
+		//return config.getCondition();
+		return null;
+	}
+
+	/**
+	 * Passes the result up unchanged if there is no condition; otherwise
+	 * stores it and evaluates the condition to decide whether to loop.
+	 */
+	@Override
+	public void receiveResult(DispatchResultEvent resultEvent) {
+		Activity<?> condition = getCondition();
+		if (condition == null) {
+			super.receiveResult(resultEvent);
+			return;
+		}
+		synchronized (outgoingJobs) {
+			outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
+		}
+		checkCondition(resultEvent);
+	}
+
+	/**
+	 * Passes the completion up unchanged if there is no condition; otherwise
+	 * stores it and evaluates the condition to decide whether to loop.
+	 */
+	@Override
+	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+		Activity<?> condition = getCondition();
+		if (condition == null) {
+			super.receiveResultCompletion(completionEvent);
+			return;
+		}
+		synchronized (outgoingJobs) {
+			outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
+		}
+		checkCondition(completionEvent);
+	}
+
+	/**
+	 * Invokes the condition activity asynchronously for the job identified by
+	 * the given event. An error event is sent instead if there is no condition
+	 * or it is not an {@link AbstractAsynchronousActivity}.
+	 */
+	private void checkCondition(AbstractDispatchEvent event) {
+		Activity<?> condition = getCondition();
+		if (condition == null) {
+			super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
+					event.getIndex(), event.getContext(),
+					"Can't invoke condition service: null", null,
+					DispatchErrorType.INVOCATION, condition));
+			return;
+		}
+		if (!(condition instanceof AbstractAsynchronousActivity)) {
+			DispatchErrorEvent errorEvent = new DispatchErrorEvent(
+					event.getOwningProcess(),
+					event.getIndex(),
+					event.getContext(),
+					"Can't invoke condition service "
+							+ condition
+							+ " is not an instance of AbstractAsynchronousActivity",
+					null, DispatchErrorType.INVOCATION, condition);
+			super.receiveError(errorEvent);
+			return;
+		}
+		AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
+		String jobIdentifier = jobIdentifier(event);
+		Map<String, T2Reference> inputs = prepareInputs(asyncCondition,
+				jobIdentifier);
+		AsynchronousActivityCallback callback = new ConditionCallBack(
+				jobIdentifier);
+		asyncCondition.executeAsynch(inputs, callback);
+	}
+
+	/**
+	 * Collects the inputs for the condition activity: for each of its input
+	 * ports the value of the same-named previous output is used if present,
+	 * otherwise the same-named original input. Ports matching neither are
+	 * left unset.
+	 */
+	private Map<String, T2Reference> prepareInputs(
+			AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
+		Map<String, T2Reference> inputs = new HashMap<>();
+		Map<String, T2Reference> inData = getInData(jobIdentifier);
+		Map<String, T2Reference> outData = getOutData(jobIdentifier);
+
+		Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
+		for (ActivityInputPort conditionIn : inputPorts) {
+			String conditionPort = conditionIn.getName();
+			if (outData.containsKey(conditionPort))
+				// Copy from previous output
+				inputs.put(conditionPort, outData.get(conditionPort));
+			else if (inData.containsKey(conditionPort))
+				// Copy from original input
+				inputs.put(conditionPort, inData.get(conditionPort));
+		}
+		return inputs;
+	}
+
+	/**
+	 * Returns the data of the stored incoming job event, or an empty map if
+	 * nothing is stored or the stored event is not a job event.
+	 */
+	private Map<String, T2Reference> getInData(String jobIdentifier) {
+		AbstractDispatchEvent inEvent;
+		synchronized (incomingJobs) {
+			inEvent = incomingJobs.get(jobIdentifier);
+		}
+		Map<String, T2Reference> inData = new HashMap<>();
+		if (inEvent instanceof DispatchJobEvent)
+			inData = ((DispatchJobEvent) inEvent).getData();
+		return inData;
+	}
+
+	/**
+	 * Returns the data of the stored outgoing result event, or an empty map if
+	 * nothing is stored or the stored event is not a result event.
+	 */
+	private Map<String, T2Reference> getOutData(String jobIdentifier) {
+		AbstractDispatchEvent outEvent;
+		synchronized (outgoingJobs) {
+			outEvent = outgoingJobs.get(jobIdentifier);
+		}
+		Map<String, T2Reference> outData = new HashMap<>();
+		if (outEvent instanceof DispatchResultEvent)
+			outData = ((DispatchResultEvent) outEvent).getData();
+		return outData;
+	}
+
+	/**
+	 * Builds the key used in {@link #incomingJobs} and {@link #outgoingJobs}:
+	 * the owning process followed by the index rendered with
+	 * {@link Arrays#toString(int[])}.
+	 */
+	private String jobIdentifier(AbstractDispatchEvent event) {
+		String jobId = event.getOwningProcess()
+				+ Arrays.toString(event.getIndex());
+		return jobId;
+	}
+
+	public static final String LOOP_PORT = "loop";
+
+	/**
+	 * Callback for one invocation of the condition activity. Depending on the
+	 * value of the {@link Loop#LOOP_PORT} output, the original job is sent
+	 * down the stack again or the stored result is sent up.
+	 */
+	public class ConditionCallBack implements AsynchronousActivityCallback {
+		private InvocationContext context;
+		private final String jobIdentifier;
+		private String processId;
+
+		public ConditionCallBack(String jobIdentifier) {
+			this.jobIdentifier = jobIdentifier;
+			AbstractDispatchEvent originalEvent;
+			synchronized (incomingJobs) {
+				originalEvent = incomingJobs.get(jobIdentifier);
+			}
+			// NOTE(review): throws NullPointerException if no incoming event
+			// is stored under jobIdentifier
+			context = originalEvent.getContext();
+			processId = originalEvent.getOwningProcess() + ":condition";
+		}
+
+		@Override
+		public void fail(String message) {
+			fail(message, null, DispatchErrorType.INVOCATION);
+		}
+
+		@Override
+		public void fail(String message, Throwable t) {
+			fail(message, t, DispatchErrorType.INVOCATION);
+		}
+
+		/**
+		 * Logs the failure and reports it as an error event for the original
+		 * job, carrying the given message, cause and error type. If the
+		 * original job is no longer known the failure can only be logged.
+		 */
+		@Override
+		public void fail(String message, Throwable t,
+				DispatchErrorType errorType) {
+			logger.warn("Failed (" + errorType + ") invoking condition service "
+					+ jobIdentifier + ":" + message, t);
+
+			AbstractDispatchEvent originalEvent;
+			synchronized (incomingJobs) {
+				originalEvent = incomingJobs.get(jobIdentifier);
+			}
+			if (originalEvent == null) {
+				/*
+				 * Without the original event there is no owning process or
+				 * index to address an error event to
+				 */
+				logger.error("Can't report condition service failure, unknown job identifier "
+						+ jobIdentifier);
+				return;
+			}
+			receiveError(new DispatchErrorEvent(originalEvent
+					.getOwningProcess(), originalEvent.getIndex(),
+					originalEvent.getContext(),
+					"Can't invoke condition service " + message, t,
+					errorType, null));
+		}
+
+		@Override
+		public InvocationContext getContext() {
+			return context;
+		}
+
+		@Override
+		public String getParentProcessIdentifier() {
+			return processId;
+		}
+
+		@Override
+		public void receiveCompletion(int[] completionIndex) {
+			// Ignore streaming
+		}
+
+		/**
+		 * Evaluates the condition's output. Each failure is reported exactly
+		 * once and ends the evaluation.
+		 */
+		@Override
+		public void receiveResult(Map<String, T2Reference> data, int[] index) {
+			if (index.length > 0) {
+				// Ignore streaming
+				return;
+			}
+			T2Reference loopRef = data.get(LOOP_PORT);
+			if (loopRef == null) {
+				fail("Condition service didn't contain output port " + LOOP_PORT);
+				return;
+			}
+			if (loopRef.containsErrors()) {
+				fail("Condition service failed: " + loopRef);
+				return;
+			}
+			if (loopRef.getDepth() != 0) {
+				fail("Condition service output " + LOOP_PORT
+						+ " depth is not 0, but " + loopRef.getDepth());
+				return;
+			}
+			ReferenceService referenceService = context.getReferenceService();
+			String loop = (String) referenceService.renderIdentifier(loopRef,
+					String.class, context);
+
+			if (Boolean.parseBoolean(loop)) {
+				// Push it down again
+				AbstractDispatchEvent dispatchEvent;
+				synchronized (incomingJobs) {
+					dispatchEvent = incomingJobs.get(jobIdentifier);
+				}
+				if (dispatchEvent == null) {
+					fail("Unknown job identifier " + jobIdentifier);
+					return;
+				}
+				if (dispatchEvent instanceof DispatchJobEvent) {
+					DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
+							dispatchEvent);
+					getBelow().receiveJob(newJobEvent);
+				} else if (dispatchEvent instanceof DispatchJobQueueEvent) {
+					getBelow().receiveJobQueue(
+							(DispatchJobQueueEvent) dispatchEvent);
+				} else {
+					fail("Unknown type of incoming event " + dispatchEvent);
+				}
+				return;
+
+			} else {
+				// We'll push it up, end of loop for now
+
+				AbstractDispatchEvent outgoingEvent;
+				synchronized (outgoingJobs) {
+					outgoingEvent = outgoingJobs.get(jobIdentifier);
+				}
+				if (outgoingEvent == null && !isRunFirst()) {
+					fail("Initial loop condition failed");
+					return;
+				}
+				if (outgoingEvent instanceof DispatchCompletionEvent) {
+					getAbove().receiveResultCompletion(
+							(DispatchCompletionEvent) outgoingEvent);
+				} else if (outgoingEvent instanceof DispatchResultEvent) {
+					getAbove().receiveResult(
+							(DispatchResultEvent) outgoingEvent);
+				} else {
+					fail("Unknown type of outgoing event " + outgoingEvent);
+				}
+			}
+
+		}
+
+		/**
+		 * Creates the job event for the next iteration: the original job's
+		 * inputs overlaid with the outputs of the condition activity.
+		 */
+		private DispatchJobEvent prepareNewJobEvent(
+				Map<String, T2Reference> data,
+				AbstractDispatchEvent dispatchEvent) {
+			DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
+			Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
+					dispatchJobEvent.getData());
+			newInputs.putAll(data);
+			DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
+					.getOwningProcess(), dispatchEvent.getIndex(),
+					dispatchEvent.getContext(), newInputs,
+					((DispatchJobEvent) dispatchEvent).getActivities());
+			/*
+			 * TODO: Should this be registered as an incomingJobs? If so the
+			 * conditional could even feed to itself, and we should also keep a
+			 * list of originalJobs.
+			 */
+			return newJobEvent;
+		}
+
+		/**
+		 * Runs the given task in a new thread; uncaught exceptions are
+		 * reported through {@link #fail(String, Throwable)}.
+		 */
+		@Override
+		public void requestRun(Runnable runMe) {
+			String newThreadName = "Condition service "
+					+ getParentProcessIdentifier();
+			Thread thread = new Thread(runMe, newThreadName);
+			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+				@Override
+				public void uncaughtException(Thread t, Throwable e) {
+					fail("Uncaught exception while invoking " + jobIdentifier,
+							e);
+				}
+			});
+			thread.start();
+		}
+	}
+
+	/**
+	 * @return the processor of the dispatch stack this layer belongs to, or
+	 *         <code>null</code> if the layer has no dispatch stack
+	 */
+	@Override
+	public Processor getProcessor() {
+		if (dispatchStack == null)
+			return null;
+		return dispatchStack.getProcessor();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
new file mode 100644
index 0000000..7cfa2a5
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
@@ -0,0 +1,75 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.Properties;
+
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean for the {@link Loop}.
+ * <p>
+ * Set the {@link #setCondition(Activity)} for an activity with an output port
+ * called "loop". The LoopLayer will re-send a job only if this port exists and
+ * its output can be dereferenced to a string equal to "true".
+ * </p>
+ * <p>
+ * If {@link #isRunFirst()} is false, the loop layer will check the condition
+ * before invoking the job for the first time, otherwise the condition will be
+ * invoked after the job has come back with successful results.
+ * </p>
+ * 
+ * @author Stian Soiland-Reyes
+ * 
+ */
+@ConfigurationBean(uri = Loop.URI + "#Config")
+public class LoopConfiguration implements Cloneable {
+	private Activity<?> condition = null;
+	private Boolean runFirst;
+	private Properties properties;
+
+	/**
+	 * Returns the properties of this configuration, creating an empty
+	 * {@link Properties} on first access if none has been set.
+	 */
+	public Properties getProperties() {
+		synchronized (this) {
+			if (properties == null) {
+				properties = new Properties();
+			}
+		}
+		return properties;
+	}
+
+	public void setProperties(Properties properties) {
+		this.properties = properties;
+	}
+
+	/**
+	 * Creates a shallow copy of this configuration whose condition activity
+	 * is unset.
+	 */
+	@Override
+	public LoopConfiguration clone() {
+		try {
+			LoopConfiguration copy = (LoopConfiguration) super.clone();
+			// The condition activity is not carried over to the copy
+			copy.condition = null;
+			return copy;
+		} catch (CloneNotSupportedException e) {
+			throw new RuntimeException("Unexpected CloneNotSupportedException",
+					e);
+		}
+	}
+
+	public Activity<?> getCondition() {
+		return condition;
+	}
+
+	/**
+	 * @return the configured flag, or <code>true</code> if it was never set
+	 */
+	public boolean isRunFirst() {
+		return runFirst == null || runFirst.booleanValue();
+	}
+
+	@ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
+	public void setCondition(Activity<?> activity) {
+		this.condition = activity;
+	}
+
+	@ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
+	public void setRunFirst(boolean runFirst) {
+		this.runFirst = runFirst;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
new file mode 100644
index 0000000..bd0e69a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
@@ -0,0 +1,463 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.sf.taverna.t2.invocation.Completion;
+import net.sf.taverna.t2.invocation.IterationInternalEvent;
+import net.sf.taverna.t2.monitor.MonitorManager;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.monitor.NoSuchPropertyException;
+import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.NotifiableLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobQueueReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Dispatch layer which consumes a queue of events and fires off a fixed number
+ * of simultaneous jobs to the layer below. It observes failure, data and
+ * completion events coming up and uses these to determine when to push more
+ * jobs downwards into the stack as well as when it can safely emit completion
+ * events from the queue.
+ *
+ * @author Tom Oinn
+ *
+ */
+@DispatchLayerErrorReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+		REMOVE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerJobQueueReaction(emits = { JOB }, relaysUnmodified = false, stateEffects = { CREATE_PROCESS_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+		REMOVE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+		REMOVE_PROCESS_STATE, NO_EFFECT })
+@SupportsStreamedResult
+public class Parallelize extends AbstractDispatchLayer<JsonNode>
+		implements NotifiableLayer,
+		PropertyContributingDispatchLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize";
+	private static Logger logger = Logger.getLogger(Parallelize.class);
+
+	private Map<String, StateModel> stateMap = new HashMap<>();
+	private JsonNode config = JsonNodeFactory.instance.objectNode();
+	int sentJobsCount = 0;
+	int completedJobsCount = 0;
+
+	/**
+	 * Creates a layer with an empty configuration; without a "maxJobs" entry
+	 * {@link #receiveJobQueue(DispatchJobQueueEvent)} uses a limit of 1.
+	 */
+	public Parallelize() {
+	}
+
+	/**
+	 * Test constructor, only used by unit tests, should probably not be public
+	 * access here?
+	 *
+	 * @param maxJobs
+	 */
+	public Parallelize(int maxJobs) {
+		// The field initialiser creates an ObjectNode, so the cast is safe here
+		ObjectNode settings = (ObjectNode) config;
+		settings.put("maxJobs", maxJobs);
+	}
+
+	/**
+	 * Notification that an event was added to the queue of the given process;
+	 * asks the process's state model to pull from its queue.
+	 *
+	 * @throws WorkflowStructureException
+	 *             if no state is held for the owning process
+	 */
+	@Override
+	public void eventAdded(String owningProcess) {
+		StateModel model;
+		synchronized (stateMap) {
+			model = stateMap.get(owningProcess);
+		}
+		if (model == null) {
+			/*
+			 * Should never see this here, it means we've had duplicate
+			 * completion events from upstream
+			 */
+			String reason = "Unknown owning process " + owningProcess;
+			throw new WorkflowStructureException(reason);
+		}
+		synchronized (model) {
+			model.fillFromQueue();
+		}
+	}
+
+	@Override
+	public void receiveJobQueue(DispatchJobQueueEvent queueEvent) {
+		StateModel model = new StateModel(queueEvent,
+				config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
+		synchronized (stateMap) {
+			stateMap.put(queueEvent.getOwningProcess(), model);
+		}
+		model.fillFromQueue();
+	}
+
+	public void receiveJob(Job job, List<? extends Activity<?>> activities) {
+		throw new WorkflowStructureException(
+				"Parallelize layer cannot handle job events");
+	}
+
+	@Override
+	public void receiveError(DispatchErrorEvent errorEvent) {
+		StateModel model;
+		String owningProcess = errorEvent.getOwningProcess();
+		synchronized(stateMap) {
+			model = stateMap.get(owningProcess);
+		}
+		if (model == null) {
+			logger.warn("Error received for unknown owning process: " + owningProcess);
+			return;
+		}
+		model.finishWith(errorEvent.getIndex());
+		getAbove().receiveError(errorEvent);
+	}
+
+	@Override
+	public void receiveResult(DispatchResultEvent resultEvent) {
+		StateModel model;
+		String owningProcess = resultEvent.getOwningProcess();
+		synchronized(stateMap) {
+			model = stateMap.get(owningProcess);
+		}
+		if (model == null) {
+			logger.warn("Error received for unknown owning process: " + owningProcess);
+			return;
+		}
+		if (!resultEvent.isStreamingEvent()) {
+			MonitorManager.getInstance().registerNode(resultEvent,
+					owningProcess,
+					new HashSet<MonitorableProperty<?>>());
+		}
+		model.finishWith(resultEvent.getIndex());
+		getAbove().receiveResult(resultEvent);
+	}
+
+	/**
+	 * Only going to receive this if the activity invocation was streaming, in
+	 * which case we need to handle all completion events and pass them up the
+	 * stack.
+	 */
+	@Override
+	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+		StateModel model;
+		String owningProcess = completionEvent.getOwningProcess();
+		synchronized(stateMap) {
+			model = stateMap.get(owningProcess);
+		}
+		if (model == null) {
+			logger.warn("Error received for unknown owning process: " + owningProcess);
+			return;
+		}
+		model.finishWith(completionEvent.getIndex());
+		getAbove().receiveResultCompletion(completionEvent);
+	}
+
+	@Override
+	public void finishedWith(final String owningProcess) {
+		// Delay the removal of the state to give the monitor a chance to poll
+		cleanupTimer.schedule(new TimerTask() {
+			@Override
+			public void run() {
+				synchronized(stateMap) {
+					stateMap.remove(owningProcess);
+				}
+			}
+		}, CLEANUP_DELAY_MS);
+	}
+
+	@Override
+	public void configure(JsonNode config) {
+		this.config = config; // kept by reference; no copy or validation here
+	}
+
+	@Override
+	public JsonNode getConfiguration() {
+		return this.config; // the stored node itself, not a defensive copy
+	}
+
+	/**
+	 * Registers three Integer properties with the dispatch stack for the given
+	 * owning process:
+	 * <ul>
+	 * <li><code>dispatch.parallelize.queuesize</code><br/>Current size of the
+	 * incoming job queue, or -1 when no state is held for that process.</li>
+	 * <li><code>dispatch.parallelize.sentjobs</code> and
+	 * <code>dispatch.parallelize.completedjobs</code><br/>Layer counters.</li>
+	 * </ul>
+	 * Each property reports the time of the call as its last-modified date.
+	 */
+	@Override
+	public void injectPropertiesFor(final String owningProcess) {
+		/*
+		 * Queue depth: evaluates to -1 while stateMap has no model for this
+		 * identifier, i.e. before the state has been created or after it has
+		 * been removed again.
+		 */
+		MonitorableProperty<Integer> queueDepth = new MonitorableProperty<Integer>() {
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "parallelize", "queuesize" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				StateModel state;
+				synchronized (stateMap) {
+					state = stateMap.get(owningProcess);
+				}
+				if (state != null)
+					return state.queueSize();
+				return -1;
+			}
+
+			@Override
+			public Date getLastModified() {
+				return new Date();
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(queueDepth, owningProcess);
+
+		// Reads the sentJobsCount field, which is not keyed by owning process
+		// even though the property is registered for one
+		MonitorableProperty<Integer> sentJobs = new MonitorableProperty<Integer>() {
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "parallelize", "sentjobs" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				return sentJobsCount;
+			}
+
+			@Override
+			public Date getLastModified() {
+				return new Date();
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(sentJobs, owningProcess);
+
+		// Reads the completedJobsCount field, likewise not keyed by owning
+		// process
+		MonitorableProperty<Integer> completedJobs = new MonitorableProperty<Integer>() {
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "parallelize", "completedjobs" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				return completedJobsCount;
+			}
+
+			@Override
+			public Date getLastModified() {
+				return new Date();
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(completedJobs, owningProcess);
+	}
+
+	/**
+	 * Holds the state for a given owning process: the job queue event it was
+	 * built from, the events still pending and the number of active jobs.
+	 *
+	 * @author Tom Oinn
+	 */
+	// suppressed to avoid jdk1.5 error messages caused by the declaration
+	// IterationInternalEvent<? extends IterationInternalEvent<?>> e
+	@SuppressWarnings("rawtypes")
+	class StateModel {
+		private DispatchJobQueueEvent queueEvent;
+		// Jobs sent down the stack plus Completions waiting behind them, in order
+		private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<>();
+		// Jobs sent down the stack that finishWith has not yet matched
+		private int activeJobs = 0;
+		private int maximumJobs;
+
+		/**
+		 * Construct state model for a particular owning process
+		 *
+		 * @param queueEvent
+		 *            event holding the queue into which jobs are inserted by
+		 *            the iteration strategy, and the activities to pass along
+		 *            with job events down into the stack below
+		 * @param maxJobs
+		 *            maximum number of concurrent jobs to keep 'hot' at any
+		 *            given point
+		 */
+		protected StateModel(DispatchJobQueueEvent queueEvent, int maxJobs) {
+			this.queueEvent = queueEvent;
+			this.maximumJobs = maxJobs;
+		}
+
+		/** Returns the number of events waiting in the incoming queue. */
+		Integer queueSize() {
+			return queueEvent.getQueue().size();
+		}
+
+		/**
+		 * Poll the queue repeatedly until either the queue is empty or the
+		 * maximum number of active jobs is reached. For each event taken from
+		 * the head of the queue:
+		 * <ul>
+		 * <li>If it is a Completion and the pending events list is empty, send
+		 * it to the layer above on a new thread
+		 * <li>Otherwise add it to the end of the pending events list
+		 * <li>If it is a Job, also increment activeJobs, register it with the
+		 * monitor and send it down the stack
+		 * </ul>
+		 */
+		protected void fillFromQueue() {
+			synchronized (this) {
+				while (queueEvent.getQueue().peek() != null
+						&& activeJobs < maximumJobs) {
+					final IterationInternalEvent e = queueEvent.getQueue()
+							.remove();
+
+					if (e instanceof Completion && pendingEvents.peek() == null) {
+						/*
+						 * NOTE(review): unlike finishWith, this relays the
+						 * Completion on a new thread; the reason is not visible here
+						 */
+						new Thread(new Runnable() {
+							@Override
+							public void run() {
+								getAbove().receiveResultCompletion(
+										new DispatchCompletionEvent(e
+												.getOwningProcess(), e
+												.getIndex(), e.getContext()));
+							}
+						}, "Parallelize " + e.getOwningProcess()).start();
+					} else {
+						pendingEvents.add(e);
+					}
+					if (e instanceof Job) {
+						// Already inside synchronized (this); no nested lock needed
+						activeJobs++;
+						sentJobsCount++;
+
+						DispatchJobEvent dispatchJobEvent = new DispatchJobEvent(e
+								.getOwningProcess(), e
+								.getIndex(), e.getContext(),
+								((Job) e).getData(), queueEvent
+										.getActivities());
+						// Register with the monitor
+						MonitorManager.getInstance().registerNode(dispatchJobEvent,
+								e.getOwningProcess(),
+								new HashSet<MonitorableProperty<?>>());
+
+						getBelow().receiveJob(dispatchJobEvent);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Retires the pending Job whose index equals the given one, emits any
+		 * Completion events that thereby reach the head of the pending list and
+		 * refills from the queue.
+		 *
+		 * @param index
+		 *            index of the result, error or completion event received
+		 * @return true if the index matched an existing Job exactly; false
+		 *         means a partial completion event, which should be sent up the
+		 *         stack without modification
+		 */
+		protected boolean finishWith(int[] index) {
+			synchronized (this) {
+				for (IterationInternalEvent e : new ArrayList<>(pendingEvents)) {
+					if (!(e instanceof Job))
+						continue;
+					Job j = (Job) e;
+					if (!arrayEquals(j.getIndex(), index))
+						continue;
+
+					/*
+					 * Found a job in the pending events list which has the
+					 * same index, remove it and decrement the current count
+					 * of active jobs
+					 */
+					pendingEvents.remove(e);
+					activeJobs--;
+					completedJobsCount++;
+					/*
+					 * Now pull any completion events that have reached the head
+					 * of the queue - this indicates that all the job events
+					 * which came in before them have been processed and we can
+					 * emit the completions
+					 */
+					while (pendingEvents.peek() != null
+							&& pendingEvents.peek() instanceof Completion) {
+						Completion c = (Completion) pendingEvents.remove();
+						getAbove().receiveResultCompletion(
+								new DispatchCompletionEvent(c
+										.getOwningProcess(), c.getIndex(), c
+										.getContext()));
+					}
+					/*
+					 * Refresh from the queue; as we've just decremented the
+					 * active job count there should be a worker available
+					 */
+					fillFromQueue();
+					/*
+					 * Return true to indicate that we removed a job event from
+					 * the queue, that is to say that the index wasn't that of a
+					 * partial completion.
+					 */
+					return true;
+				}
+			}
+			return false;
+		}
+
+		/**
+		 * Compares two index arrays by length and element values.
+		 */
+		private boolean arrayEquals(int[] a, int[] b) {
+			return java.util.Arrays.equals(a, b);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
new file mode 100644
index 0000000..29d69d6
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean for the parallelize layer. Its only property is an int
+ * giving the number of concurrent jobs in a processor instance per owning
+ * process ID; a new bean starts with a limit of one.
+ *
+ * @author Tom Oinn
+ */
+@ConfigurationBean(uri = Parallelize.URI + "#Config")
+public class ParallelizeConfig {
+	private int maxJobs = 1;
+
+	/** Creates a bean with the default limit of a single job. */
+	public ParallelizeConfig() {
+	}
+
+	@ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required = false)
+	public void setMaximumJobs(int maxJobs) {
+		this.maxJobs = maxJobs;
+	}
+
+	/** Returns the configured maximum number of parallel jobs. */
+	public int getMaximumJobs() {
+		return maxJobs;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
new file mode 100644
index 0000000..f2054d9
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
@@ -0,0 +1,180 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+
+/**
+ * Retries failed jobs after a delay, using exponential backoff between retries
+ * <p>
+ * Default properties are as follows :
+ * <ul>
+ * <li>maxRetries = 0 (int)</li>
+ * <li>initialDelay = 1000 (milliseconds)</li>
+ * <li>maxDelay = 5000 (milliseconds)</li>
+ * <li>backoffFactor = 1.0 (double)</li>
+ * </ul>
+ *
+ * @author Tom Oinn
+ * @author David Withers
+ * @author Stian Soiland-Reyes
+ */
+@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
+		UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
+@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
+public class Retry extends AbstractErrorHandlerLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry";
+	private static final String MAX_RETRIES = "maxRetries";
+	private static final String INITIAL_DELAY = "initialDelay";
+	private static final String MAX_DELAY = "maxDelay";
+	private static final String BACKOFF_FACTOR = "backoffFactor";
+
+	private static Timer retryTimer = new Timer("Retry timer", true);
+
+	private ObjectNode config;
+	private int maxRetries;
+	private int initialDelay;
+	private int maxDelay;
+	private double backoffFactor;
+
+	public Retry() {
+		super();
+		configure(JsonNodeFactory.instance.objectNode());
+	}
+
+	public Retry(int maxRetries, int initialDelay, int maxDelay,
+			double backoffFactor) {
+		super();
+		ObjectNode settings = JsonNodeFactory.instance.objectNode();
+		settings.put(MAX_RETRIES, maxRetries);
+		settings.put(INITIAL_DELAY, initialDelay);
+		settings.put(MAX_DELAY, maxDelay);
+		settings.put(BACKOFF_FACTOR, backoffFactor);
+		configure(settings);
+	}
+
+	class RetryState extends JobState {
+		int currentRetryCount = 0;
+
+		public RetryState(DispatchJobEvent jobEvent) {
+			super(jobEvent);
+		}
+
+		/**
+		 * Try to schedule a retry.
+		 *
+		 * @return true if a retry was scheduled, false if the retry count has
+		 *         already reached maxRetries (in which case no retry is
+		 *         scheduled)
+		 */
+		@Override
+		public boolean handleError() {
+			if (currentRetryCount < maxRetries) {
+				double backedOff = initialDelay * Math.pow(backoffFactor, currentRetryCount);
+				int delay = Math.min((int) backedOff, maxDelay);
+				retryTimer.schedule(new TimerTask() {
+					@Override
+					public void run() {
+						currentRetryCount++;
+						getBelow().receiveJob(jobEvent);
+					}
+				}, delay);
+				return true;
+			}
+			return false;
+		}
+	}
+
+	@Override
+	protected JobState getStateObject(DispatchJobEvent jobEvent) {
+		return new RetryState(jobEvent);
+	}
+
+	/** Fills in missing defaults (mutating the given ObjectNode), validates it, then caches its values. */
+	@Override
+	public void configure(JsonNode config) {
+		ObjectNode defaults = defaultConfig();
+		ObjectNode settings = (ObjectNode) config;
+		setAllMissingFields(settings, defaults);
+		checkConfig(settings);
+		this.config = settings;
+		maxRetries = settings.get(MAX_RETRIES).intValue();
+		initialDelay = settings.get(INITIAL_DELAY).intValue();
+		maxDelay = settings.get(MAX_DELAY).intValue();
+		backoffFactor = settings.get(BACKOFF_FACTOR).doubleValue();
+	}
+
+	/** Copies each default into config when config lacks that field or holds null for it. */
+	private void setAllMissingFields(ObjectNode config, ObjectNode defaults) {
+		Iterator<String> names = defaults.fieldNames();
+		while (names.hasNext()) {
+			String name = names.next();
+			if (!config.has(name) || config.get(name).isNull())
+				config.put(name, defaults.get(name));
+		}
+	}
+
+	/** Rejects negative or inconsistent settings with an IllegalArgumentException. */
+	private void checkConfig(ObjectNode conf) {
+		if (conf.get(MAX_RETRIES).intValue() < 0)
+			throw new IllegalArgumentException("maxRetries < 0");
+		int firstDelay = conf.get(INITIAL_DELAY).intValue();
+		if (firstDelay < 0)
+			throw new IllegalArgumentException("initialDelay < 0");
+		if (conf.get(MAX_DELAY).intValue() < firstDelay)
+			throw new IllegalArgumentException("maxDelay < initialDelay");
+		if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0)
+			throw new IllegalArgumentException("backoffFactor < 0.0");
+	}
+
+	/** Returns a new node holding the default value of every setting. */
+	public static ObjectNode defaultConfig() {
+		ObjectNode defaults = JsonNodeFactory.instance.objectNode();
+		defaults.put(MAX_RETRIES, 0);
+		defaults.put(INITIAL_DELAY, 1000);
+		defaults.put(MAX_DELAY, 5000);
+		defaults.put(BACKOFF_FACTOR, 1.0);
+		return defaults;
+	}
+
+	@Override
+	public JsonNode getConfiguration() {
+		return this.config;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
new file mode 100644
index 0000000..39dedbf
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
@@ -0,0 +1,97 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean identified by {@link Retry#URI} + "#Config". Every
+ * property is optional and starts out at the default declared below.
+ */
+@ConfigurationBean(uri = Retry.URI + "#Config")
+public class RetryConfig {
+	private static final float DEFAULT_BACKOFF_FACTOR = 1.0f;
+	private static final int DEFAULT_INITIAL_DELAY = 1000;
+	private static final int DEFAULT_MAX_DELAY = 5000;
+	private static final int DEFAULT_MAX_RETRIES = 0;
+
+	private float backoffFactor = DEFAULT_BACKOFF_FACTOR;
+	private int initialDelay = DEFAULT_INITIAL_DELAY;
+	private int maxDelay = DEFAULT_MAX_DELAY;
+	private int maxRetries = DEFAULT_MAX_RETRIES;
+
+	/**
+	 * Factor by which the initial delay is multiplied for each retry after the
+	 * first, giving exponential backoff of retry times up to a ceiling.
+	 *
+	 * @return the backoff factor
+	 */
+	public float getBackoffFactor() {
+		return backoffFactor;
+	}
+
+	/**
+	 * Delay between the initial failure message and the first retry attempt.
+	 *
+	 * @return the delay in milliseconds
+	 */
+	public int getInitialDelay() {
+		return initialDelay;
+	}
+
+	/**
+	 * Ceiling for the delay between failure reception and retry, so that the
+	 * exponential backoff levels off instead of growing without bound.
+	 *
+	 * @return the maximum delay in milliseconds
+	 */
+	public int getMaxDelay() {
+		return maxDelay;
+	}
+
+	/**
+	 * @return maximum number of retries for a failing process
+	 */
+	public int getMaxRetries() {
+		return maxRetries;
+	}
+
+	@ConfigurationProperty(name = "backoffFactor", label = "Backoff Factor", description = "Factor by which the initial delay is multiplied for each retry after the first retry", required=false)
+	public void setBackoffFactor(float factor) {
+		backoffFactor = factor;
+	}
+
+	@ConfigurationProperty(name = "initialDelay", label = "Initial Delay", description = "Delay in milliseconds between the initial failure message and the first attempt to retry the failed job", required=false)
+	public void setInitialDelay(int delay) {
+		initialDelay = delay;
+	}
+
+	@ConfigurationProperty(name = "maxDelay", label = "Maximum Delay", description = "Maximum delay in milliseconds between failure reception and retry", required=false)
+	public void setMaxDelay(int delay) {
+		maxDelay = delay;
+	}
+
+	@ConfigurationProperty(name = "maxRetries", label = "Maximum Retries", description = "Maximum number of retries for a failing process", required=false)
+	public void setMaxRetries(int max) {
+		maxRetries = max;
+	}
+}


[4/6] incubator-taverna-engine git commit: taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions

Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
deleted file mode 100644
index bd0e69a..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import net.sf.taverna.t2.invocation.Completion;
-import net.sf.taverna.t2.invocation.IterationInternalEvent;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.NotifiableLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobQueueReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Dispatch layer which consumes a queue of events and fires off a fixed number
- * of simultaneous jobs to the layer below. It observes failure, data and
- * completion events coming up and uses these to determine when to push more
- * jobs downwards into the stack as well as when it can safely emit completion
- * events from the queue.
- *
- * @author Tom Oinn
- *
- */
-@DispatchLayerErrorReaction(emits = {}, relaysUnmodified = true, stateEffects = {
-		REMOVE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerJobQueueReaction(emits = { JOB }, relaysUnmodified = false, stateEffects = { CREATE_PROCESS_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {
-		REMOVE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {
-		REMOVE_PROCESS_STATE, NO_EFFECT })
-@SupportsStreamedResult
-public class Parallelize extends AbstractDispatchLayer<JsonNode>
-		implements NotifiableLayer,
-		PropertyContributingDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize";
-	private static Logger logger = Logger.getLogger(Parallelize.class);
-
-	private Map<String, StateModel> stateMap = new HashMap<>();
-	private JsonNode config = JsonNodeFactory.instance.objectNode();
-	int sentJobsCount = 0;
-	int completedJobsCount = 0;
-
-	public Parallelize() {
-		super();
-	}
-
-	/**
-	 * Test constructor, only used by unit tests, should probably not be public
-	 * access here?
-	 *
-	 * @param maxJobs
-	 */
-	public Parallelize(int maxJobs) {
-		super();
-		((ObjectNode)config).put("maxJobs", maxJobs);
-	}
-
-	@Override
-	public void eventAdded(String owningProcess) {
-		StateModel stateModel;
-		synchronized (stateMap) {
-			stateModel = stateMap.get(owningProcess);
-		}
-		if (stateModel == null)
-			/*
-			 * Should never see this here, it means we've had duplicate
-			 * completion events from upstream
-			 */
-			throw new WorkflowStructureException(
-					"Unknown owning process " + owningProcess);
-		synchronized (stateModel) {
-			stateModel.fillFromQueue();
-		}
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent queueEvent) {
-		StateModel model = new StateModel(queueEvent,
-				config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
-		synchronized (stateMap) {
-			stateMap.put(queueEvent.getOwningProcess(), model);
-		}
-		model.fillFromQueue();
-	}
-
-	public void receiveJob(Job job, List<? extends Activity<?>> activities) {
-		throw new WorkflowStructureException(
-				"Parallelize layer cannot handle job events");
-	}
-
-	@Override
-	public void receiveError(DispatchErrorEvent errorEvent) {
-		StateModel model;
-		String owningProcess = errorEvent.getOwningProcess();
-		synchronized(stateMap) {
-			model = stateMap.get(owningProcess);
-		}
-		if (model == null) {
-			logger.warn("Error received for unknown owning process: " + owningProcess);
-			return;
-		}
-		model.finishWith(errorEvent.getIndex());
-		getAbove().receiveError(errorEvent);
-	}
-
-	@Override
-	public void receiveResult(DispatchResultEvent resultEvent) {
-		StateModel model;
-		String owningProcess = resultEvent.getOwningProcess();
-		synchronized(stateMap) {
-			model = stateMap.get(owningProcess);
-		}
-		if (model == null) {
-			logger.warn("Error received for unknown owning process: " + owningProcess);
-			return;
-		}
-		if (!resultEvent.isStreamingEvent()) {
-			MonitorManager.getInstance().registerNode(resultEvent,
-					owningProcess,
-					new HashSet<MonitorableProperty<?>>());
-		}
-		model.finishWith(resultEvent.getIndex());
-		getAbove().receiveResult(resultEvent);
-	}
-
-	/**
-	 * Only going to receive this if the activity invocation was streaming, in
-	 * which case we need to handle all completion events and pass them up the
-	 * stack.
-	 */
-	@Override
-	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
-		StateModel model;
-		String owningProcess = completionEvent.getOwningProcess();
-		synchronized(stateMap) {
-			model = stateMap.get(owningProcess);
-		}
-		if (model == null) {
-			logger.warn("Error received for unknown owning process: " + owningProcess);
-			return;
-		}
-		model.finishWith(completionEvent.getIndex());
-		getAbove().receiveResultCompletion(completionEvent);
-	}
-
-	@Override
-	public void finishedWith(final String owningProcess) {
-		// Delay the removal of the state to give the monitor a chance to poll
-		cleanupTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				synchronized(stateMap) {
-					stateMap.remove(owningProcess);
-				}
-			}
-		}, CLEANUP_DELAY_MS);
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		this.config = config;
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return this.config;
-	}
-
-	/**
-	 * Injects the following properties into its parent processor's property set:
-	 * <ul>
-	 * <li><code>dispatch.parallelize.queuesize [Integer]</code><br/>The current
-	 * size of the incomming job queue, or -1 if the state isn't defined for the
-	 * registered process identifier (which will be the case if the process
-	 * hasn't started or has had its state purged after a final completion of
-	 * some kind.</li>
-	 * </ul>
-	 */
-	@Override
-	public void injectPropertiesFor(final String owningProcess) {
-		/**
-		 * Property for the queue depth, will evaluate to -1 if there isn't a
-		 * queue in the state model for this identifier (which will be the case
-		 * if we haven't created the state yet or the queue has been collected)
-		 */
-		MonitorableProperty<Integer> queueSizeProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "parallelize", "queuesize" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				StateModel model;
-				synchronized(stateMap) {
-					model = stateMap.get(owningProcess);
-				}
-				if (model == null)
-					return -1;
-				return model.queueSize();
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(queueSizeProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> sentJobsProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "parallelize", "sentjobs" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return sentJobsCount;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(sentJobsProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> completedJobsProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "parallelize",
-						"completedjobs" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return completedJobsCount;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(completedJobsProperty,
-				owningProcess);
-	}
-
-	/**
-	 * Holds the state for a given owning process
-	 *
-	 * @author Tom Oinn
-	 *
-	 */
-	// suppressed to avoid jdk1.5 error messages caused by the declaration
-	// IterationInternalEvent<? extends IterationInternalEvent<?>> e
-	@SuppressWarnings("rawtypes")
-	class StateModel {
-		private DispatchJobQueueEvent queueEvent;
-		private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<>();
-		private int activeJobs = 0;
-		private int maximumJobs;
-
-		/**
-		 * Construct state model for a particular owning process
-		 *
-		 * @param owningProcess
-		 *            Process to track parallel execution
-		 * @param queue
-		 *            reference to the queue into which jobs are inserted by the
-		 *            iteration strategy
-		 * @param activities
-		 *            activities to pass along with job events down into the
-		 *            stack below
-		 * @param maxJobs
-		 *            maximum number of concurrent jobs to keep 'hot' at any
-		 *            given point
-		 */
-		protected StateModel(DispatchJobQueueEvent queueEvent, int maxJobs) {
-			this.queueEvent = queueEvent;
-			this.maximumJobs = maxJobs;
-		}
-
-		Integer queueSize() {
-			return queueEvent.getQueue().size();
-		}
-
-		/**
-		 * Poll the queue repeatedly until either the queue is empty or we have
-		 * enough jobs pulled from it. The semantics for this are:
-		 * <ul>
-		 * <li>If the head of the queue is a Job and activeJobs < maximumJobs
-		 * then increment activeJobs, add the Job to the pending events list at
-		 * the end and send the message down the stack
-		 * <li>If the head of the queue is a Completion and the pending jobs
-		 * list is empty then send it to the layer above
-		 * <li>If the head of the queue is a Completion and the pending jobs
-		 * list is not empty then add the Completion to the end of the pending
-		 * jobs list and return
-		 * </ul>
-		 */
-		protected void fillFromQueue() {
-			synchronized (this) {
-				while (queueEvent.getQueue().peek() != null
-						&& activeJobs < maximumJobs) {
-					final IterationInternalEvent e = queueEvent.getQueue()
-							.remove();
-
-					if (e instanceof Completion && pendingEvents.peek() == null) {
-						new Thread(new Runnable() {
-							@Override
-							public void run() {
-								getAbove().receiveResultCompletion(
-										new DispatchCompletionEvent(e
-												.getOwningProcess(), e
-												.getIndex(), e.getContext()));
-							}
-						}, "Parallelize " + e.getOwningProcess()).start();
-						// getAbove().receiveResultCompletion((Completion) e);
-					} else {
-						pendingEvents.add(e);
-					}
-					if (e instanceof Job) {
-						synchronized (this) {
-							activeJobs++;
-						}
-						sentJobsCount++;
-
-						DispatchJobEvent dispatchJobEvent = new DispatchJobEvent(e
-								.getOwningProcess(), e
-								.getIndex(), e.getContext(),
-								((Job) e).getData(), queueEvent
-										.getActivities());
-						// Register with the monitor
-						MonitorManager.getInstance().registerNode(dispatchJobEvent,
-								e.getOwningProcess(),
-								new HashSet<MonitorableProperty<?>>());
-
-						getBelow().receiveJob(dispatchJobEvent);
-					}
-				}
-			}
-		}
-
-		/**
-		 * Returns true if the index matched an existing Job exactly, if this
-		 * method returns false then you have a partial completion event which
-		 * should be sent up the stack without modification.
-		 *
-		 * @param index
-		 * @return
-		 */
-		protected boolean finishWith(int[] index) {
-			synchronized (this) {
-				for (IterationInternalEvent e : new ArrayList<>(pendingEvents)) {
-					if (!(e instanceof Job))
-						continue;
-					Job j = (Job) e;
-					if (!arrayEquals(j.getIndex(), index))
-						continue;
-
-					/*
-					 * Found a job in the pending events list which has the
-					 * same index, remove it and decrement the current count
-					 * of active jobs
-					 */
-					pendingEvents.remove(e);
-					activeJobs--;
-					completedJobsCount++;
-					/*
-					 * Now pull any completion events that have reached the head
-					 * of the queue - this indicates that all the job events
-					 * which came in before them have been processed and we can
-					 * emit the completions
-					 */
-					while (pendingEvents.peek() != null
-							&& pendingEvents.peek() instanceof Completion) {
-						Completion c = (Completion) pendingEvents.remove();
-						getAbove().receiveResultCompletion(
-								new DispatchCompletionEvent(c
-										.getOwningProcess(), c.getIndex(), c
-										.getContext()));
-					}
-					/*
-					 * Refresh from the queue; as we've just decremented the
-					 * active job count there should be a worker available
-					 */
-					fillFromQueue();
-					/*
-					 * Return true to indicate that we removed a job event from
-					 * the queue, that is to say that the index wasn't that of a
-					 * partial completion.
-					 */
-					return true;
-				}
-			}
-			return false;
-		}
-
-		private boolean arrayEquals(int[] a, int[] b) {
-			if (a.length != b.length)
-				return false;
-			for (int i = 0; i < a.length; i++)
-				if (a[i] != b[i])
-					return false;
-			return true;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
deleted file mode 100644
index 29d69d6..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Bean to hold the configuration for the parallelize layer, specifically a
- * single int property defining the number of concurrent jobs in that processor
- * instance per owning process ID.
- * 
- * @author Tom Oinn
- */
-@ConfigurationBean(uri = Parallelize.URI + "#Config")
-public class ParallelizeConfig {
-	private int maxJobs;
-
-	public ParallelizeConfig() {
-		super();
-		this.maxJobs = 1;
-	}
-
-	@ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required = false)
-	public void setMaximumJobs(int maxJobs) {
-		this.maxJobs = maxJobs;
-	}
-
-	public int getMaximumJobs() {
-		return this.maxJobs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
deleted file mode 100644
index f2054d9..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-/**
- * Implements retry policy with delay between retries and exponential backoff
- * <p>
- * Default properties are as follows :
- * <ul>
- * <li>maxRetries = 0 (int)</li>
- * <li>initialDelay = 1000 (milliseconds)</li>
- * <li>maxDelay = 2000 (milliseconds)</li>
- * <li>backoffFactor = 1.0 (double)</li>
- * </ul>
- *
- * @author Tom Oinn
- * @author David Withers
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
-		UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Retry extends AbstractErrorHandlerLayer<JsonNode> {
-	private static final String BACKOFF_FACTOR = "backoffFactor";
-    private static final String MAX_DELAY = "maxDelay";
-    private static final String MAX_RETRIES = "maxRetries";
-    private static final String INITIAL_DELAY = "initialDelay";
-    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry";
-
-	private ObjectNode config;
-    private int maxRetries;
-    private int initialDelay;
-    private int maxDelay;
-    private double backoffFactor;
-
-	private static Timer retryTimer = new Timer("Retry timer", true);
-
-	public Retry() {
-		super();
-		configure(JsonNodeFactory.instance.objectNode());
-	}
-
-	public Retry(int maxRetries, int initialDelay, int maxDelay,
-			double backoffFactor) {
-		super();
-		ObjectNode conf = JsonNodeFactory.instance.objectNode();
-		conf.put(MAX_RETRIES, maxRetries);
-		conf.put(INITIAL_DELAY, initialDelay);
-		conf.put(MAX_DELAY, maxDelay);
-		conf.put(BACKOFF_FACTOR, backoffFactor);
-		configure(conf);
-	}
-
-	class RetryState extends JobState {
-		int currentRetryCount = 0;
-
-		public RetryState(DispatchJobEvent jobEvent) {
-			super(jobEvent);
-		}
-
-		/**
-		 * Try to schedule a retry, returns true if a retry is scheduled, false
-		 * if the retry count has already been reached (in which case no retry
-		 * is scheduled
-		 *
-		 * @return
-		 */
-		@Override
-		public boolean handleError() {
-			if (currentRetryCount >= maxRetries)
-				return false;
-			int delay = (int) (initialDelay * Math.pow(backoffFactor, currentRetryCount));
-			delay = Math.min(delay, maxDelay);
-			TimerTask task = new TimerTask() {
-				@Override
-				public void run() {
-					currentRetryCount++;
-					getBelow().receiveJob(jobEvent);
-				}
-			};
-			retryTimer.schedule(task, delay);
-			return true;
-		}
-	}
-
-	@Override
-	protected JobState getStateObject(DispatchJobEvent jobEvent) {
-		return new RetryState(jobEvent);
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-	    ObjectNode defaultConfig = defaultConfig();
-        setAllMissingFields((ObjectNode) config, defaultConfig);
-        checkConfig((ObjectNode)config);
-        this.config = (ObjectNode) config;
-        maxRetries = config.get(MAX_RETRIES).intValue();
-        initialDelay = config.get(INITIAL_DELAY).intValue();
-        maxDelay = config.get(MAX_DELAY).intValue();
-        backoffFactor = config.get(BACKOFF_FACTOR).doubleValue();       
-	}
-
-    private void setAllMissingFields(ObjectNode config, ObjectNode defaults) {
-        for (String fieldName : forEach(defaults.fieldNames()))
-	        if (! config.has(fieldName) || config.get(fieldName).isNull())
-	            config.put(fieldName, defaults.get(fieldName));
-    }
-
-	private <T> Iterable<T> forEach(final Iterator<T> iterator) {
-	    return new Iterable<T>() {
-            @Override
-            public Iterator<T> iterator() {
-                return iterator;
-            }
-        };
-    }
-
-    private void checkConfig(ObjectNode conf) {
-        if (conf.get(MAX_RETRIES).intValue() < 0)
-            throw new IllegalArgumentException("maxRetries < 0");
-        if (conf.get(INITIAL_DELAY).intValue() < 0)
-            throw new IllegalArgumentException("initialDelay < 0");
-        if (conf.get(MAX_DELAY).intValue() < conf.get(INITIAL_DELAY).intValue())
-            throw new IllegalArgumentException("maxDelay < initialDelay");
-        if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0)
-            throw new IllegalArgumentException("backoffFactor < 0.0");
-    }
-
-    public static ObjectNode defaultConfig() {
-	    ObjectNode conf = JsonNodeFactory.instance.objectNode();
-	    conf.put(MAX_RETRIES, 0);
-	    conf.put(INITIAL_DELAY, 1000);
-	    conf.put(MAX_DELAY, 5000);
-	    conf.put(BACKOFF_FACTOR, 1.0);
-	    return conf;
-    }
-
-    @Override
-	public JsonNode getConfiguration() {
-		return this.config;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
deleted file mode 100644
index 39dedbf..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-@ConfigurationBean(uri = Retry.URI + "#Config")
-public class RetryConfig {
-	private static final float BACKOFF_FACTOR = 1.0f;
-	private static final int MAX_DELAY = 5000;
-	private static final int INITIAL_DELAY = 1000;
-	private static final int MAX_RETRIES = 0;
-
-	private float backoffFactor = BACKOFF_FACTOR;
-	private int initialDelay = INITIAL_DELAY;
-	private int maxDelay = MAX_DELAY;
-	private int maxRetries = MAX_RETRIES;
-
-	/**
-	 * Factor by which the initial delay is multiplied for each retry after the
-	 * first, this allows for exponential backoff of retry times up to a certain
-	 * ceiling
-	 *
-	 * @return
-	 */
-	public float getBackoffFactor() {
-		return this.backoffFactor;
-	}
-
-	/**
-	 * Delay in milliseconds between the initial failure message and the first
-	 * attempt to retry the failed job
-	 *
-	 * @return
-	 */
-	public int getInitialDelay() {
-		return this.initialDelay;
-	}
-
-	/**
-	 * Maximum delay in milliseconds between failure reception and retry. This
-	 * acts as a ceiling for the exponential backoff factor allowing the retry
-	 * delay to initially increase to a certain value then remain constant after
-	 * that point rather than exploding to unreasonable levels.
-	 */
-	public int getMaxDelay() {
-		return this.maxDelay;
-	}
-
-	/**
-	 * Maximum number of retries for a failing process
-	 *
-	 * @return
-	 */
-	public int getMaxRetries() {
-		return this.maxRetries;
-	}
-
-	@ConfigurationProperty(name = "backoffFactor", label = "Backoff Factor", description = "Factor by which the initial delay is multiplied for each retry after the first retry", required=false)
-	public void setBackoffFactor(float factor) {
-		this.backoffFactor = factor;
-	}
-
-	@ConfigurationProperty(name = "initialDelay", label = "Initial Delay", description = "Delay in milliseconds between the initial failure message and the first attempt to retry the failed job", required=false)
-	public void setInitialDelay(int delay) {
-		this.initialDelay = delay;
-	}
-
-	@ConfigurationProperty(name = "maxDelay", label = "Maximum Delay", description = "Maximum delay in milliseconds between failure reception and retry", required=false)
-	public void setMaxDelay(int delay) {
-		this.maxDelay = delay;
-	}
-
-	@ConfigurationProperty(name = "maxRetries", label = "Maximum Retries", description = "Maximum number of retries for a failing process", required=false)
-	public void setMaxRetries(int max) {
-		this.maxRetries = max;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
deleted file mode 100644
index 3169f8c..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- *
- */
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
-import net.sf.taverna.t2.workflowmodel.ConfigurationException;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * This layer allows for the cancellation, pausing and resuming of workflow
- * runs. It does so by intercepting jobs sent to the layer.
- *
- * @author alanrw
- */
-public class Stop extends AbstractDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
-	/**
-	 * The set of ids of workflow runs that have been cancelled.
-	 */
-	private static Set<String> cancelledWorkflowRuns = new HashSet<>();
-	/**
-	 * A map from workflow run ids to the set of Stop layers where jobs have
-	 * been intercepted for that run.
-	 */
-	private static Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
-	/**
-	 * A map for a given Stop from ids of suspended workflow runs to the jobs
-	 * that have been intercepted.
-	 */
-	private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
-
-	@Override
-	public void configure(JsonNode conf) throws ConfigurationException {
-		// nothing
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return null;
-	}
-
-	@Override
-	public void receiveJob(final DispatchJobEvent jobEvent) {
-		List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
-				WorkflowRunIdEntity.class);
-		if (entities != null && !entities.isEmpty()) {
-			final String wfRunId = entities.get(0).getWorkflowRunId();
-			// If the workflow run is cancelled then simply "eat" the jobEvent.
-			// This does a hard-cancel.
-			if (cancelledWorkflowRuns.contains(wfRunId))
-				return;
-			// If the workflow run is paused
-			if (pausedLayerMap.containsKey(wfRunId))
-				synchronized (Stop.class) {
-					// double check as pausedLayerMap may have been changed
-					// waiting for the lock
-					if (pausedLayerMap.containsKey(wfRunId)) {
-						// Remember that this Stop layer was affected by the
-						// workflow pause
-						pausedLayerMap.get(wfRunId).add(this);
-						if (!suspendedJobEventMap.containsKey(wfRunId))
-							suspendedJobEventMap.put(wfRunId,
-									new HashSet<DispatchJobEvent>());
-						// Remember the suspended jobEvent
-						suspendedJobEventMap.get(wfRunId).add(jobEvent);
-						return;
-					}
-				}
-		}
-		// By default pass the jobEvent down to the next layer
-		super.receiveJob(jobEvent);
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-		super.receiveJobQueue(jobQueueEvent);
-	}
-
-	/**
-	 * Cancel the workflow run with the specified id
-	 *
-	 * @param workflowRunId
-	 *            The id of the workflow run to cancel
-	 * @return If the workflow run was cancelled then true. If it was already
-	 *         cancelled then false.
-	 */
-	public static synchronized boolean cancelWorkflow(String workflowRunId) {
-		if (cancelledWorkflowRuns.contains(workflowRunId))
-			return false;
-		Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
-				cancelledWorkflowRuns);
-		cancelledWorkflowRunsCopy.add(workflowRunId);
-		cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
-		return true;
-	}
-
-	/**
-	 * Pause the workflow run with the specified id
-	 *
-	 * @param workflowRunId
-	 *            The id of the workflow run to pause
-	 * @return If the workflow run was paused then true. If it was already
-	 *         paused or cancelled then false.
-	 */
-	public static synchronized boolean pauseWorkflow(String workflowRunId) {
-		if (cancelledWorkflowRuns.contains(workflowRunId))
-			return false;
-		if (pausedLayerMap.containsKey(workflowRunId))
-			return false;
-		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
-		pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
-		pausedLayerMap = pausedLayerMapCopy;
-		return true;
-	}
-
-	/**
-	 * Resume the workflow run with the specified id
-	 *
-	 * @param workflowRunId
-	 *            The id of the workflow run to resume
-	 * @return If the workflow run was resumed then true. If the workflow run
-	 *         was not paused or it was cancelled, then false.
-	 */
-	public static synchronized boolean resumeWorkflow(String workflowRunId) {
-		if (cancelledWorkflowRuns.contains(workflowRunId))
-			return false;
-		if (!pausedLayerMap.containsKey(workflowRunId))
-			return false;
-		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>();
-		pausedLayerMapCopy.putAll(pausedLayerMap);
-		Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
-		pausedLayerMap = pausedLayerMapCopy;
-		for (Stop s : stops)
-			s.resumeLayerWorkflow(workflowRunId);
-		return true;
-	}
-
-	/**
-	 * Resume the workflow run with the specified id on this Stop layer. This
-	 * method processes any suspended job events.
-	 *
-	 * @param workflowRunId
-	 *            The id of the workflow run to resume.
-	 */
-	private void resumeLayerWorkflow(String workflowRunId) {
-		synchronized (Stop.class) {
-			for (DispatchJobEvent dje : suspendedJobEventMap
-					.remove(workflowRunId))
-				receiveJob(dje);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
deleted file mode 100644
index fe6e73f..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
+++ /dev/null
@@ -1,4 +0,0 @@
-<body>
-Contains implementations of DispatchLayer defined by the core Taverna 2
-specification.
-</body>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
deleted file mode 100644
index dc55fc7..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xmlns:beans="http://www.springframework.org/schema/beans"
-	xsi:schemaLocation="http://www.springframework.org/schema/beans 
-                                 http://www.springframework.org/schema/beans/spring-beans.xsd
-                                 http://www.springframework.org/schema/osgi 
-                                 http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
-
-	<service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
-
-</beans:beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
deleted file mode 100644
index 90ed75f..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://www.springframework.org/schema/beans 
-                           http://www.springframework.org/schema/beans/spring-beans.xsd">
-                           
-	<bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
-
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
deleted file mode 100644
index e202df1..0000000
--- a/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import static org.junit.Assert.*;
-
-public class TestRetry {
-    
-    @Test
-    public void defaultConfig() throws Exception {
-        Retry retry = new Retry();
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(0, configuration.get("maxRetries").intValue());
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    @Test
-    public void customConfig() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(150, configuration.get("initialDelay").intValue());
-        assertEquals(1200, configuration.get("maxDelay").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-    
-    @Test
-    public void configureEmpty() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        JsonNode empty = JsonNodeFactory.instance.objectNode();
-        retry.configure(empty);
-        // We would expect missing values to be replaced with the
-        // DEFAULT values rather than the previous values
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(0, configuration.get("maxRetries").intValue());
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    @Test
-    public void configurePartly() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", 15);
-        conf.put("backoffFactor", 1.2);
-        retry.configure(conf);
-        // We would expect to see the new values
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-        // And the default values (not the previous values!)
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-    }    
-    
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidMaxRetries() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", -15);
-        retry.configure(conf);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidInitialDelay() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("initialDelay", -15);
-        retry.configure(conf);
-    }
-    
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidMaxDelay() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxDelay", 150);
-        // Valid on its own, but less than the default initialDelay of 1000!
-        retry.configure(conf);
-    }
-
-    
-    @Test
-    public void invalidConfigureRecovers() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", -15);
-        try { 
-            retry.configure(conf);
-        } catch (IllegalArgumentException ex) {
-            // As expected
-        }
-        // We would expect the earlier values to persist
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(150, configuration.get("initialDelay").intValue());
-        assertEquals(1200, configuration.get("maxDelay").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-    
-    // TODO: Testing the Retry layer without making a big dispatch stack and job context
-    
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/.gitignore
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/.gitignore b/taverna-workflowmodel-extensions/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/taverna-workflowmodel-extensions/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/pom.xml b/taverna-workflowmodel-extensions/pom.xml
new file mode 100644
index 0000000..4f1be7a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/pom.xml
@@ -0,0 +1,30 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+    <parent>
+			<groupId>org.apache.taverna.engine</groupId>
+			<artifactId>taverna-engine</artifactId>
+			<version>3.1.0-incubating-SNAPSHOT</version>
+    </parent>
+	<artifactId>taverna-workflowmodel-extensions</artifactId>
+	<packaging>bundle</packaging>
+	<name>Apache Taverna Workflow Model Extension Points</name>
+	<description>Implementation of core extension points to the workflow model</description>
+	<dependencies>
+    <dependency>
+        <groupId>${project.parent.groupId}</groupId>
+        <artifactId>taverna-workflowmodel-api</artifactId>
+        <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+        <version>${junit.version}</version>
+        <scope>test</scope>
+    </dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
new file mode 100644
index 0000000..0b11627
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright (C) 2011 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Factory that creates the core dispatch layers from their type URIs.
+ *
+ * Each layer class supplies its identifying URI string; the layers handled here are:
+ * <ul>
+ * <li>ErrorBounce</li>
+ * <li>Parallelize</li>
+ * <li>Failover</li>
+ * <li>Retry</li>
+ * <li>Stop</li>
+ * <li>Invoke</li>
+ * <li>Loop</li>
+ * <li>IntermediateProvenance</li>
+ * </ul>
+ * Any other URI makes {@link #createDispatchLayer(URI)} return <code>null</code>.
+ * @author David Withers
+ */
+public class CoreDispatchLayerFactory implements DispatchLayerFactory {
+	private static final URI parallelizeLayer = URI.create(Parallelize.URI); // each constant below is parsed once from the URI string of its layer class
+	private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
+	private static final URI failoverLayer = URI.create(Failover.URI);
+	private static final URI retryLayer = URI.create(Retry.URI);
+	private static final URI invokeLayer = URI.create(Invoke.URI);
+	private static final URI loopLayer = URI.create(Loop.URI);
+	private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
+	private static final URI stopLayer = URI.create(Stop.URI);
+
+	private final static Set<URI> dispatchLayerURIs = new HashSet<URI>(); // every URI this factory can instantiate; filled by the static initializer below
+
+	static { // register all eight supported layer URIs once, at class load
+		dispatchLayerURIs.add(parallelizeLayer);
+		dispatchLayerURIs.add(errorBounceLayer);
+		dispatchLayerURIs.add(failoverLayer);
+		dispatchLayerURIs.add(retryLayer);
+		dispatchLayerURIs.add(invokeLayer);
+		dispatchLayerURIs.add(loopLayer);
+		dispatchLayerURIs.add(intermediateProvenanceLayer);
+		dispatchLayerURIs.add(stopLayer);
+	}
+
+	@Override
+	public DispatchLayer<?> createDispatchLayer(URI uri) { // returns a fresh, default-constructed layer on every call
+		if (parallelizeLayer.equals(uri))
+			return new Parallelize();
+		else if (errorBounceLayer.equals(uri))
+			return new ErrorBounce();
+		else if (failoverLayer.equals(uri))
+			return new Failover();
+		else if (retryLayer.equals(uri))
+			return new Retry();
+		else if (invokeLayer.equals(uri))
+			return new Invoke();
+		else if (loopLayer.equals(uri))
+			return new Loop();
+		else if (intermediateProvenanceLayer.equals(uri))
+			return new IntermediateProvenance();
+		else if (stopLayer.equals(uri))
+			return new Stop();
+		return null; // uri is null or not a core layer - NOTE(review): callers must null-check the result
+	}
+
+	@Override
+	public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
+		// TODO: no configuration schema is implemented yet, so null is returned for every URI
+		return null;
+	}
+
+	@Override
+	public Set<URI> getDispatchLayerTypes() {
+		return dispatchLayerURIs; // NOTE(review): this is the shared static set itself, not a copy or unmodifiable view - a caller could modify it
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
new file mode 100644
index 0000000..dfde240
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
@@ -0,0 +1,324 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import net.sf.taverna.t2.invocation.Event;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.monitor.NoSuchPropertyException;
+import net.sf.taverna.t2.reference.ErrorDocument;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.OutputPort;
+import net.sf.taverna.t2.workflowmodel.Processor;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+/**
+ * Dispatch layer that turns errors into ordinary results. On receiving a job
+ * it checks whether any input reference is an error token or a collection
+ * containing errors; if so, instead of relaying the job it sends a result
+ * back up in which every output port holds an error document registered via
+ * the reference service of the invocation context. It also rewrites any
+ * failure message from below as a result containing error documents at the
+ * depth of each output port - this means that it must be placed above any
+ * error handling layers in order for those to have an effect at all. In
+ * general this layer should be placed immediately below the parallelize layer
+ * in most default cases (this guarantees the processor never sees a failure
+ * message, which may or may not be desirable).
+ * 
+ * @author Tom Oinn
+ */
+@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
+		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
+@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
+		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
+@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
+@SupportsStreamedResult
+public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
+		PropertyContributingDispatchLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
+
+	/**
+	 * Per-process error counters, keyed by owning process identifier. Entries
+	 * are created by getState and removed, after a delay, by finishedWith
+	 */
+	private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
+
+	private int totalTranslatedErrors = 0; // failures translated by this instance across all owning processes
+	private int totalReflectedErrors = 0; // jobs reflected by this instance across all owning processes
+
+	private synchronized ErrorBounceState getState(String owningProcess) { // look up the counters for this process, creating and storing them on first use
+		if (state.containsKey(owningProcess))
+			return state.get(owningProcess);
+		ErrorBounceState ebs = new ErrorBounceState();
+		state.put(owningProcess, ebs);
+		return ebs;
+	}
+
+	/**
+	 * If any input of the job is an error, or a collection which contains
+	 * errors, bounce a result message carrying error documents back up to the
+	 * layer above; otherwise relay the job unchanged to the layer below
+	 */
+	@Override
+	public void receiveJob(DispatchJobEvent jobEvent) {
+		Set<T2Reference> errorReferences = new HashSet<>(); // inputs that are, or contain, errors
+		for (T2Reference ei : jobEvent.getData().values())
+			if (ei.containsErrors())
+				errorReferences.add(ei);
+		if (errorReferences.isEmpty())
+			// no erroneous inputs: relay the job down unchanged
+			getBelow().receiveJob(jobEvent);
+		else {
+			getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
+			sendErrorOutput(jobEvent, null, errorReferences); // null cause: the error documents refer to the erroneous inputs instead
+		}
+	}
+
+	/**
+	 * Count the failure for its owning process and always answer it with an
+	 * error-document result (this should eventually be configurable). In
+	 * effect this ensures the processor never sees a top level failure.
+	 */
+	@Override
+	public void receiveError(DispatchErrorEvent errorEvent) {
+		getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
+		sendErrorOutput(errorEvent, errorEvent.getCause(), null); // no input error references: the cause is recorded instead
+	}
+
+	/**
+	 * Construct and send a new result message with error documents in place of
+	 * all outputs at the appropriate depth
+	 * 
+	 * @param event the job or error event being answered; supplies the owning process, index and context
+	 * @param cause the failure to record, or null when reporting errors found in the job inputs
+	 * @param errorReferences the erroneous input references; only used when cause is null
+	 */
+	private void sendErrorOutput(Event<?> event, Throwable cause,
+			Set<T2Reference> errorReferences) {
+		ReferenceService rs = event.getContext().getReferenceService();
+
+		Processor p = dispatchStack.getProcessor();
+		Map<String, T2Reference> outputDataMap = new HashMap<>(); // output port name -> id of the error document registered for it
+		String[] owningProcessArray = event.getOwningProcess().split(":");
+		String processor = owningProcessArray[owningProcessArray.length - 1]; // last colon-separated segment, used as the processor name in messages
+		for (OutputPort op : p.getOutputPorts()) {
+			String message = "Processor '" + processor + "' - Port '"
+					+ op.getName() + "'";
+			if (event instanceof DispatchErrorEvent)
+				message += ": " + ((DispatchErrorEvent) event).getMessage();
+			ErrorDocument ed;
+			if (cause != null)
+				ed = rs.getErrorDocumentService().registerError(message, cause,
+						op.getDepth(), event.getContext());
+			else
+				ed = rs.getErrorDocumentService().registerError(message,
+						errorReferences, op.getDepth(), event.getContext());
+			outputDataMap.put(op.getName(), ed.getId());
+		}
+		DispatchResultEvent dre = new DispatchResultEvent(
+				event.getOwningProcess(), event.getIndex(), event.getContext(),
+				outputDataMap, false); // NOTE(review): the final false is presumably the streaming flag - confirm against DispatchResultEvent
+		getAbove().receiveResult(dre);
+	}
+
+	@Override
+	public void configure(JsonNode config) {
+		// Do nothing - no configuration required, the argument is ignored
+	}
+
+	@Override
+	public JsonNode getConfiguration() {
+		// Layer has no configuration associated, so there is nothing to return
+		return null;
+	}
+
+	@Override
+	public void finishedWith(final String owningProcess) {
+		/*
+		 * Remove the counters of this process only after CLEANUP_DELAY_MS, to give the monitor a chance to poll
+		 */
+		cleanupTimer.schedule(new TimerTask() {
+			@Override
+			public void run() {
+				state.remove(owningProcess);
+			}
+		}, CLEANUP_DELAY_MS);
+	}
+
+	/**
+	 * Registers four Integer properties with the dispatch stack for the given
+	 * owning process: dispatch.errorbounce.reflected (jobs bounced back as
+	 * error results) and dispatch.errorbounce.translated (downstream failures
+	 * rewritten as error results), both per process, plus totalReflected and
+	 * totalTranslated, the same counts summed over this layer instance.
+	 */
+	@Override
+	public void injectPropertiesFor(final String owningProcess) {
+		MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
+			@Override
+			public Date getLastModified() {
+				return new Date(); // always the current time: no modification time is tracked
+			}
+
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "errorbounce", "reflected" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				ErrorBounceState ebs = state.get(owningProcess);
+				if (ebs == null) // no counters exist (never created, or already cleaned up): report zero
+					return 0;
+				return ebs.getErrorsReflected();
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
+				owningProcess);
+
+		MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
+			@Override
+			public Date getLastModified() {
+				return new Date(); // always the current time: no modification time is tracked
+			}
+
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "errorbounce", "translated" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				ErrorBounceState ebs = state.get(owningProcess);
+				if (ebs == null) // no counters exist (never created, or already cleaned up): report zero
+					return 0;
+				return ebs.getErrorsTranslated();
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
+				owningProcess);
+
+		MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
+			@Override
+			public Date getLastModified() {
+				return new Date(); // always the current time: no modification time is tracked
+			}
+
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "errorbounce",
+						"totalTranslated" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				return totalTranslatedErrors; // NOTE(review): read without the ErrorBounce.this lock that guards the increments
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(
+				totalTranslatedTranslatedProperty, owningProcess);
+
+		MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
+			@Override
+			public Date getLastModified() {
+				return new Date(); // always the current time: no modification time is tracked
+			}
+
+			@Override
+			public String[] getName() {
+				return new String[] { "dispatch", "errorbounce",
+						"totalReflected" };
+			}
+
+			@Override
+			public Integer getValue() throws NoSuchPropertyException {
+				return totalReflectedErrors; // NOTE(review): read without the ErrorBounce.this lock that guards the increments
+			}
+		};
+		dispatchStack.receiveMonitorableProperty(
+				totalReflectedTranslatedProperty, owningProcess);
+	}
+
+	public class ErrorBounceState { // counters for one owning process; an inner class so that increments can also update the totals of the enclosing layer
+		private int errorsReflected = 0;
+		private int errorsTranslated = 0;
+
+		/**
+		 * Number of times the bounce layer has converted an incoming job event
+		 * where the input data contained error tokens into a result event
+		 * containing all errors.
+		 */
+		int getErrorsReflected() {
+			return this.errorsReflected; // NOTE(review): read without the lock taken in incrementErrorsReflected
+		}
+
+		/**
+		 * Number of times the bounce layer has converted an incoming failure
+		 * event into a result containing error tokens
+		 */
+		int getErrorsTranslated() {
+			return errorsTranslated; // NOTE(review): read without the lock taken in incrementErrorsTranslated
+		}
+
+		void incrementErrorsReflected() {
+			synchronized (this) { // guards the per-process counter
+				errorsReflected++;
+			}
+			synchronized (ErrorBounce.this) { // guards the layer-wide total
+				totalReflectedErrors++;
+			}
+		}
+
+		void incrementErrorsTranslated() {
+			synchronized (this) { // guards the per-process counter
+				errorsTranslated++;
+			}
+			synchronized (ErrorBounce.this) { // guards the layer-wide total
+				totalTranslatedErrors++;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
new file mode 100644
index 0000000..1c5ef03
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Failure handling dispatch layer. Consumes job events that carry several
+ * alternative activities and relays the same job downwards with only the
+ * first activity. Each time a failure comes back, the job is resent to the
+ * layer below with a one-element activity list holding the next activity from
+ * the original list. Once every activity has been tried the failure is no
+ * longer handled here and is left to propagate to the layer above.
+ * 
+ * @author Tom Oinn
+ * @author Stian Soiland-Reyes
+ */
+@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
+		UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
+@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
+public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
+
+	/**
+	 * Create the per-job state used to track which activity is currently
+	 * being attempted for the given job.
+	 * 
+	 * @param jobEvent
+	 *            the job as originally received from the layer above
+	 * @return a fresh {@link FailoverState} positioned at the first activity
+	 */
+	@Override
+	protected JobState getStateObject(DispatchJobEvent jobEvent) {
+		return new FailoverState(jobEvent);
+	}
+
+	/**
+	 * Receive a job from the layer above, store it in the state list, then
+	 * relay it to the layer below with a modified activity list containing
+	 * only the activity at index 0.
+	 * 
+	 * @param jobEvent
+	 *            the incoming job, carrying the full list of activities
+	 */
+	@Override
+	public void receiveJob(DispatchJobEvent jobEvent) {
+		addJobToStateList(jobEvent);
+		relayWithSingleActivity(jobEvent, 0);
+	}
+
+	/**
+	 * Send a copy of the job to the layer below in which the activity list
+	 * has been narrowed down to the single activity at the given position.
+	 * All other properties of the job (owning process, index, context, data)
+	 * are carried over unchanged.
+	 * 
+	 * @param jobEvent
+	 *            the original job holding the complete activity list
+	 * @param activityIndex
+	 *            position, within the original activity list, of the activity
+	 *            to attempt
+	 */
+	private void relayWithSingleActivity(DispatchJobEvent jobEvent,
+			int activityIndex) {
+		List<Activity<?>> singleActivity = new ArrayList<>();
+		singleActivity.add(jobEvent.getActivities().get(activityIndex));
+		getBelow().receiveJob(
+				new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
+						.getIndex(), jobEvent.getContext(), jobEvent.getData(),
+						singleActivity));
+	}
+
+	/**
+	 * State for one job: remembers the original job event (via the
+	 * superclass) and the index of the activity most recently attempted.
+	 */
+	class FailoverState extends JobState {
+		int currentActivityIndex = 0;
+
+		public FailoverState(DispatchJobEvent jobEvent) {
+			super(jobEvent);
+		}
+
+		/**
+		 * Advance to the next activity and resend the job with it.
+		 * 
+		 * @return <code>true</code> if another activity was available and the
+		 *         job has been resent to the layer below; <code>false</code>
+		 *         if all activities have been exhausted
+		 */
+		@Override
+		public boolean handleError() {
+			currentActivityIndex++;
+			// Use >= rather than == so that a further call after exhaustion
+			// still reports "not handled" instead of indexing past the end
+			// of the activity list.
+			if (currentActivityIndex >= jobEvent.getActivities().size())
+				return false;
+			relayWithSingleActivity(jobEvent, currentActivityIndex);
+			return true;
+		}
+	}
+
+	/**
+	 * This layer takes no configuration; the argument is ignored.
+	 */
+	@Override
+	public void configure(JsonNode config) {
+		// Do nothing - there is no configuration to do
+	}
+
+	/**
+	 * @return always <code>null</code>, as this layer has no configuration
+	 */
+	@Override
+	public JsonNode getConfiguration() {
+		return null;
+	}
+}