You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/27 16:26:48 UTC
[1/6] incubator-taverna-engine git commit: simpler names in pom.xml
Repository: incubator-taverna-engine
Updated Branches:
refs/heads/master 44a6bb386 -> 315a829c3
simpler names in pom.xml
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/46cc1539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/46cc1539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/46cc1539
Branch: refs/heads/master
Commit: 46cc1539242308e966eb6c9d381922edc226127f
Parents: 44a6bb3
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Mon Feb 23 15:45:41 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Mon Feb 23 15:45:41 2015 +0000
----------------------------------------------------------------------
taverna-capability-impl/pom.xml | 2 +-
taverna-credential-manager-impl/pom.xml | 2 +-
taverna-database-configuration-impl/pom.xml | 2 +-
taverna-execution-impl/pom.xml | 2 +-
taverna-observer/pom.xml | 2 +-
taverna-reference-impl/pom.xml | 2 +-
taverna-reference-testhelpers/pom.xml | 2 +-
taverna-run-impl/pom.xml | 2 +-
taverna-services-impl/pom.xml | 2 +-
taverna-workflowmodel-core-extensions/pom.xml | 2 +-
taverna-workflowmodel-impl/pom.xml | 2 +-
11 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-capability-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-capability-impl/pom.xml b/taverna-capability-impl/pom.xml
index 14c6212..c4fa489 100644
--- a/taverna-capability-impl/pom.xml
+++ b/taverna-capability-impl/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-capability-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Platform Capability Implementation</name>
+ <name>Apache Taverna Platform Capability impl</name>
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-credential-manager-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-credential-manager-impl/pom.xml b/taverna-credential-manager-impl/pom.xml
index ab1536c..ddd5409 100644
--- a/taverna-credential-manager-impl/pom.xml
+++ b/taverna-credential-manager-impl/pom.xml
@@ -8,7 +8,7 @@
</parent>
<artifactId>taverna-credential-manager-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Credential Manager Implementation</name>
+ <name>Apache Taverna Credential Manager impl</name>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-database-configuration-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-database-configuration-impl/pom.xml b/taverna-database-configuration-impl/pom.xml
index 07bd9b7..39034f4 100644
--- a/taverna-database-configuration-impl/pom.xml
+++ b/taverna-database-configuration-impl/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-database-configuration-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Database Configuration implementation</name>
+ <name>Apache Taverna Database Configuration impl</name>
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-execution-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-execution-impl/pom.xml b/taverna-execution-impl/pom.xml
index ec7a6b0..3c1f0e8 100644
--- a/taverna-execution-impl/pom.xml
+++ b/taverna-execution-impl/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-execution-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Platform Execution Service Implementation</name>
+ <name>Apache Taverna Platform Execution Service impl</name>
<description>A Service for executing Taverna workflows</description>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-observer/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-observer/pom.xml b/taverna-observer/pom.xml
index 1ec7d62..4b94a2e 100644
--- a/taverna-observer/pom.xml
+++ b/taverna-observer/pom.xml
@@ -9,7 +9,7 @@
<artifactId>taverna-observer</artifactId>
<packaging>bundle</packaging>
<name>Apache Taverna Observer pattern</name>
- <description>Implementation of the Observer pattern</description>
+ <description>impl of the Observer pattern</description>
<dependencies>
<dependency>
<groupId>log4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-reference-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-reference-impl/pom.xml b/taverna-reference-impl/pom.xml
index 0eb9e8c..98f3b4d 100644
--- a/taverna-reference-impl/pom.xml
+++ b/taverna-reference-impl/pom.xml
@@ -8,7 +8,7 @@
</parent>
<artifactId>taverna-reference-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Reference Manager Implementation</name>
+ <name>Apache Taverna Reference Manager impl</name>
<description>
Implementations of the core APIs, not including extension point
implementations.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-reference-testhelpers/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-reference-testhelpers/pom.xml b/taverna-reference-testhelpers/pom.xml
index 0f99adf..a9730e3 100644
--- a/taverna-reference-testhelpers/pom.xml
+++ b/taverna-reference-testhelpers/pom.xml
@@ -15,7 +15,7 @@
we need a module that is entirely external
to t2reference and to the test cases. If the test
implementations are included in either the api, core
- implementation or test modules they will be loaded by the root
+ implementations or test modules they will be loaded by the root
classloader of the test runner - by putting them in an
independent artifact we allow them to be loaded through
various SPI discovery mechanisms as they would be in a 'real'
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-run-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-run-impl/pom.xml b/taverna-run-impl/pom.xml
index a451f6c..80429a1 100644
--- a/taverna-run-impl/pom.xml
+++ b/taverna-run-impl/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-run-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Platform Run Service Implementation</name>
+ <name>Apache Taverna Platform Run Service impl</name>
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-services-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-services-impl/pom.xml b/taverna-services-impl/pom.xml
index c975a97..a588e4d 100644
--- a/taverna-services-impl/pom.xml
+++ b/taverna-services-impl/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-services-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Platform Services Implementation</name>
+ <name>Apache Taverna Platform Services impl</name>
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-workflowmodel-core-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/pom.xml b/taverna-workflowmodel-core-extensions/pom.xml
index b19714f..700e56d 100644
--- a/taverna-workflowmodel-core-extensions/pom.xml
+++ b/taverna-workflowmodel-core-extensions/pom.xml
@@ -7,7 +7,7 @@
</parent>
<artifactId>taverna-workflowmodel-core-extensions</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Workflow Model Core Extension Points</name>
+ <name>Apache Taverna Workflow Model Extension Points</name>
<description>Implementation of core extension points to the workflow model</description>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/46cc1539/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 7231e9d..93dfea1 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -8,7 +8,7 @@
</parent>
<artifactId>taverna-workflowmodel-impl</artifactId>
<packaging>bundle</packaging>
- <name>Apache Taverna Workflow Model implementation</name>
+ <name>Apache Taverna Workflow Model impl</name>
<description> Implementation of the core workflow object model for
Taverna workflows including
concrete instances of the workflow
[2/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
new file mode 100644
index 0000000..3169f8c
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
@@ -0,0 +1,163 @@
+/**
+ *
+ */
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
+import net.sf.taverna.t2.workflowmodel.ConfigurationException;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * This layer allows for the cancellation, pausing and resuming of workflow
+ * runs. It does so by intercepting jobs sent to the layer.
+ * <p>
+ * The cancel/pause bookkeeping lives in static fields, so it is shared by all
+ * instances of this class. Every write to that state happens while holding
+ * the <code>Stop.class</code> monitor (the static synchronized methods and
+ * the explicit <code>synchronized (Stop.class)</code> blocks use the same
+ * lock). {@link #receiveJob(DispatchJobEvent)} performs its first checks
+ * without taking the lock, which is why the static fields are declared
+ * <code>volatile</code>.
+ * <p>
+ * Note that ids of cancelled runs are never removed from the cancelled set,
+ * and jobs suspended for a run that is subsequently cancelled are retained
+ * because {@link #resumeWorkflow(String)} refuses cancelled runs.
+ *
+ * @author alanrw
+ */
+public class Stop extends AbstractDispatchLayer<JsonNode> {
+    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
+    /**
+     * The set of ids of workflow runs that have been cancelled. The set is
+     * never modified in place; {@link #cancelWorkflow(String)} swaps in a
+     * modified copy. Volatile so that the lock-free read in
+     * {@link #receiveJob(DispatchJobEvent)} sees the latest published set.
+     */
+    private static volatile Set<String> cancelledWorkflowRuns = new HashSet<>();
+    /**
+     * A map from workflow run ids to the set of Stop layers where jobs have
+     * been intercepted for that run. The map itself is replaced by a modified
+     * copy on pause/resume; the contained sets are only mutated while holding
+     * the <code>Stop.class</code> monitor. Volatile for the same reason as
+     * {@link #cancelledWorkflowRuns}.
+     */
+    private static volatile Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
+    /**
+     * A map for a given Stop from ids of suspended workflow runs to the jobs
+     * that have been intercepted. Only accessed while holding the
+     * <code>Stop.class</code> monitor.
+     */
+    private final Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
+
+    /**
+     * This layer has no configuration; the argument is ignored.
+     */
+    @Override
+    public void configure(JsonNode conf) throws ConfigurationException {
+        // nothing
+    }
+
+    /**
+     * @return always <code>null</code>, as this layer has no configuration
+     */
+    @Override
+    public JsonNode getConfiguration() {
+        return null;
+    }
+
+    /**
+     * Intercept a job. If the job's context carries a workflow run id, the
+     * job is dropped when that run is cancelled, or stored for later when
+     * that run is paused. In every other case the job is passed on to the
+     * next layer unchanged.
+     *
+     * @param jobEvent
+     *            The job arriving at this layer
+     */
+    @Override
+    public void receiveJob(final DispatchJobEvent jobEvent) {
+        List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
+                WorkflowRunIdEntity.class);
+        if (entities != null && !entities.isEmpty()) {
+            final String wfRunId = entities.get(0).getWorkflowRunId();
+            // If the workflow run is cancelled then simply "eat" the jobEvent.
+            // This does a hard-cancel.
+            if (cancelledWorkflowRuns.contains(wfRunId))
+                return;
+            // If the workflow run is paused
+            if (pausedLayerMap.containsKey(wfRunId)) {
+                synchronized (Stop.class) {
+                    // Look the run up again: pausedLayerMap may have been
+                    // replaced while this thread was waiting for the lock
+                    Set<Stop> pausedLayers = pausedLayerMap.get(wfRunId);
+                    if (pausedLayers != null) {
+                        // Remember that this Stop layer was affected by the
+                        // workflow pause
+                        pausedLayers.add(this);
+                        Set<DispatchJobEvent> suspended = suspendedJobEventMap
+                                .get(wfRunId);
+                        if (suspended == null) {
+                            suspended = new HashSet<>();
+                            suspendedJobEventMap.put(wfRunId, suspended);
+                        }
+                        // Remember the suspended jobEvent
+                        suspended.add(jobEvent);
+                        return;
+                    }
+                }
+            }
+        }
+        // By default pass the jobEvent down to the next layer
+        super.receiveJob(jobEvent);
+    }
+
+    /**
+     * Job queues are not intercepted; they are handed straight to the
+     * superclass implementation.
+     */
+    @Override
+    public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+        super.receiveJobQueue(jobQueueEvent);
+    }
+
+    /**
+     * Cancel the workflow run with the specified id
+     *
+     * @param workflowRunId
+     *            The id of the workflow run to cancel
+     * @return If the workflow run was cancelled then true. If it was already
+     *         cancelled then false.
+     */
+    public static synchronized boolean cancelWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId))
+            return false;
+        // Publish a modified copy rather than mutating the set that
+        // receiveJob may be reading without the lock
+        Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
+                cancelledWorkflowRuns);
+        cancelledWorkflowRunsCopy.add(workflowRunId);
+        cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
+        return true;
+    }
+
+    /**
+     * Pause the workflow run with the specified id
+     *
+     * @param workflowRunId
+     *            The id of the workflow run to pause
+     * @return If the workflow run was paused then true. If it was already
+     *         paused or cancelled then false.
+     */
+    public static synchronized boolean pauseWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId))
+            return false;
+        if (pausedLayerMap.containsKey(workflowRunId))
+            return false;
+        Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
+        pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
+        pausedLayerMap = pausedLayerMapCopy;
+        return true;
+    }
+
+    /**
+     * Resume the workflow run with the specified id
+     *
+     * @param workflowRunId
+     *            The id of the workflow run to resume
+     * @return If the workflow run was resumed then true. If the workflow run
+     *         was not paused or it was cancelled, then false.
+     */
+    public static synchronized boolean resumeWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId))
+            return false;
+        if (!pausedLayerMap.containsKey(workflowRunId))
+            return false;
+        Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
+        Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
+        // Publish the un-paused map before replaying, so that the replayed
+        // jobs pass straight through receiveJob instead of being suspended
+        // again
+        pausedLayerMap = pausedLayerMapCopy;
+        for (Stop s : stops)
+            s.resumeLayerWorkflow(workflowRunId);
+        return true;
+    }
+
+    /**
+     * Resume the workflow run with the specified id on this Stop layer. This
+     * method processes any suspended job events. If this layer holds no
+     * suspended jobs for the run, nothing happens.
+     *
+     * @param workflowRunId
+     *            The id of the workflow run to resume.
+     */
+    private void resumeLayerWorkflow(String workflowRunId) {
+        synchronized (Stop.class) {
+            Set<DispatchJobEvent> suspended = suspendedJobEventMap
+                    .remove(workflowRunId);
+            if (suspended == null)
+                return;
+            for (DispatchJobEvent dje : suspended)
+                receiveJob(dje);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
new file mode 100644
index 0000000..fe6e73f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
@@ -0,0 +1,4 @@
+<body>
+Contains implementations of DispatchLayer defined by the core Taverna 2
+specification.
+</body>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
new file mode 100644
index 0000000..dc55fc7
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:beans="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/osgi
+ http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
+
+ <service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
+
+</beans:beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
new file mode 100644
index 0000000..90ed75f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
new file mode 100644
index 0000000..e202df1
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
@@ -0,0 +1,110 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import static org.junit.Assert.*;
+
+public class TestRetry {
+
+ @Test
+ public void defaultConfig() throws Exception {
+ Retry retry = new Retry();
+ JsonNode configuration = retry.getConfiguration();
+ assertEquals(0, configuration.get("maxRetries").intValue());
+ assertEquals(1000, configuration.get("initialDelay").intValue());
+ assertEquals(5000, configuration.get("maxDelay").intValue());
+ assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+ }
+
+ @Test
+ public void customConfig() throws Exception {
+ Retry retry = new Retry(15, 150, 1200, 1.2);
+ JsonNode configuration = retry.getConfiguration();
+ assertEquals(15, configuration.get("maxRetries").intValue());
+ assertEquals(150, configuration.get("initialDelay").intValue());
+ assertEquals(1200, configuration.get("maxDelay").intValue());
+ assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+ }
+
+ @Test
+ public void configureEmpty() throws Exception {
+ Retry retry = new Retry(15, 150, 1200, 1.2);
+ JsonNode empty = JsonNodeFactory.instance.objectNode();
+ retry.configure(empty);
+ // We would expect missing values to be replaced with the
+ // DEFAULT values rather than the previous values
+ JsonNode configuration = retry.getConfiguration();
+ assertEquals(0, configuration.get("maxRetries").intValue());
+ assertEquals(1000, configuration.get("initialDelay").intValue());
+ assertEquals(5000, configuration.get("maxDelay").intValue());
+ assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+ }
+
+ @Test
+ public void configurePartly() throws Exception {
+ Retry retry = new Retry(15, 150, 1200, 1.2);
+ ObjectNode conf = JsonNodeFactory.instance.objectNode();
+ conf.put("maxRetries", 15);
+ conf.put("backoffFactor", 1.2);
+ retry.configure(conf);
+ // We would expect to see the new values
+ JsonNode configuration = retry.getConfiguration();
+ assertEquals(15, configuration.get("maxRetries").intValue());
+ assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+ // And the default values (not the previous values!)
+ assertEquals(1000, configuration.get("initialDelay").intValue());
+ assertEquals(5000, configuration.get("maxDelay").intValue());
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void invalidMaxRetries() throws Exception {
+ Retry retry = new Retry();
+ ObjectNode conf = JsonNodeFactory.instance.objectNode();
+ conf.put("maxRetries", -15);
+ retry.configure(conf);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void invalidInitialDelay() throws Exception {
+ Retry retry = new Retry();
+ ObjectNode conf = JsonNodeFactory.instance.objectNode();
+ conf.put("initialDelay", -15);
+ retry.configure(conf);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void invalidMaxDelay() throws Exception {
+ Retry retry = new Retry();
+ ObjectNode conf = JsonNodeFactory.instance.objectNode();
+ conf.put("maxDelay", 150);
+ // Valid on its own, but less than the default initialDelay of 1000!
+ retry.configure(conf);
+ }
+
+
+ @Test
+ public void invalidConfigureRecovers() throws Exception {
+ Retry retry = new Retry(15, 150, 1200, 1.2);
+ ObjectNode conf = JsonNodeFactory.instance.objectNode();
+ conf.put("maxRetries", -15);
+ try {
+ retry.configure(conf);
+ } catch (IllegalArgumentException ex) {
+ // As expected
+ }
+ // We would expect the earlier values to persist
+ JsonNode configuration = retry.getConfiguration();
+ assertEquals(15, configuration.get("maxRetries").intValue());
+ assertEquals(150, configuration.get("initialDelay").intValue());
+ assertEquals(1200, configuration.get("maxDelay").intValue());
+ assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+ }
+
+ // TODO: Testing the Retry layer without making a big dispatch stack and job context
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 93dfea1..97367e5 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -50,7 +50,7 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
+ <artifactId>taverna-workflowmodel-extensions</artifactId>
<version>${project.parent.version}</version>
<!--<scope>test</scope> -->
</dependency>
[5/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Posted by st...@apache.org.
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/4252fa90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/4252fa90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/4252fa90
Branch: refs/heads/master
Commit: 4252fa90112703905bed89dc4b46f59a77b87e9c
Parents: 46cc153
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Mon Feb 23 15:48:10 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Mon Feb 23 15:48:10 2015 +0000
----------------------------------------------------------------------
pom.xml | 2 +-
taverna-platform-integration-tests/pom.xml | 2 +-
.../.gitignore | 1 -
taverna-workflowmodel-core-extensions/pom.xml | 30 --
.../layers/CoreDispatchLayerFactory.java | 103 ----
.../processor/dispatch/layers/ErrorBounce.java | 324 ------------
.../processor/dispatch/layers/Failover.java | 111 ----
.../dispatch/layers/IntermediateProvenance.java | 508 -------------------
.../processor/dispatch/layers/Invoke.java | 369 --------------
.../processor/dispatch/layers/Loop.java | 424 ----------------
.../dispatch/layers/LoopConfiguration.java | 75 ---
.../processor/dispatch/layers/Parallelize.java | 463 -----------------
.../dispatch/layers/ParallelizeConfig.java | 50 --
.../processor/dispatch/layers/Retry.java | 180 -------
.../processor/dispatch/layers/RetryConfig.java | 97 ----
.../processor/dispatch/layers/Stop.java | 163 ------
.../processor/dispatch/layers/package.html | 4 -
...rkflowmodel-core-extensions-context-osgi.xml | 11 -
.../workflowmodel-core-extensions-context.xml | 9 -
.../processor/dispatch/layers/TestRetry.java | 110 ----
taverna-workflowmodel-extensions/.gitignore | 1 +
taverna-workflowmodel-extensions/pom.xml | 30 ++
.../layers/CoreDispatchLayerFactory.java | 103 ++++
.../processor/dispatch/layers/ErrorBounce.java | 324 ++++++++++++
.../processor/dispatch/layers/Failover.java | 111 ++++
.../dispatch/layers/IntermediateProvenance.java | 508 +++++++++++++++++++
.../processor/dispatch/layers/Invoke.java | 369 ++++++++++++++
.../processor/dispatch/layers/Loop.java | 424 ++++++++++++++++
.../dispatch/layers/LoopConfiguration.java | 75 +++
.../processor/dispatch/layers/Parallelize.java | 463 +++++++++++++++++
.../dispatch/layers/ParallelizeConfig.java | 50 ++
.../processor/dispatch/layers/Retry.java | 180 +++++++
.../processor/dispatch/layers/RetryConfig.java | 97 ++++
.../processor/dispatch/layers/Stop.java | 163 ++++++
.../processor/dispatch/layers/package.html | 4 +
...rkflowmodel-core-extensions-context-osgi.xml | 11 +
.../workflowmodel-core-extensions-context.xml | 9 +
.../processor/dispatch/layers/TestRetry.java | 110 ++++
taverna-workflowmodel-impl/pom.xml | 2 +-
39 files changed, 3035 insertions(+), 3035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index daef4fd..001d17c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<module>taverna-services-impl</module>
<module>taverna-stringconstant-activity</module>
<module>taverna-workflowmodel-api</module>
- <module>taverna-workflowmodel-core-extensions</module>
+ <module>taverna-workflowmodel-extensions</module>
<module>taverna-workflowmodel-impl</module>
</modules>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-platform-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-platform-integration-tests/pom.xml b/taverna-platform-integration-tests/pom.xml
index 362fbdc..a958d1c 100644
--- a/taverna-platform-integration-tests/pom.xml
+++ b/taverna-platform-integration-tests/pom.xml
@@ -246,7 +246,7 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
+ <artifactId>taverna-workflowmodel-extensions</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/.gitignore
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/.gitignore b/taverna-workflowmodel-core-extensions/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/taverna-workflowmodel-core-extensions/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/pom.xml b/taverna-workflowmodel-core-extensions/pom.xml
deleted file mode 100644
index 700e56d..0000000
--- a/taverna-workflowmodel-core-extensions/pom.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.taverna.engine</groupId>
- <artifactId>taverna-engine</artifactId>
- <version>3.1.0-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
- <packaging>bundle</packaging>
- <name>Apache Taverna Workflow Model Extension Points</name>
- <description>Implementation of core extension points to the workflow model</description>
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-api</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
deleted file mode 100644
index 0b11627..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Factory for creating core dispatch layers.
- *
- * The core dispatch layers are :
- * <ul>
- * <li>ErrorBounce</li>
- * <li>Parallelize</li>
- * <li>Failover</li>
- * <li>Retry</li>
- * <li>Stop</li>
- * <li>Invoke</li>
- * <li>Loop</li>
- * <li>IntermediateProvenance</li>
- * </ul>
- *
- * @author David Withers
- */
-public class CoreDispatchLayerFactory implements DispatchLayerFactory {
- private static final URI parallelizeLayer = URI.create(Parallelize.URI);
- private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
- private static final URI failoverLayer = URI.create(Failover.URI);
- private static final URI retryLayer = URI.create(Retry.URI);
- private static final URI invokeLayer = URI.create(Invoke.URI);
- private static final URI loopLayer = URI.create(Loop.URI);
- private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
- private static final URI stopLayer = URI.create(Stop.URI);
-
- private final static Set<URI> dispatchLayerURIs = new HashSet<URI>();
-
- static {
- dispatchLayerURIs.add(parallelizeLayer);
- dispatchLayerURIs.add(errorBounceLayer);
- dispatchLayerURIs.add(failoverLayer);
- dispatchLayerURIs.add(retryLayer);
- dispatchLayerURIs.add(invokeLayer);
- dispatchLayerURIs.add(loopLayer);
- dispatchLayerURIs.add(intermediateProvenanceLayer);
- dispatchLayerURIs.add(stopLayer);
- }
-
- @Override
- public DispatchLayer<?> createDispatchLayer(URI uri) {
- if (parallelizeLayer.equals(uri))
- return new Parallelize();
- else if (errorBounceLayer.equals(uri))
- return new ErrorBounce();
- else if (failoverLayer.equals(uri))
- return new Failover();
- else if (retryLayer.equals(uri))
- return new Retry();
- else if (invokeLayer.equals(uri))
- return new Invoke();
- else if (loopLayer.equals(uri))
- return new Loop();
- else if (intermediateProvenanceLayer.equals(uri))
- return new IntermediateProvenance();
- else if (stopLayer.equals(uri))
- return new Stop();
- return null;
- }
-
- @Override
- public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
- // TODO
- return null;
- }
-
- @Override
- public Set<URI> getDispatchLayerTypes() {
- return dispatchLayerURIs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
deleted file mode 100644
index dfde240..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.reference.ErrorDocument;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-/**
- * Receives job events, checks to see whether any parameters in the job are
- * error tokens or collections which contain errors. If so then sends a
- * corresponding result message back where all outputs are error tokens having
- * registered such with the invocation context's data manager. It also re-writes
- * any failure messages as result messages containing error tokens at the
- * appropriate depth - this means that it must be placed above any error
- * handling layers in order for those to have an effect at all. In general this
- * layer should be placed immediately below the parallelize layer in most
- * default cases (this will guarantee the processor never sees a failure message
- * though, which may or may not be desirable)
- *
- * @author Tom Oinn
- *
- */
-@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
- CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
-@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
- CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@SupportsStreamedResult
-public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
- PropertyContributingDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
-
- /**
- * Track the number of reflected and translated errors handled by this error
- * bounce instance
- */
- private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
-
- private int totalTranslatedErrors = 0;
- private int totalReflectedErrors = 0;
-
- private synchronized ErrorBounceState getState(String owningProcess) {
- if (state.containsKey(owningProcess))
- return state.get(owningProcess);
- ErrorBounceState ebs = new ErrorBounceState();
- state.put(owningProcess, ebs);
- return ebs;
- }
-
- /**
- * If the job contains errors, or collections which contain errors
- * themselves then bounce a result message with error documents in back up
- * to the layer above
- */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- Set<T2Reference> errorReferences = new HashSet<>();
- for (T2Reference ei : jobEvent.getData().values())
- if (ei.containsErrors())
- errorReferences.add(ei);
- if (errorReferences.isEmpty())
- // relay the message down...
- getBelow().receiveJob(jobEvent);
- else {
- getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
- sendErrorOutput(jobEvent, null, errorReferences);
- }
- }
-
- /**
- * Always send the error document job result on receiving a failure, at
- * least for now! This should be configurable, in effect this is the part
- * that ensures the processor never sees a top level failure.
- */
- @Override
- public void receiveError(DispatchErrorEvent errorEvent) {
- getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
- sendErrorOutput(errorEvent, errorEvent.getCause(), null);
- }
-
- /**
- * Construct and send a new result message with error documents in place of
- * all outputs at the appropriate depth
- *
- * @param event
- * @param cause
- * @param errorReferences
- */
- private void sendErrorOutput(Event<?> event, Throwable cause,
- Set<T2Reference> errorReferences) {
- ReferenceService rs = event.getContext().getReferenceService();
-
- Processor p = dispatchStack.getProcessor();
- Map<String, T2Reference> outputDataMap = new HashMap<>();
- String[] owningProcessArray = event.getOwningProcess().split(":");
- String processor = owningProcessArray[owningProcessArray.length - 1];
- for (OutputPort op : p.getOutputPorts()) {
- String message = "Processor '" + processor + "' - Port '"
- + op.getName() + "'";
- if (event instanceof DispatchErrorEvent)
- message += ": " + ((DispatchErrorEvent) event).getMessage();
- ErrorDocument ed;
- if (cause != null)
- ed = rs.getErrorDocumentService().registerError(message, cause,
- op.getDepth(), event.getContext());
- else
- ed = rs.getErrorDocumentService().registerError(message,
- errorReferences, op.getDepth(), event.getContext());
- outputDataMap.put(op.getName(), ed.getId());
- }
- DispatchResultEvent dre = new DispatchResultEvent(
- event.getOwningProcess(), event.getIndex(), event.getContext(),
- outputDataMap, false);
- getAbove().receiveResult(dre);
- }
-
- @Override
- public void configure(JsonNode config) {
- // Do nothing - no configuration required
- }
-
- @Override
- public JsonNode getConfiguration() {
- // Layer has no configuration associated
- return null;
- }
-
- @Override
- public void finishedWith(final String owningProcess) {
- /*
- * Delay the removal of the state to give the monitor a chance to poll
- */
- cleanupTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- state.remove(owningProcess);
- }
- }, CLEANUP_DELAY_MS);
- }
-
- /**
- * Two properties, dispatch.errorbounce.reflected(integer) is the number of
- * incoming jobs which have been bounced back as results with errors,
- * dispatch.errorbounce.translated(integer) is the number of failures from
- * downstream in the stack that have been re-written as complete results
- * containing error documents.
- */
- @Override
- public void injectPropertiesFor(final String owningProcess) {
- MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "reflected" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null)
- return 0;
- return ebs.getErrorsReflected();
- }
- };
- dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
- owningProcess);
-
- MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "translated" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null)
- return 0;
- return ebs.getErrorsTranslated();
- }
- };
- dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
- owningProcess);
-
- MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce",
- "totalTranslated" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return totalTranslatedErrors;
- }
- };
- dispatchStack.receiveMonitorableProperty(
- totalTranslatedTranslatedProperty, owningProcess);
-
- MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce",
- "totalReflected" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return totalReflectedErrors;
- }
- };
- dispatchStack.receiveMonitorableProperty(
- totalReflectedTranslatedProperty, owningProcess);
- }
-
- public class ErrorBounceState {
- private int errorsReflected = 0;
- private int errorsTranslated = 0;
-
- /**
- * Number of times the bounce layer has converted an incoming job event
- * where the input data contained error tokens into a result event
- * containing all errors.
- */
- int getErrorsReflected() {
- return this.errorsReflected;
- }
-
- /**
- * Number of times the bounce layer has converted an incoming failure
- * event into a result containing error tokens
- */
- int getErrorsTranslated() {
- return errorsTranslated;
- }
-
- void incrementErrorsReflected() {
- synchronized (this) {
- errorsReflected++;
- }
- synchronized (ErrorBounce.this) {
- totalReflectedErrors++;
- }
- }
-
- void incrementErrorsTranslated() {
- synchronized (this) {
- errorsTranslated++;
- }
- synchronized (ErrorBounce.this) {
- totalTranslatedErrors++;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
deleted file mode 100644
index 1c5ef03..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Failure handling dispatch layer, consumes job events with multiple activities
- * and emits the same job but with only the first activity. On failures the job
- * is resent to the layer below with a new activity list containing the second
- * in the original list and so on. If a failure is received and there are no
- * further activities to use the job fails and the failure is sent back up to
- * the layer above.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
- UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
-
- @Override
- protected JobState getStateObject(DispatchJobEvent jobEvent) {
- return new FailoverState(jobEvent);
- }
-
- /**
- * Receive a job from the layer above, store it in the state map then relay
- * it to the layer below with a modified activity list containing only the
- * activity at index 0
- */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- addJobToStateList(jobEvent);
- List<Activity<?>> newActivityList = new ArrayList<>();
- newActivityList.add(jobEvent.getActivities().get(0));
- getBelow().receiveJob(
- new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
- .getIndex(), jobEvent.getContext(), jobEvent.getData(),
- newActivityList));
- }
-
- class FailoverState extends JobState {
- int currentActivityIndex = 0;
-
- public FailoverState(DispatchJobEvent jobEvent) {
- super(jobEvent);
- }
-
- @Override
- public boolean handleError() {
- currentActivityIndex++;
- if (currentActivityIndex == jobEvent.getActivities().size())
- return false;
- List<Activity<?>> newActivityList = new ArrayList<>();
- newActivityList.add(jobEvent.getActivities().get(
- currentActivityIndex));
- getBelow().receiveJob(
- new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
- .getIndex(), jobEvent.getContext(), jobEvent
- .getData(), newActivityList));
- return true;
- }
- }
-
- @Override
- public void configure(JsonNode config) {
- // Do nothing - there is no configuration to do
- }
-
- @Override
- public JsonNode getConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
deleted file mode 100644
index 718079a..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static java.lang.System.currentTimeMillis;
-
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-/**
- * Sits above the {@link Invoke} layer and collects information about the
- * current workflow run to be stored by the {@link ProvenanceConnector}.
- *
- * @author Ian Dunlop
- * @author Stian Soiland-Reyes
- *
- */
-public class IntermediateProvenance extends AbstractDispatchLayer<String> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
- private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
-
- private ProvenanceReporter reporter;
- private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
- private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
- private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
-
- // private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
- // private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
-
- private WorkflowProvenanceItem workflowItem;
-
- @Override
- public void configure(String o) {
- }
-
- /**
- * A set of provenance events for a particular owning process has been
- * finished with so you can remove all the {@link IterationProvenanceItem}s
- * from the map
- */
- @Override
- public void finishedWith(String owningProcess) {
- processToIndexes.remove(owningProcess);
- }
-
- @Override
- public String getConfiguration() {
- return null;
- }
-
- protected Map<String, IterationProvenanceItem> getIndexesByProcess(
- String owningProcess) {
- synchronized (processToIndexes) {
- Map<String, IterationProvenanceItem> indexes = processToIndexes
- .get(owningProcess);
- if (indexes == null) {
- indexes = new HashMap<>();
- processToIndexes.put(owningProcess, indexes);
- }
- return indexes;
- }
- }
-
- protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
- String owningProcess = event.getOwningProcess();
- int[] originalIndex = event.getIndex();
- int[] index = event.getIndex();
- String indexStr = indexStr(index);
- Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
- IterationProvenanceItem iterationProvenanceItem = null;
- synchronized (indexes) {
- // find the iteration item for the int index eg [1]
- iterationProvenanceItem = indexes.get(indexStr);
- // if it is null then strip the index and look again
-
- while (iterationProvenanceItem == null) {
- try {
- index = removeLastIndex(index);
- iterationProvenanceItem = indexes.get(indexStr(index));
- /*
- * if we have a 'parent' iteration then create a new
- * iteration for the original index and link it to the
- * activity and the input data
- *
- * FIXME should this be linked to the parent iteration
- * instead?
- */
- if (iterationProvenanceItem != null) {
- // set the index to the one from the event
- IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
- iterationProvenanceItem1.setIteration(originalIndex);
- iterationProvenanceItem1.setProcessId(owningProcess);
- iterationProvenanceItem1.setIdentifier(UUID
- .randomUUID().toString());
- iterationProvenanceItem1.setWorkflowId(workflowItem.getParentId());
- iterationProvenanceItem1.setParentIterationItem(iterationProvenanceItem);
- iterationProvenanceItem1.setParentId(iterationProvenanceItem.getParentId());
- iterationProvenanceItem1.setInputDataItem(iterationProvenanceItem.getInputDataItem());
-
-// for (Entry<ActivityProvenanceItem, List<Object>> entrySet : activityProvenanceItemMap
-// .entrySet()) {
-// List<Object> value = entrySet.getValue();
-// int[] newIndex = (int[]) value.get(0);
-// String owner = (String) value.get(1);
-// String indexString = indexStr(newIndex);
-// String indexString2 = indexStr(index);
-//
-// if (owningProcess.equalsIgnoreCase(owner)
-// && indexString
-// .equalsIgnoreCase(indexString2))
-// iterationProvenanceItem1.setParentId(entrySet
-// .getKey().getIdentifier());
-// }
-// for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
-// .entrySet()) {
-// List<Object> value = entrySet.getValue();
-// int[] newIndex = (int[]) value.get(0);
-// String owner = (String) value.get(1);
-// String indexString = indexStr(newIndex);
-// String indexString2 = indexStr(index);
-// if (owningProcess.equalsIgnoreCase(owner)
-// && indexString
-// .equalsIgnoreCase(indexString2))
-// iterationProvenanceItem1
-// .setInputDataItem(entrySet.getKey());
-// }
-
- // for (ActivityProvenanceItem item :
- // activityProvenanceItemList) {
- // if (owningProcess.equalsIgnoreCase(item
- // .getProcessId())) {
- // iterationProvenanceItem1.setParentId(item
- // .getIdentifier());
- // }
- // }
- // for (InputDataProvenanceItem item :
- // inputDataProvenanceItemList) {
- // if (owningProcess.equalsIgnoreCase(item
- // .getProcessId())) {
- // iterationProvenanceItem1.setInputDataItem(item);
- // }
- // indexes.put(indexStr, iterationProvenanceItem1);
- // return iterationProvenanceItem1;
- // // }
- // }
-
- // add this new iteration item to the map
- getIndexesByProcess(event.getOwningProcess()).put(
- indexStr(event.getIndex()),
- iterationProvenanceItem1);
- return iterationProvenanceItem1;
- }
- /*
- * if we have not found an iteration items and the index is
- * [] then something is wrong remove the last index in the
- * int array before we go back through the while
- */
- } catch (IllegalStateException e) {
- logger
- .warn("Cannot find a parent iteration with index [] for owning process: "
- + owningProcess
- + "Workflow invocation is in an illegal state");
- throw e;
- }
- }
-
- // if (iterationProvenanceItem == null) {
- // logger.info("Iteration item was null for: "
- // + event.getOwningProcess() + " " + event.getIndex());
- // System.out.println("Iteration item was null for: "
- // + event.getOwningProcess() + " " + event.getIndex());
- // iterationProvenanceItem = new IterationProvenanceItem(index);
- // iterationProvenanceItem.setProcessId(owningProcess);
- // iterationProvenanceItem.setIdentifier(UUID.randomUUID()
- // .toString());
- // // for (ActivityProvenanceItem
- // item:activityProvenanceItemList)
- // // {
- // // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
- // // iterationProvenanceItem.setParentId(item.getIdentifier());
- // // }
- // // }
- // // for (InputDataProvenanceItem item:
- // // inputDataProvenanceItemList) {
- // // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
- // // iterationProvenanceItem.setInputDataItem(item);
- // // }
- // // }
- // indexes.put(indexStr, iterationProvenanceItem);
-
- }
- return iterationProvenanceItem;
- }
-
- private String indexStr(int[] index) {
- StringBuilder indexStr = new StringBuilder();
- for (int ind : index)
- indexStr.append(":").append(ind);
- return indexStr.toString();
- }
-
- /**
- * Remove the last index of the int array in the form 1:2:3 etc
- *
- * @param index
- * @return
- */
- @SuppressWarnings("unused")
- private String[] stripLastIndex(int[] index) {
- // will be in form :1:2:3
- return indexStr(index).split(":");
- }
-
- /**
- * Remove the last value in the int array
- *
- * @param index
- * @return
- */
- private int[] removeLastIndex(int[] index) {
- if (index.length == 0)
- throw new IllegalStateException(
- "There is no parent iteration of index [] for this result");
- int[] newIntArray = new int[index.length - 1];
- for (int i = 0; i < index.length - 1; i++)
- newIntArray[i] = index[i];
- return newIntArray;
- }
-
- private static String uuid() {
- return UUID.randomUUID().toString();
- }
-
- /**
- * Create an {@link ErrorProvenanceItem} and send across to the
- * {@link ProvenanceConnector}
- */
- @Override
- public void receiveError(DispatchErrorEvent errorEvent) {
- IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
- // get using errorEvent.getOwningProcess();
-
- ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
- errorItem.setCause(errorEvent
- .getCause());
- errorItem.setErrorType(errorEvent
- .getFailureType().toString());
- errorItem.setMessage(errorEvent.getMessage());
-
- errorItem.setProcessId(errorEvent.getOwningProcess());
- errorItem.setIdentifier(uuid());
- errorItem.setParentId(iterationProvItem.getIdentifier());
- // iterationProvItem.setErrorItem(errorItem);
- // FIXME don't need to add to the processor item earlier
- getReporter().addProvenanceItem(errorItem);
- super.receiveError(errorEvent);
- }
-
- /**
- * Create the {@link ProvenanceItem}s and send them all across to the
- * {@link ProvenanceConnector} except for the
- * {@link IterationProvenanceItem}, this one is told what it's inputs are
- * but has to wait until the results are received before being sent across.
- * Each item has a unique identifier and also knows who its parent item is
- */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- try {
- // FIXME do we need this ProcessProvenanceItem?
- ProcessProvenanceItem provenanceItem;
- String[] split = jobEvent.getOwningProcess().split(":");
- provenanceItem = new ProcessProvenanceItem();
- String parentDataflowId = workflowItem.getParentId();
- provenanceItem.setWorkflowId(parentDataflowId);
- provenanceItem.setFacadeID(split[0]);
- provenanceItem.setDataflowID(split[1]);
- provenanceItem.setProcessId(jobEvent.getOwningProcess());
- provenanceItem.setIdentifier(uuid());
- provenanceItem.setParentId(workflowItem.getIdentifier());
- ProcessorProvenanceItem processorProvItem;
- processorProvItem = new ProcessorProvenanceItem();
- processorProvItem.setWorkflowId(parentDataflowId);
- processorProvItem.setProcessId(jobEvent
- .getOwningProcess());
- processorProvItem.setIdentifier(uuid());
- processorProvItem.setParentId(provenanceItem.getIdentifier());
- provenanceItem.setProcessId(jobEvent.getOwningProcess());
- getReporter().addProvenanceItem(provenanceItem);
- getReporter().addProvenanceItem(processorProvItem);
-
- IterationProvenanceItem iterationProvItem = null;
- iterationProvItem = new IterationProvenanceItem();
- iterationProvItem.setWorkflowId(parentDataflowId);
- iterationProvItem.setIteration(jobEvent.getIndex());
- iterationProvItem.setIdentifier(uuid());
-
- ReferenceService referenceService = jobEvent.getContext()
- .getReferenceService();
-
- InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
- inputDataItem.setDataMap(jobEvent.getData());
- inputDataItem.setReferenceService(referenceService);
- inputDataItem.setIdentifier(uuid());
- inputDataItem.setParentId(iterationProvItem.getIdentifier());
- inputDataItem.setProcessId(jobEvent.getOwningProcess());
-
- List<Object> inputIndexOwnerList = new ArrayList<>();
- inputIndexOwnerList.add(jobEvent.getIndex());
- inputIndexOwnerList.add(jobEvent.getOwningProcess());
- inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
-
- // inputDataProvenanceItemList.add(inputDataItem);
- iterationProvItem.setInputDataItem(inputDataItem);
- iterationProvItem.setIteration(jobEvent.getIndex());
- iterationProvItem.setProcessId(jobEvent.getOwningProcess());
-
- for (Activity<?> activity : jobEvent.getActivities())
- if (activity instanceof AsynchronousActivity) {
- ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
- activityProvItem.setWorkflowId(parentDataflowId);
- activityProvItem.setIdentifier(uuid());
- iterationProvItem.setParentId(activityProvItem.getIdentifier());
- // getConnector().addProvenanceItem(iterationProvItem);
- activityProvItem.setParentId(processorProvItem.getIdentifier());
- // processorProvItem.setActivityProvenanceItem(activityProvItem);
- activityProvItem.setProcessId(jobEvent.getOwningProcess());
- List<Object> activityIndexOwnerList = new ArrayList<>();
- activityIndexOwnerList.add(jobEvent.getOwningProcess());
- activityIndexOwnerList.add(jobEvent.getIndex());
- activityProvenanceItemMap.put(activityProvItem,
- inputIndexOwnerList);
- // activityProvenanceItemList.add(activityProvItem);
- // activityProvItem.setIterationProvenanceItem(iterationProvItem);
- getReporter().addProvenanceItem(activityProvItem);
- break;
- }
- getIndexesByProcess(jobEvent.getOwningProcess()).put(
- indexStr(jobEvent.getIndex()), iterationProvItem);
- iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
- getReporter().addProvenanceItem(iterationProvItem);
- } catch (RuntimeException ex) {
- logger.error("Could not store provenance for " + jobEvent, ex);
- }
-
- super.receiveJob(jobEvent);
- }
-
- @Override
- public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
- super.receiveJobQueue(jobQueueEvent);
- }
-
- /**
- * Populate an {@link OutputDataProvenanceItem} with the results and attach
- * it to the appropriate {@link IterationProvenanceItem}. Then send the
- * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
- */
- @Override
- public void receiveResult(DispatchResultEvent resultEvent) {
- try {
- // FIXME use the connector from the result event context
- IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
- iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
-
- ReferenceService referenceService = resultEvent.getContext()
- .getReferenceService();
-
- OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
- outputDataItem.setDataMap(resultEvent.getData());
- outputDataItem.setReferenceService(referenceService);
- outputDataItem.setIdentifier(uuid());
- outputDataItem.setProcessId(resultEvent.getOwningProcess());
- outputDataItem.setParentId(iterationProvItem.getIdentifier());
- iterationProvItem.setOutputDataItem(outputDataItem);
-
- getReporter().addProvenanceItem(iterationProvItem);
- // getConnector().addProvenanceItem(outputDataItem);
-
- // PM -- testing
- // add xencoding of data value here??
- // Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
- // for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
- // // create a simpler bean that we can serialize?
- //
- // T2Reference ref = entry.getValue();
- //
- // SimplerT2Reference t2RefBean = new SimplerT2Reference();
- // t2RefBean.setReferenceType(ref.getReferenceType());
- // t2RefBean.setDepth(ref.getDepth());
- // t2RefBean.setLocalPart(ref.getLocalPart());
- // t2RefBean.setNamespacePart(ref.getNamespacePart());
- //
- // System.out.println("data ref: "+ref);
- // String serializedInput = SerializeParam(t2RefBean);
- // System.out.println("serialized reference:" + serializedInput);
- //
- // System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
-// }
- } catch (Exception ex) {
- logger.error("Could not store provenance for "
- + resultEvent.getOwningProcess() + " "
- + Arrays.toString(resultEvent.getIndex()), ex);
- // But don't break super.receiveResult() !!
- }
- super.receiveResult(resultEvent);
- }
-
- @Override
- public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- super.receiveResultCompletion(completionEvent);
- }
-
- /**
- * Tell this layer what {@link ProvenanceConnector} implementation is being
- * used to capture the {@link ProvenanceItem}s. NOTE: should probably use
- * the connector from the result events context where possible
- *
- * @param connector
- */
- public void setReporter(ProvenanceReporter connector) {
- this.reporter = connector;
- }
-
- public ProvenanceReporter getReporter() {
- return reporter;
- }
-
- /**
- * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
- * enacted this layer has to know about the {@link WorkflowProvenanceItem}
- *
- * @param workflowItem
- */
- public void setWorkflow(WorkflowProvenanceItem workflowItem) {
- this.workflowItem = workflowItem;
- }
-
- // TODO is this unused?
- public static String SerializeParam(Object ParamValue) {
- ByteArrayOutputStream BStream = new ByteArrayOutputStream();
- XMLEncoder encoder = new XMLEncoder(BStream);
- encoder.writeObject(ParamValue);
- encoder.close();
- return BStream.toString();
- }
-
- // TODO is this unused?
- public static Object DeserializeParam(String SerializedParam) {
- InputStream IStream = new ByteArrayInputStream(
- SerializedParam.getBytes());
- XMLDecoder decoder = new XMLDecoder(IStream);
- Object output = decoder.readObject();
- decoder.close();
- return output;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
deleted file mode 100644
index f8403df..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.sql.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.ControlBoundary;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Context free invoker layer, does not pass index arrays of jobs into activity
- * instances.
- * <p>
- * This layer will invoke the first invokable activity in the activity list, so
- * any sane dispatch stack will have narrowed this down to a single item list by
- * this point, i.e. by the insertion of a failover layer.
- * <p>
- * Currently only handles activities implementing {@link AsynchronousActivity}.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- *
- */
-@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
-@ControlBoundary
-public class Invoke extends AbstractDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
- private static Logger logger = Logger.getLogger(Invoke.class);
- private static Long invocationCount = 0L;
-
- private MonitorManager monMan;
-
- private static String getNextProcessID() {
- long count;
- synchronized (invocationCount) {
- count = ++invocationCount;
- }
- return "invocation" + count;
- }
-
- public Invoke() {
- super();
- monMan = MonitorManager.getInstance();
- }
-
- @Override
- public void configure(JsonNode config) {
- // No configuration, do nothing
- }
-
- @Override
- public JsonNode getConfiguration() {
- return null;
- }
-
- /**
- * Receive a job from the layer above and pick the first concrete activity
- * from the list to invoke. Invoke this activity, creating a callback which
- * will wrap up the result messages in the appropriate collection depth
- * before sending them on (in general activities are not aware of their
- * invocation context and should not be responsible for providing correct
- * index arrays for results)
- * <p>
- * This layer will invoke the first invokable activity in the activity list,
- * so any sane dispatch stack will have narrowed this down to a single item
- * list by this point, i.e. by the insertion of a failover layer.
- */
- @Override
- public void receiveJob(final DispatchJobEvent jobEvent) {
- for (Activity<?> activity : jobEvent.getActivities())
- if (activity instanceof AsynchronousActivity) {
- invoke(jobEvent, (AsynchronousActivity<?>) activity);
- break;
- }
- }
-
- protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
- // Register with the monitor
- final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
- getNextProcessID()).getOwningProcess();
- monMan.registerNode(activity, invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
- monMan.registerNode(jobEvent, invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
-
- /*
- * The activity is an AsynchronousActivity so we invoke it with an
- * AsynchronousActivityCallback object containing appropriate callback
- * methods to push results, completions and failures back to the
- * invocation layer.
- *
- * Get the registered DataManager for this process. In most cases this
- * will just be a single DataManager for the entire workflow system but
- * it never hurts to generalize
- */
-
- InvocationContext context = jobEvent.getContext();
- final ReferenceService refService = context.getReferenceService();
-
- InvocationStartedProvenanceItem invocationItem = null;
- ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
- if (provenanceReporter != null) {
- IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
- if (intermediateProvenance != null) {
- invocationItem = new InvocationStartedProvenanceItem();
- IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
- invocationItem.setIdentifier(UUID.randomUUID().toString());
- invocationItem.setActivity(activity);
- invocationItem.setProcessId(jobEvent.getOwningProcess());
- invocationItem.setInvocationProcessId(invocationProcessIdentifier);
- invocationItem.setParentId(parentItem.getIdentifier());
- invocationItem.setWorkflowId(parentItem.getWorkflowId());
- invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
- provenanceReporter.addProvenanceItem(invocationItem);
- }
- }
-
- /*
- * Create a Map of EntityIdentifiers named appropriately given the
- * activity mapping
- */
- Map<String, T2Reference> inputData = new HashMap<>();
- for (String inputName : jobEvent.getData().keySet()) {
- String activityInputName = activity
- .getInputPortMapping().get(inputName);
- if (activityInputName != null)
- inputData.put(activityInputName, jobEvent.getData()
- .get(inputName));
- }
-
- /*
- * Create a callback object to receive events, completions and failure
- * notifications from the activity
- */
- AsynchronousActivityCallback callback = new InvokeCallBack(
- jobEvent, refService, invocationProcessIdentifier,
- activity);
-
- if (activity instanceof MonitorableAsynchronousActivity<?>) {
- /*
- * Monitorable activity so get the monitorable properties and push
- * them into the state tree after launching the job
- */
- MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) activity;
- Set<MonitorableProperty<?>> props = maa
- .executeAsynchWithMonitoring(inputData, callback);
- monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
- } else {
- /*
- * Run the job, passing in the callback we've just created along
- * with the (possibly renamed) input data map
- */
- activity.executeAsynch(inputData, callback);
- }
- }
-
- protected IntermediateProvenance findIntermediateProvenance() {
- for (DispatchLayer<?> layer : getProcessor().getDispatchStack()
- .getLayers())
- if (layer instanceof IntermediateProvenance)
- return (IntermediateProvenance) layer;
- return null;
- }
-
- protected class InvokeCallBack implements AsynchronousActivityCallback {
- protected final AsynchronousActivity<?> activity;
- protected final String invocationProcessIdentifier;
- protected final DispatchJobEvent jobEvent;
- protected final ReferenceService refService;
- protected boolean sentJob = false;
-
- protected InvokeCallBack(DispatchJobEvent jobEvent,
- ReferenceService refService,
- String invocationProcessIdentifier,
- AsynchronousActivity<?> asyncActivity) {
- this.jobEvent = jobEvent;
- this.refService = refService;
- this.invocationProcessIdentifier = invocationProcessIdentifier;
- this.activity = asyncActivity;
- }
-
- @Override
- public void fail(String message) {
- fail(message, null);
- }
-
- @Override
- public void fail(String message, Throwable t) {
- fail(message, t, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t,
- DispatchErrorType errorType) {
- logger.warn("Failed (" + errorType + ") invoking " + activity
- + " for job " + jobEvent + ": " + message, t);
- monMan.deregisterNode(
- invocationProcessIdentifier);
- getAbove().receiveError(
- new DispatchErrorEvent(jobEvent.getOwningProcess(),
- jobEvent.getIndex(), jobEvent.getContext(),
- message, t, errorType, activity));
- }
-
- @Override
- public InvocationContext getContext() {
- return jobEvent.getContext();
- }
-
- @Override
- public String getParentProcessIdentifier() {
- return invocationProcessIdentifier;
- }
-
- @Override
- public void receiveCompletion(int[] completionIndex) {
- if (completionIndex.length == 0)
- // Final result, clean up monitor state
- monMan.deregisterNode(invocationProcessIdentifier);
- if (sentJob) {
- int[] newIndex;
- if (completionIndex.length == 0)
- newIndex = jobEvent.getIndex();
- else {
- newIndex = new int[jobEvent.getIndex().length
- + completionIndex.length];
- int i = 0;
- for (int indexValue : jobEvent.getIndex())
- newIndex[i++] = indexValue;
- for (int indexValue : completionIndex)
- newIndex[i++] = indexValue;
- }
- DispatchCompletionEvent c = new DispatchCompletionEvent(
- jobEvent.getOwningProcess(), newIndex, jobEvent
- .getContext());
- getAbove().receiveResultCompletion(c);
- } else {
- /*
- * We haven't sent any 'real' data prior to completing a stream.
- * This in effect means we're sending an empty top level
- * collection so we need to register empty collections for each
- * output port with appropriate depth (by definition if we're
- * streaming all outputs are collection types of some kind)
- */
- Map<String, T2Reference> emptyListMap = new HashMap<>();
- for (OutputPort op : activity.getOutputPorts()) {
- String portName = op.getName();
- int portDepth = op.getDepth();
- emptyListMap.put(portName, refService.getListService()
- .registerEmptyList(portDepth, jobEvent.getContext()).getId());
- }
- receiveResult(emptyListMap, new int[0]);
- }
- }
-
- @Override
- public void receiveResult(Map<String, T2Reference> data, int[] index) {
- /*
- * Construct a new result map using the activity mapping (activity
- * output name to processor output name)
- */
- Map<String, T2Reference> resultMap = new HashMap<>();
- for (String outputName : data.keySet()) {
- String processorOutputName = activity
- .getOutputPortMapping().get(outputName);
- if (processorOutputName != null)
- resultMap.put(processorOutputName, data.get(outputName));
- }
- /*
- * Construct a new index array if the specified index is non zero
- * length, otherwise just use the original job's index array (means
- * we're not streaming)
- */
- int[] newIndex;
- boolean streaming = false;
- if (index.length == 0)
- newIndex = jobEvent.getIndex();
- else {
- streaming = true;
- newIndex = new int[jobEvent.getIndex().length + index.length];
- int i = 0;
- for (int indexValue : jobEvent.getIndex())
- newIndex[i++] = indexValue;
- for (int indexValue : index)
- newIndex[i++] = indexValue;
- }
- DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
- .getOwningProcess(), newIndex, jobEvent.getContext(),
- resultMap, streaming);
- if (!streaming) {
- monMan.registerNode(resultEvent, invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
- // Final result, clean up monitor state
- monMan.deregisterNode(invocationProcessIdentifier);
- }
- // Push the modified data to the layer above in the dispatch stack
- getAbove().receiveResult(resultEvent);
-
- sentJob = true;
- }
-
- @Override
- public void requestRun(Runnable runMe) {
- String newThreadName = jobEvent.toString();
- Thread thread = new Thread(runMe, newThreadName);
- thread.setContextClassLoader(activity.getClass()
- .getClassLoader());
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- fail("Uncaught exception while invoking " + activity, e);
- }
- });
- thread.start();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
deleted file mode 100644
index d5077a4..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2008 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-
-/**
- * A layer that allows while-style loops.
- * <p>
- * The layer is configured with a {@link LoopConfiguration}, where an activity
- * has been set as the
- * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
- * condition}.
- * <p>
- * After a job has been successful further down the dispatch stack, the loop
- * layer will invoke the conditional activity to determine if the job will be
- * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
- * will be performed even before the first invocation. (The default
- * runFirst=true is equivalent to a do..while construct, while runFirst=false is
- * equivalent to a while.. construct.)
- * <p>
- * A job will be resent down the dispatch stack only if the conditional activity
- * returns a reference to a string equal to "true" on its output port "loop".
- * <p>
- * If a job or the conditional activity fails, the while-loop is interrupted and
- * the error is sent further up.
- * <p>
- * Note that the LoopLayer will be invoked for each item in an iteration, if you
- * want to do the loop for the whole collection (ie. re-iterating if the
- * loop-condition fails after processing the full list) - create a nested
- * workflow with the desired depths on it's input ports and insert this
- * LoopLayer in the stack of the nested workflow's processor in parent workflow.
- * <p>
- * It is recommended that the LoopLayer is to be inserted after the
- * {@link ErrorBounce} layer, as this layer is needed for registering errors
- * produced by the LoopLayer. If the user requires {@link Retry retries} and
- * {@link Failover failovers} before checking the while condition, such layers
- * should be below LoopLayer.
- *
- * @author Stian Soiland-Reyes
- */
-// FIXME Doesn't work
-@SuppressWarnings({"unchecked","rawtypes"})
-public class Loop extends AbstractDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
- private static Logger logger = Logger.getLogger(Loop.class);
-
- private JsonNode config = JsonNodeFactory.instance.objectNode();
-
- protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
- protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
-
- @Override
- public void configure(JsonNode config) {
- this.config = config;
- }
-
- @Override
- public void finishedWith(String owningProcess) {
- String prefix = owningProcess + "[";
- synchronized (outgoingJobs) {
- for (String key : new ArrayList<>(outgoingJobs.keySet()))
- if (key.startsWith(prefix))
- outgoingJobs.remove(key);
- }
- synchronized (incomingJobs) {
- for (String key : new ArrayList<>(incomingJobs.keySet()))
- if (key.startsWith(prefix))
- incomingJobs.remove(key);
- }
- }
-
- @Override
- public JsonNode getConfiguration() {
- return config;
- }
-
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- synchronized (incomingJobs) {
- incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
- }
- if (config.get("runFirst").asBoolean()) {
- // We'll do the conditional in receiveResult instead
- super.receiveJob(jobEvent);
- return;
- }
- checkCondition(jobEvent);
- }
-
- @Override
- public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
- synchronized (incomingJobs) {
- incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
- }
- if (config.get("runFirst").asBoolean()) {
- // We'll do the conditional in receiveResult instead
- super.receiveJobQueue(jobQueueEvent);
- return;
- }
- checkCondition(jobQueueEvent);
- }
-
- private Activity<?> getCondition() {
- //return config.getCondition();
- return null;
- }
-
- @Override
- public void receiveResult(DispatchResultEvent resultEvent) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveResult(resultEvent);
- return;
- }
- synchronized (outgoingJobs) {
- outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
- }
- checkCondition(resultEvent);
- }
-
- @Override
- public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveResultCompletion(completionEvent);
- return;
- }
- synchronized (outgoingJobs) {
- outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
- }
- checkCondition(completionEvent);
- }
-
- private void checkCondition(AbstractDispatchEvent event) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
- event.getIndex(), event.getContext(),
- "Can't invoke condition service: null", null,
- DispatchErrorType.INVOCATION, condition));
- return;
- }
- if (!(condition instanceof AbstractAsynchronousActivity)) {
- DispatchErrorEvent errorEvent = new DispatchErrorEvent(
- event.getOwningProcess(),
- event.getIndex(),
- event.getContext(),
- "Can't invoke condition service "
- + condition
- + " is not an instance of AbstractAsynchronousActivity",
- null, DispatchErrorType.INVOCATION, condition);
- super.receiveError(errorEvent);
- return;
- }
- AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
- String jobIdentifier = jobIdentifier(event);
- Map<String, T2Reference> inputs = prepareInputs(asyncCondition,
- jobIdentifier);
- AsynchronousActivityCallback callback = new ConditionCallBack(
- jobIdentifier);
- asyncCondition.executeAsynch(inputs, callback);
- }
-
- private Map<String, T2Reference> prepareInputs(
- AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
- Map<String, T2Reference> inputs = new HashMap<>();
- Map<String, T2Reference> inData = getInData(jobIdentifier);
- Map<String, T2Reference> outData = getOutData(jobIdentifier);
-
- Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
- for (ActivityInputPort conditionIn : inputPorts) {
- String conditionPort = conditionIn.getName();
- if (outData.containsKey(conditionPort))
- // Copy from previous output
- inputs.put(conditionPort, outData.get(conditionPort));
- else if (inData.containsKey(conditionPort))
- // Copy from original input
- inputs.put(conditionPort, inData.get(conditionPort));
- }
- return inputs;
- }
-
- private Map<String, T2Reference> getInData(String jobIdentifier) {
- AbstractDispatchEvent inEvent;
- synchronized (incomingJobs) {
- inEvent = incomingJobs.get(jobIdentifier);
- }
- Map<String, T2Reference> inData = new HashMap<>();
- if (inEvent instanceof DispatchJobEvent)
- inData = ((DispatchJobEvent) inEvent).getData();
- return inData;
- }
-
- private Map<String, T2Reference> getOutData(String jobIdentifier) {
- AbstractDispatchEvent outEvent;
- synchronized (outgoingJobs) {
- outEvent = outgoingJobs.get(jobIdentifier);
- }
- Map<String, T2Reference> outData = new HashMap<>();
- if (outEvent instanceof DispatchResultEvent)
- outData = ((DispatchResultEvent) outEvent).getData();
- return outData;
- }
-
- private String jobIdentifier(AbstractDispatchEvent event) {
- String jobId = event.getOwningProcess()
- + Arrays.toString(event.getIndex());
- return jobId;
- }
-
- public static final String LOOP_PORT = "loop";
-
- public class ConditionCallBack implements AsynchronousActivityCallback {
- private InvocationContext context;
- private final String jobIdentifier;
- private String processId;
-
- public ConditionCallBack(String jobIdentifier) {
- this.jobIdentifier = jobIdentifier;
- AbstractDispatchEvent originalEvent;
- synchronized (incomingJobs) {
- originalEvent = incomingJobs.get(jobIdentifier);
- }
- context = originalEvent.getContext();
- processId = originalEvent.getOwningProcess() + ":condition";
- }
-
- @Override
- public void fail(String message) {
- fail(message, null, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t) {
- fail(message, t, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t,
- DispatchErrorType errorType) {
- logger.warn("Failed (" + errorType + ") invoking condition service "
- + jobIdentifier + ":" + message, t);
-
- AbstractDispatchEvent originalEvent;
- synchronized (incomingJobs) {
- originalEvent = incomingJobs.get(jobIdentifier);
- }
- receiveError(new DispatchErrorEvent(originalEvent
- .getOwningProcess(), originalEvent.getIndex(),
- originalEvent.getContext(),
- "Can't invoke condition service ", t,
- DispatchErrorType.INVOCATION, null));
- }
-
- @Override
- public InvocationContext getContext() {
- return context;
- }
-
- @Override
- public String getParentProcessIdentifier() {
- return processId;
- }
-
- @Override
- public void receiveCompletion(int[] completionIndex) {
- // Ignore streaming
- }
-
- @Override
- public void receiveResult(Map<String, T2Reference> data, int[] index) {
- if (index.length > 0) {
- // Ignore streaming
- return;
- }
- T2Reference loopRef = data.get(LOOP_PORT);
- if (loopRef == null) {
- fail("Condition service didn't contain output port " + LOOP_PORT);
- return;
- }
- if (loopRef.containsErrors()) {
- fail("Condition service failed: " + loopRef);
- return;
- }
- if (loopRef.getDepth() != 0) {
- fail("Condition service output " + LOOP_PORT
- + " depth is not 0, but " + loopRef.getDepth());
- }
- ReferenceService referenceService = context.getReferenceService();
- String loop = (String) referenceService.renderIdentifier(loopRef,
- String.class, context);
-
- if (Boolean.parseBoolean(loop)) {
- // Push it down again
- AbstractDispatchEvent dispatchEvent;
- synchronized (incomingJobs) {
- dispatchEvent = incomingJobs.get(jobIdentifier);
- }
- if (dispatchEvent == null) {
- fail("Unknown job identifier " + jobIdentifier);
- }
- if (dispatchEvent instanceof DispatchJobEvent) {
- DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
- dispatchEvent);
- getBelow().receiveJob(newJobEvent);
- } else if (dispatchEvent instanceof DispatchJobQueueEvent) {
- getBelow().receiveJobQueue(
- (DispatchJobQueueEvent) dispatchEvent);
- } else {
- fail("Unknown type of incoming event " + dispatchEvent);
- }
- return;
-
- } else {
- // We'll push it up, end of loop for now
-
- AbstractDispatchEvent outgoingEvent;
- synchronized (outgoingJobs) {
- outgoingEvent = outgoingJobs.get(jobIdentifier);
- }
- if (outgoingEvent == null && !config.get("runFirst").asBoolean()) {
- fail("Initial loop condition failed");
- }
- if (outgoingEvent instanceof DispatchCompletionEvent) {
- getAbove().receiveResultCompletion(
- (DispatchCompletionEvent) outgoingEvent);
- } else if (outgoingEvent instanceof DispatchResultEvent) {
- getAbove().receiveResult(
- (DispatchResultEvent) outgoingEvent);
- } else {
- fail("Unknown type of outgoing event " + outgoingEvent);
- }
- }
-
- }
-
- private DispatchJobEvent prepareNewJobEvent(
- Map<String, T2Reference> data,
- AbstractDispatchEvent dispatchEvent) {
- DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
- Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
- dispatchJobEvent.getData());
- newInputs.putAll(data);
- DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
- .getOwningProcess(), dispatchEvent.getIndex(),
- dispatchEvent.getContext(), newInputs,
- ((DispatchJobEvent) dispatchEvent).getActivities());
- /*
- * TODO: Should this be registered as an incomingJobs? If so the
- * conditional could even feed to itself, and we should also keep a
- * list of originalJobs.
- */
- return newJobEvent;
- }
-
- @Override
- public void requestRun(Runnable runMe) {
- String newThreadName = "Condition service "
- + getParentProcessIdentifier();
- Thread thread = new Thread(runMe, newThreadName);
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- fail("Uncaught exception while invoking " + jobIdentifier,
- e);
- }
- });
- thread.start();
- }
- }
-
- @Override
- public Processor getProcessor() {
- if (dispatchStack == null)
- return null;
- return dispatchStack.getProcessor();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
deleted file mode 100644
index 7cfa2a5..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.Properties;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Configuration bean for the {@link Loop}.
- * <p>
- * Set the {@link #setCondition(Activity)} for an activity with an output port
- * called "loop". The LoopLayer will re-send a job only if this port exist and
- * it's output can be dereferenced to a string equal to "true".
- * </p>
- * <p>
- * If {@link #isRunFirst()} is false, the loop layer will check the condition
- * before invoking the job for the first time, otherwise the condition will be
- * invoked after the job has come back with successful results.
- * </p>
- *
- * @author Stian Soiland-Reyes
- *
- */
-@ConfigurationBean(uri = Loop.URI + "#Config")
-public class LoopConfiguration implements Cloneable {
- private Activity<?> condition = null;
- private Boolean runFirst;
- private Properties properties;
-
- public Properties getProperties() {
- synchronized (this) {
- if (properties == null)
- properties = new Properties();
- }
- return properties;
- }
-
- public void setProperties(Properties properties) {
- this.properties = properties;
- }
-
- @Override
- public LoopConfiguration clone() {
- LoopConfiguration clone;
- try {
- clone = (LoopConfiguration) super.clone();
- clone.condition = null;
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException("Unexpected CloneNotSupportedException",
- e);
- }
- return clone;
- }
-
- public Activity<?> getCondition() {
- return condition;
- }
-
- public boolean isRunFirst() {
- if (runFirst == null)
- return true;
- return runFirst;
- }
-
- @ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
- public void setCondition(Activity<?> activity) {
- this.condition = activity;
- }
-
- @ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
- public void setRunFirst(boolean runFirst) {
- this.runFirst = runFirst;
- }
-}
[6/6] incubator-taverna-engine git commit: updated scufl2 dependencies
Posted by st...@apache.org.
updated scufl2 dependencies
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/315a829c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/315a829c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/315a829c
Branch: refs/heads/master
Commit: 315a829c3d7740e51726d2477273196338dca58e
Parents: 4252fa9
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Fri Feb 27 15:26:24 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Fri Feb 27 15:26:24 2015 +0000
----------------------------------------------------------------------
taverna-platform-integration-tests/pom.xml | 17 +----------------
taverna-prov/pom.xml | 4 +++-
taverna-report-api/pom.xml | 2 +-
3 files changed, 5 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-platform-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-platform-integration-tests/pom.xml b/taverna-platform-integration-tests/pom.xml
index a958d1c..e99c469 100644
--- a/taverna-platform-integration-tests/pom.xml
+++ b/taverna-platform-integration-tests/pom.xml
@@ -362,7 +362,7 @@
</dependency>
<dependency>
<groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-rdfxml</artifactId>
+ <artifactId>taverna-scufl2-wfbundle</artifactId>
<version>${taverna.language.version}</version>
</dependency>
<dependency>
@@ -370,21 +370,6 @@
<artifactId>taverna-scufl2-ucfpackage</artifactId>
<version>${taverna.language.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-validation</artifactId>
- <version>${taverna.language.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-validation-correctness</artifactId>
- <version>${taverna.language.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-validation-structural</artifactId>
- <version>${taverna.language.version}</version>
- </dependency>
<!-- Taverna CommandLine Tool -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-prov/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-prov/pom.xml b/taverna-prov/pom.xml
index 2eebd56..79e38ff 100644
--- a/taverna-prov/pom.xml
+++ b/taverna-prov/pom.xml
@@ -40,14 +40,16 @@
</dependency>
<dependency>
<groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-rdfxml</artifactId>
+ <artifactId>taverna-scufl2-wfbundle</artifactId>
<version>${taverna.language.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.taverna.language</groupId>
<artifactId>taverna-scufl2-wfdesc</artifactId>
<version>${taverna.language.version}</version>
</dependency>
+ -->
<dependency>
<groupId>org.apache.taverna.language</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/315a829c/taverna-report-api/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-report-api/pom.xml b/taverna-report-api/pom.xml
index 159f06f..117cf86 100644
--- a/taverna-report-api/pom.xml
+++ b/taverna-report-api/pom.xml
@@ -16,7 +16,7 @@
</dependency>
<dependency>
<groupId>org.apache.taverna.language</groupId>
- <artifactId>taverna-scufl2-rdfxml</artifactId>
+ <artifactId>taverna-scufl2-wfbundle</artifactId>
<version>${taverna.language.version}</version>
</dependency>
<dependency>
[3/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
new file mode 100644
index 0000000..718079a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
@@ -0,0 +1,508 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static java.lang.System.currentTimeMillis;
+
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import net.sf.taverna.t2.invocation.Event;
+import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
+import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
+import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.workflowmodel.Dataflow;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Sits above the {@link Invoke} layer and collects information about the
+ * current workflow run to be stored by the {@link ProvenanceConnector}.
+ *
+ * @author Ian Dunlop
+ * @author Stian Soiland-Reyes
+ *
+ */
+public class IntermediateProvenance extends AbstractDispatchLayer<String> {
+ public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
+ private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
+
+ private ProvenanceReporter reporter;
+ private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
+ private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
+ private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
+
+ // private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
+ // private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
+
+ private WorkflowProvenanceItem workflowItem;
+
+ @Override
+ public void configure(String o) {
+ }
+
+ /**
+ * A set of provenance events for a particular owning process has been
+ * finished with so you can remove all the {@link IterationProvenanceItem}s
+ * from the map
+ */
+ @Override
+ public void finishedWith(String owningProcess) {
+ processToIndexes.remove(owningProcess);
+ }
+
+ @Override
+ public String getConfiguration() {
+ return null;
+ }
+
+ protected Map<String, IterationProvenanceItem> getIndexesByProcess(
+ String owningProcess) {
+ synchronized (processToIndexes) {
+ Map<String, IterationProvenanceItem> indexes = processToIndexes
+ .get(owningProcess);
+ if (indexes == null) {
+ indexes = new HashMap<>();
+ processToIndexes.put(owningProcess, indexes);
+ }
+ return indexes;
+ }
+ }
+
+ protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
+ String owningProcess = event.getOwningProcess();
+ int[] originalIndex = event.getIndex();
+ int[] index = event.getIndex();
+ String indexStr = indexStr(index);
+ Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
+ IterationProvenanceItem iterationProvenanceItem = null;
+ synchronized (indexes) {
+ // find the iteration item for the int index eg [1]
+ iterationProvenanceItem = indexes.get(indexStr);
+ // if it is null then strip the index and look again
+
+ while (iterationProvenanceItem == null) {
+ try {
+ index = removeLastIndex(index);
+ iterationProvenanceItem = indexes.get(indexStr(index));
+ /*
+ * if we have a 'parent' iteration then create a new
+ * iteration for the original index and link it to the
+ * activity and the input data
+ *
+ * FIXME should this be linked to the parent iteration
+ * instead?
+ */
+ if (iterationProvenanceItem != null) {
+ // set the index to the one from the event
+ IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
+ iterationProvenanceItem1.setIteration(originalIndex);
+ iterationProvenanceItem1.setProcessId(owningProcess);
+ iterationProvenanceItem1.setIdentifier(UUID
+ .randomUUID().toString());
+ iterationProvenanceItem1.setWorkflowId(workflowItem.getParentId());
+ iterationProvenanceItem1.setParentIterationItem(iterationProvenanceItem);
+ iterationProvenanceItem1.setParentId(iterationProvenanceItem.getParentId());
+ iterationProvenanceItem1.setInputDataItem(iterationProvenanceItem.getInputDataItem());
+
+// for (Entry<ActivityProvenanceItem, List<Object>> entrySet : activityProvenanceItemMap
+// .entrySet()) {
+// List<Object> value = entrySet.getValue();
+// int[] newIndex = (int[]) value.get(0);
+// String owner = (String) value.get(1);
+// String indexString = indexStr(newIndex);
+// String indexString2 = indexStr(index);
+//
+// if (owningProcess.equalsIgnoreCase(owner)
+// && indexString
+// .equalsIgnoreCase(indexString2))
+// iterationProvenanceItem1.setParentId(entrySet
+// .getKey().getIdentifier());
+// }
+// for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
+// .entrySet()) {
+// List<Object> value = entrySet.getValue();
+// int[] newIndex = (int[]) value.get(0);
+// String owner = (String) value.get(1);
+// String indexString = indexStr(newIndex);
+// String indexString2 = indexStr(index);
+// if (owningProcess.equalsIgnoreCase(owner)
+// && indexString
+// .equalsIgnoreCase(indexString2))
+// iterationProvenanceItem1
+// .setInputDataItem(entrySet.getKey());
+// }
+
+ // for (ActivityProvenanceItem item :
+ // activityProvenanceItemList) {
+ // if (owningProcess.equalsIgnoreCase(item
+ // .getProcessId())) {
+ // iterationProvenanceItem1.setParentId(item
+ // .getIdentifier());
+ // }
+ // }
+ // for (InputDataProvenanceItem item :
+ // inputDataProvenanceItemList) {
+ // if (owningProcess.equalsIgnoreCase(item
+ // .getProcessId())) {
+ // iterationProvenanceItem1.setInputDataItem(item);
+ // }
+ // indexes.put(indexStr, iterationProvenanceItem1);
+ // return iterationProvenanceItem1;
+ // // }
+ // }
+
+ // add this new iteration item to the map
+ getIndexesByProcess(event.getOwningProcess()).put(
+ indexStr(event.getIndex()),
+ iterationProvenanceItem1);
+ return iterationProvenanceItem1;
+ }
+ /*
+ * if we have not found an iteration items and the index is
+ * [] then something is wrong remove the last index in the
+ * int array before we go back through the while
+ */
+ } catch (IllegalStateException e) {
+ logger
+ .warn("Cannot find a parent iteration with index [] for owning process: "
+ + owningProcess
+ + "Workflow invocation is in an illegal state");
+ throw e;
+ }
+ }
+
+ // if (iterationProvenanceItem == null) {
+ // logger.info("Iteration item was null for: "
+ // + event.getOwningProcess() + " " + event.getIndex());
+ // System.out.println("Iteration item was null for: "
+ // + event.getOwningProcess() + " " + event.getIndex());
+ // iterationProvenanceItem = new IterationProvenanceItem(index);
+ // iterationProvenanceItem.setProcessId(owningProcess);
+ // iterationProvenanceItem.setIdentifier(UUID.randomUUID()
+ // .toString());
+ // // for (ActivityProvenanceItem
+ // item:activityProvenanceItemList)
+ // // {
+ // // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
+ // // iterationProvenanceItem.setParentId(item.getIdentifier());
+ // // }
+ // // }
+ // // for (InputDataProvenanceItem item:
+ // // inputDataProvenanceItemList) {
+ // // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
+ // // iterationProvenanceItem.setInputDataItem(item);
+ // // }
+ // // }
+ // indexes.put(indexStr, iterationProvenanceItem);
+
+ }
+ return iterationProvenanceItem;
+ }
+
+ private String indexStr(int[] index) {
+ StringBuilder indexStr = new StringBuilder();
+ for (int ind : index)
+ indexStr.append(":").append(ind);
+ return indexStr.toString();
+ }
+
+ /**
+ * Remove the last index of the int array in the form 1:2:3 etc
+ *
+ * @param index
+ * @return
+ */
+ @SuppressWarnings("unused")
+ private String[] stripLastIndex(int[] index) {
+ // will be in form :1:2:3
+ return indexStr(index).split(":");
+ }
+
+ /**
+ * Remove the last value in the int array
+ *
+ * @param index
+ * @return
+ */
+ private int[] removeLastIndex(int[] index) {
+ if (index.length == 0)
+ throw new IllegalStateException(
+ "There is no parent iteration of index [] for this result");
+ int[] newIntArray = new int[index.length - 1];
+ for (int i = 0; i < index.length - 1; i++)
+ newIntArray[i] = index[i];
+ return newIntArray;
+ }
+
+ private static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+ * Create an {@link ErrorProvenanceItem} and send across to the
+ * {@link ProvenanceConnector}
+ */
+ @Override
+ public void receiveError(DispatchErrorEvent errorEvent) {
+ IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
+ // get using errorEvent.getOwningProcess();
+
+ ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
+ errorItem.setCause(errorEvent
+ .getCause());
+ errorItem.setErrorType(errorEvent
+ .getFailureType().toString());
+ errorItem.setMessage(errorEvent.getMessage());
+
+ errorItem.setProcessId(errorEvent.getOwningProcess());
+ errorItem.setIdentifier(uuid());
+ errorItem.setParentId(iterationProvItem.getIdentifier());
+ // iterationProvItem.setErrorItem(errorItem);
+ // FIXME don't need to add to the processor item earlier
+ getReporter().addProvenanceItem(errorItem);
+ super.receiveError(errorEvent);
+ }
+
+ /**
+ * Create the {@link ProvenanceItem}s and send them all across to the
+ * {@link ProvenanceConnector} except for the
+ * {@link IterationProvenanceItem}, this one is told what it's inputs are
+ * but has to wait until the results are received before being sent across.
+ * Each item has a unique identifier and also knows who its parent item is
+ */
+ @Override
+ public void receiveJob(DispatchJobEvent jobEvent) {
+ try {
+ // FIXME do we need this ProcessProvenanceItem?
+ ProcessProvenanceItem provenanceItem;
+ String[] split = jobEvent.getOwningProcess().split(":");
+ provenanceItem = new ProcessProvenanceItem();
+ String parentDataflowId = workflowItem.getParentId();
+ provenanceItem.setWorkflowId(parentDataflowId);
+ provenanceItem.setFacadeID(split[0]);
+ provenanceItem.setDataflowID(split[1]);
+ provenanceItem.setProcessId(jobEvent.getOwningProcess());
+ provenanceItem.setIdentifier(uuid());
+ provenanceItem.setParentId(workflowItem.getIdentifier());
+ ProcessorProvenanceItem processorProvItem;
+ processorProvItem = new ProcessorProvenanceItem();
+ processorProvItem.setWorkflowId(parentDataflowId);
+ processorProvItem.setProcessId(jobEvent
+ .getOwningProcess());
+ processorProvItem.setIdentifier(uuid());
+ processorProvItem.setParentId(provenanceItem.getIdentifier());
+ provenanceItem.setProcessId(jobEvent.getOwningProcess());
+ getReporter().addProvenanceItem(provenanceItem);
+ getReporter().addProvenanceItem(processorProvItem);
+
+ IterationProvenanceItem iterationProvItem = null;
+ iterationProvItem = new IterationProvenanceItem();
+ iterationProvItem.setWorkflowId(parentDataflowId);
+ iterationProvItem.setIteration(jobEvent.getIndex());
+ iterationProvItem.setIdentifier(uuid());
+
+ ReferenceService referenceService = jobEvent.getContext()
+ .getReferenceService();
+
+ InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
+ inputDataItem.setDataMap(jobEvent.getData());
+ inputDataItem.setReferenceService(referenceService);
+ inputDataItem.setIdentifier(uuid());
+ inputDataItem.setParentId(iterationProvItem.getIdentifier());
+ inputDataItem.setProcessId(jobEvent.getOwningProcess());
+
+ List<Object> inputIndexOwnerList = new ArrayList<>();
+ inputIndexOwnerList.add(jobEvent.getIndex());
+ inputIndexOwnerList.add(jobEvent.getOwningProcess());
+ inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
+
+ // inputDataProvenanceItemList.add(inputDataItem);
+ iterationProvItem.setInputDataItem(inputDataItem);
+ iterationProvItem.setIteration(jobEvent.getIndex());
+ iterationProvItem.setProcessId(jobEvent.getOwningProcess());
+
+ for (Activity<?> activity : jobEvent.getActivities())
+ if (activity instanceof AsynchronousActivity) {
+ ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
+ activityProvItem.setWorkflowId(parentDataflowId);
+ activityProvItem.setIdentifier(uuid());
+ iterationProvItem.setParentId(activityProvItem.getIdentifier());
+ // getConnector().addProvenanceItem(iterationProvItem);
+ activityProvItem.setParentId(processorProvItem.getIdentifier());
+ // processorProvItem.setActivityProvenanceItem(activityProvItem);
+ activityProvItem.setProcessId(jobEvent.getOwningProcess());
+ List<Object> activityIndexOwnerList = new ArrayList<>();
+ activityIndexOwnerList.add(jobEvent.getOwningProcess());
+ activityIndexOwnerList.add(jobEvent.getIndex());
+ activityProvenanceItemMap.put(activityProvItem,
+ inputIndexOwnerList);
+ // activityProvenanceItemList.add(activityProvItem);
+ // activityProvItem.setIterationProvenanceItem(iterationProvItem);
+ getReporter().addProvenanceItem(activityProvItem);
+ break;
+ }
+ getIndexesByProcess(jobEvent.getOwningProcess()).put(
+ indexStr(jobEvent.getIndex()), iterationProvItem);
+ iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
+ getReporter().addProvenanceItem(iterationProvItem);
+ } catch (RuntimeException ex) {
+ logger.error("Could not store provenance for " + jobEvent, ex);
+ }
+
+ super.receiveJob(jobEvent);
+ }
+
+ @Override
+ public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+ super.receiveJobQueue(jobQueueEvent);
+ }
+
+ /**
+ * Populate an {@link OutputDataProvenanceItem} with the results and attach
+ * it to the appropriate {@link IterationProvenanceItem}. Then send the
+ * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
+ */
+ @Override
+ public void receiveResult(DispatchResultEvent resultEvent) {
+ try {
+ // FIXME use the connector from the result event context
+ IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
+ iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
+
+ ReferenceService referenceService = resultEvent.getContext()
+ .getReferenceService();
+
+ OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
+ outputDataItem.setDataMap(resultEvent.getData());
+ outputDataItem.setReferenceService(referenceService);
+ outputDataItem.setIdentifier(uuid());
+ outputDataItem.setProcessId(resultEvent.getOwningProcess());
+ outputDataItem.setParentId(iterationProvItem.getIdentifier());
+ iterationProvItem.setOutputDataItem(outputDataItem);
+
+ getReporter().addProvenanceItem(iterationProvItem);
+ // getConnector().addProvenanceItem(outputDataItem);
+
+ // PM -- testing
+ // add xencoding of data value here??
+ // Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
+ // for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
+ // // create a simpler bean that we can serialize?
+ //
+ // T2Reference ref = entry.getValue();
+ //
+ // SimplerT2Reference t2RefBean = new SimplerT2Reference();
+ // t2RefBean.setReferenceType(ref.getReferenceType());
+ // t2RefBean.setDepth(ref.getDepth());
+ // t2RefBean.setLocalPart(ref.getLocalPart());
+ // t2RefBean.setNamespacePart(ref.getNamespacePart());
+ //
+ // System.out.println("data ref: "+ref);
+ // String serializedInput = SerializeParam(t2RefBean);
+ // System.out.println("serialized reference:" + serializedInput);
+ //
+ // System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
+// }
+ } catch (Exception ex) {
+ logger.error("Could not store provenance for "
+ + resultEvent.getOwningProcess() + " "
+ + Arrays.toString(resultEvent.getIndex()), ex);
+ // But don't break super.receiveResult() !!
+ }
+ super.receiveResult(resultEvent);
+ }
+
+ /**
+ * Pass-through: the completion event is forwarded unchanged to the
+ * superclass implementation; nothing else is done with it here.
+ */
+ @Override
+ public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+ super.receiveResultCompletion(completionEvent);
+ }
+
+ /**
+ * Tell this layer which {@link ProvenanceReporter} is being used to capture
+ * the {@link ProvenanceItem}s. NOTE: should probably use the reporter from
+ * the result event's context where possible
+ *
+ * @param connector
+ *            the reporter that this layer will hand its provenance items to
+ */
+ public void setReporter(ProvenanceReporter connector) {
+ this.reporter = connector;
+ }
+
+ /**
+ * @return the reporter last given to {@link #setReporter(ProvenanceReporter)}
+ */
+ public ProvenanceReporter getReporter() {
+ return reporter;
+ }
+
+ /**
+ * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
+ * enacted, this layer has to know about the {@link WorkflowProvenanceItem}.
+ *
+ * @param workflowItem
+ *            the workflow provenance item to remember in this layer
+ */
+ public void setWorkflow(WorkflowProvenanceItem workflowItem) {
+ this.workflowItem = workflowItem;
+ }
+
+ /**
+ * Serialize a bean into the XML form produced by {@link XMLEncoder}.
+ *
+ * @param ParamValue
+ *            the object to encode
+ * @return the XML document as a string
+ */
+ // TODO is this unused?
+ public static String SerializeParam(Object ParamValue) {
+ ByteArrayOutputStream BStream = new ByteArrayOutputStream();
+ XMLEncoder encoder = new XMLEncoder(BStream);
+ try {
+ encoder.writeObject(ParamValue);
+ } finally {
+ // close() also flushes the document and writes its closing tag
+ encoder.close();
+ }
+ /*
+ * The single-argument XMLEncoder constructor writes UTF-8, so the bytes
+ * must be decoded as UTF-8 rather than with the platform default charset.
+ */
+ return new String(BStream.toByteArray(),
+ java.nio.charset.StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Rebuild an object from the XML form produced by
+ * {@link #SerializeParam(Object)}.
+ * <p>
+ * SECURITY NOTE: {@link XMLDecoder} can instantiate classes and call
+ * methods named in the document, so this must only be used on trusted
+ * input.
+ *
+ * @param SerializedParam
+ *            the XML document to decode
+ * @return the first object read from the document
+ */
+ // TODO is this unused?
+ public static Object DeserializeParam(String SerializedParam) {
+ // Encode as UTF-8 to match what SerializeParam() produces
+ InputStream IStream = new ByteArrayInputStream(
+ SerializedParam.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ XMLDecoder decoder = new XMLDecoder(IStream);
+ try {
+ return decoder.readObject();
+ } finally {
+ decoder.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
new file mode 100644
index 0000000..f8403df
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
@@ -0,0 +1,369 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import net.sf.taverna.t2.invocation.InvocationContext;
+import net.sf.taverna.t2.monitor.MonitorManager;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
+import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.ControlBoundary;
+import net.sf.taverna.t2.workflowmodel.OutputPort;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
+import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Context free invoker layer, does not pass index arrays of jobs into activity
+ * instances.
+ * <p>
+ * This layer will invoke the first invokable activity in the activity list, so
+ * any sane dispatch stack will have narrowed this down to a single item list by
+ * this point, i.e. by the insertion of a failover layer.
+ * <p>
+ * Currently only handles activities implementing {@link AsynchronousActivity}.
+ *
+ * @author Tom Oinn
+ * @author Stian Soiland-Reyes
+ *
+ */
+@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
+@ControlBoundary
+public class Invoke extends AbstractDispatchLayer<JsonNode> {
+ public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
+ private static Logger logger = Logger.getLogger(Invoke.class);
+ private static Long invocationCount = 0L;
+
+ private MonitorManager monMan;
+
+ private static String getNextProcessID() {
+ long count;
+ synchronized (invocationCount) {
+ count = ++invocationCount;
+ }
+ return "invocation" + count;
+ }
+
+ /**
+ * Create the layer and look up the {@link MonitorManager} instance it will
+ * register invocations with.
+ */
+ public Invoke() {
+ this.monMan = MonitorManager.getInstance();
+ }
+
+ /**
+ * This layer takes no configuration; the argument is ignored.
+ */
+ @Override
+ public void configure(JsonNode config) {
+ // No configuration, do nothing
+ }
+
+ /**
+ * @return always <code>null</code>, as this layer has no configuration
+ */
+ @Override
+ public JsonNode getConfiguration() {
+ return null;
+ }
+
+ /**
+ * Receive a job from the layer above and pick the first concrete activity
+ * from the list to invoke. Invoke this activity, creating a callback which
+ * will wrap up the result messages in the appropriate collection depth
+ * before sending them on (in general activities are not aware of their
+ * invocation context and should not be responsible for providing correct
+ * index arrays for results)
+ * <p>
+ * This layer will invoke the first invokable activity in the activity list,
+ * so any sane dispatch stack will have narrowed this down to a single item
+ * list by this point, i.e. by the insertion of a failover layer.
+ */
+ @Override
+ public void receiveJob(final DispatchJobEvent jobEvent) {
+ for (Activity<?> activity : jobEvent.getActivities())
+ if (activity instanceof AsynchronousActivity) {
+ invoke(jobEvent, (AsynchronousActivity<?>) activity);
+ break;
+ }
+ }
+
+ /**
+ * Invoke a single activity for the given job: register the invocation with
+ * the monitor, report an {@link InvocationStartedProvenanceItem} when
+ * provenance is being captured, translate the job's input names to the
+ * activity's port names and finally start the activity with an
+ * {@link InvokeCallBack}.
+ *
+ * @param jobEvent
+ *            the job to run
+ * @param activity
+ *            the activity to run it with
+ */
+ protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
+ // Register with the monitor
+ final String invocationId = jobEvent.pushOwningProcess(
+ getNextProcessID()).getOwningProcess();
+ monMan.registerNode(activity, invocationId,
+ new HashSet<MonitorableProperty<?>>());
+ monMan.registerNode(jobEvent, invocationId,
+ new HashSet<MonitorableProperty<?>>());
+
+ /*
+ * Fetch the reference service registered for this job's context. In
+ * most cases there will be a single one for the entire workflow system,
+ * but it never hurts to generalize.
+ */
+ InvocationContext context = jobEvent.getContext();
+ final ReferenceService refService = context.getReferenceService();
+
+ // Provenance is reported only if both a reporter and a layer exist
+ ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
+ IntermediateProvenance provenanceLayer = null;
+ if (provenanceReporter != null) {
+ provenanceLayer = findIntermediateProvenance();
+ }
+ if (provenanceLayer != null) {
+ InvocationStartedProvenanceItem started = new InvocationStartedProvenanceItem();
+ IterationProvenanceItem parentItem = provenanceLayer.getIterationProvItem(jobEvent);
+ started.setIdentifier(UUID.randomUUID().toString());
+ started.setActivity(activity);
+ started.setProcessId(jobEvent.getOwningProcess());
+ started.setInvocationProcessId(invocationId);
+ started.setParentId(parentItem.getIdentifier());
+ started.setWorkflowId(parentItem.getWorkflowId());
+ started.setInvocationStarted(new Date(System.currentTimeMillis()));
+ provenanceReporter.addProvenanceItem(started);
+ }
+
+ /*
+ * Rename the job's inputs to the activity's own port names; inputs
+ * without a mapping are left out.
+ */
+ Map<String, T2Reference> inputData = new HashMap<>();
+ for (Map.Entry<String, T2Reference> input : jobEvent.getData().entrySet()) {
+ String activityInputName = activity.getInputPortMapping().get(
+ input.getKey());
+ if (activityInputName != null) {
+ inputData.put(activityInputName, input.getValue());
+ }
+ }
+
+ /*
+ * The activity is asynchronous, so it is handed a callback through
+ * which results, completions and failures are pushed back to this
+ * layer.
+ */
+ AsynchronousActivityCallback callback = new InvokeCallBack(
+ jobEvent, refService, invocationId, activity);
+
+ if (!(activity instanceof MonitorableAsynchronousActivity<?>)) {
+ /*
+ * Run the job, passing in the callback we've just created along
+ * with the (possibly renamed) input data map
+ */
+ activity.executeAsynch(inputData, callback);
+ return;
+ }
+ /*
+ * Monitorable activity: launch the job, then push the monitorable
+ * properties it returns into the state tree
+ */
+ MonitorableAsynchronousActivity<?> monitorable = (MonitorableAsynchronousActivity<?>) activity;
+ Set<MonitorableProperty<?>> props = monitorable
+ .executeAsynchWithMonitoring(inputData, callback);
+ monMan.addPropertiesToNode(invocationId.split(":"), props);
+ }
+
+ /**
+ * Search the dispatch stack of this layer's processor for an
+ * {@link IntermediateProvenance} layer.
+ *
+ * @return the first such layer, or <code>null</code> if there is none
+ */
+ protected IntermediateProvenance findIntermediateProvenance() {
+ IntermediateProvenance found = null;
+ for (DispatchLayer<?> candidate : getProcessor().getDispatchStack()
+ .getLayers()) {
+ if (candidate instanceof IntermediateProvenance) {
+ found = (IntermediateProvenance) candidate;
+ break;
+ }
+ }
+ return found;
+ }
+
+ /**
+ * Callback handed to the invoked activity. It translates the activity's
+ * results, completions and failures into dispatch events for the layer
+ * above, prefixing streamed indices with the index of the originating job.
+ */
+ protected class InvokeCallBack implements AsynchronousActivityCallback {
+ protected final AsynchronousActivity<?> activity;
+ protected final String invocationProcessIdentifier;
+ protected final DispatchJobEvent jobEvent;
+ protected final ReferenceService refService;
+ /** Set once at least one result has been pushed to the layer above. */
+ protected boolean sentJob = false;
+
+ protected InvokeCallBack(DispatchJobEvent jobEvent,
+ ReferenceService refService,
+ String invocationProcessIdentifier,
+ AsynchronousActivity<?> asyncActivity) {
+ this.activity = asyncActivity;
+ this.invocationProcessIdentifier = invocationProcessIdentifier;
+ this.jobEvent = jobEvent;
+ this.refService = refService;
+ }
+
+ /**
+ * Prefix an index reported by the activity with the index of the job.
+ *
+ * @param suffix
+ *            the index as reported by the activity
+ * @return the job's own index array if <code>suffix</code> is empty,
+ *         otherwise a new array holding the job index followed by
+ *         <code>suffix</code>
+ */
+ private int[] prefixWithJobIndex(int[] suffix) {
+ if (suffix.length == 0)
+ return jobEvent.getIndex();
+ int[] prefix = jobEvent.getIndex();
+ int[] combined = new int[prefix.length + suffix.length];
+ System.arraycopy(prefix, 0, combined, 0, prefix.length);
+ System.arraycopy(suffix, 0, combined, prefix.length, suffix.length);
+ return combined;
+ }
+
+ @Override
+ public void fail(String message) {
+ fail(message, null);
+ }
+
+ @Override
+ public void fail(String message, Throwable t) {
+ fail(message, t, DispatchErrorType.INVOCATION);
+ }
+
+ /**
+ * Log the failure, remove the invocation from the monitor and send a
+ * {@link DispatchErrorEvent} to the layer above.
+ */
+ @Override
+ public void fail(String message, Throwable t,
+ DispatchErrorType errorType) {
+ logger.warn("Failed (" + errorType + ") invoking " + activity
+ + " for job " + jobEvent + ": " + message, t);
+ monMan.deregisterNode(invocationProcessIdentifier);
+ DispatchErrorEvent errorEvent = new DispatchErrorEvent(
+ jobEvent.getOwningProcess(), jobEvent.getIndex(),
+ jobEvent.getContext(), message, t, errorType, activity);
+ getAbove().receiveError(errorEvent);
+ }
+
+ @Override
+ public InvocationContext getContext() {
+ return jobEvent.getContext();
+ }
+
+ @Override
+ public String getParentProcessIdentifier() {
+ return invocationProcessIdentifier;
+ }
+
+ @Override
+ public void receiveCompletion(int[] completionIndex) {
+ if (completionIndex.length == 0) {
+ // Final result, clean up monitor state
+ monMan.deregisterNode(invocationProcessIdentifier);
+ }
+ if (!sentJob) {
+ /*
+ * We haven't sent any 'real' data prior to completing a stream.
+ * This in effect means we're sending an empty top level
+ * collection so we need to register empty collections for each
+ * output port with appropriate depth (by definition if we're
+ * streaming all outputs are collection types of some kind)
+ */
+ Map<String, T2Reference> emptyLists = new HashMap<>();
+ for (OutputPort port : activity.getOutputPorts()) {
+ String portName = port.getName();
+ int portDepth = port.getDepth();
+ emptyLists.put(portName, refService.getListService()
+ .registerEmptyList(portDepth, jobEvent.getContext()).getId());
+ }
+ receiveResult(emptyLists, new int[0]);
+ return;
+ }
+ int[] newIndex = prefixWithJobIndex(completionIndex);
+ DispatchCompletionEvent completion = new DispatchCompletionEvent(
+ jobEvent.getOwningProcess(), newIndex, jobEvent.getContext());
+ getAbove().receiveResultCompletion(completion);
+ }
+
+ @Override
+ public void receiveResult(Map<String, T2Reference> data, int[] index) {
+ /*
+ * Rename the results from activity output names to processor output
+ * names; outputs without a mapping are left out
+ */
+ Map<String, T2Reference> resultMap = new HashMap<>();
+ for (String outputName : data.keySet()) {
+ String processorOutputName = activity.getOutputPortMapping()
+ .get(outputName);
+ if (processorOutputName != null) {
+ resultMap.put(processorOutputName, data.get(outputName));
+ }
+ }
+ /*
+ * A non-empty index means the activity is streaming, and the index
+ * is appended to the job's own; otherwise the job's index is used
+ * as it is
+ */
+ boolean streaming = index.length != 0;
+ int[] newIndex = prefixWithJobIndex(index);
+ DispatchResultEvent resultEvent = new DispatchResultEvent(
+ jobEvent.getOwningProcess(), newIndex,
+ jobEvent.getContext(), resultMap, streaming);
+ if (!streaming) {
+ monMan.registerNode(resultEvent, invocationProcessIdentifier,
+ new HashSet<MonitorableProperty<?>>());
+ // Final result, clean up monitor state
+ monMan.deregisterNode(invocationProcessIdentifier);
+ }
+ // Push the modified data to the layer above in the dispatch stack
+ getAbove().receiveResult(resultEvent);
+
+ sentJob = true;
+ }
+
+ /**
+ * Run the given task in a new thread named after the job, using the
+ * class loader of the activity's class as context class loader.
+ * Uncaught exceptions are reported through {@link #fail(String, Throwable)}.
+ */
+ @Override
+ public void requestRun(Runnable runMe) {
+ Thread worker = new Thread(runMe, jobEvent.toString());
+ worker.setContextClassLoader(activity.getClass().getClassLoader());
+ worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ fail("Uncaught exception while invoking " + activity, e);
+ }
+ });
+ worker.start();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
new file mode 100644
index 0000000..d5077a4
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
@@ -0,0 +1,424 @@
+/*******************************************************************************
+ * Copyright (C) 2008 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.invocation.InvocationContext;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.Processor;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
+import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+
+/**
+ * A layer that allows while-style loops.
+ * <p>
+ * The layer is configured with a {@link LoopConfiguration}, where an activity
+ * has been set as the
+ * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
+ * condition}.
+ * <p>
+ * After a job has been successful further down the dispatch stack, the loop
+ * layer will invoke the conditional activity to determine if the job will be
+ * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
+ * will be performed even before the first invocation. (The default
+ * runFirst=true is equivalent to a do..while construct, while runFirst=false is
+ * equivalent to a while.. construct.)
+ * <p>
+ * A job will be resent down the dispatch stack only if the conditional activity
+ * returns a reference to a string equal to "true" on its output port "loop".
+ * <p>
+ * If a job or the conditional activity fails, the while-loop is interrupted and
+ * the error is sent further up.
+ * <p>
+ * Note that the LoopLayer will be invoked for each item in an iteration, if you
+ * want to do the loop for the whole collection (ie. re-iterating if the
+ * loop-condition fails after processing the full list) - create a nested
+ * workflow with the desired depths on its input ports and insert this
+ * LoopLayer in the stack of the nested workflow's processor in parent workflow.
+ * <p>
+ * It is recommended that the LoopLayer is to be inserted after the
+ * {@link ErrorBounce} layer, as this layer is needed for registering errors
+ * produced by the LoopLayer. If the user requires {@link Retry retries} and
+ * {@link Failover failovers} before checking the while condition, such layers
+ * should be below LoopLayer.
+ *
+ * @author Stian Soiland-Reyes
+ */
+// FIXME Doesn't work
+@SuppressWarnings({"unchecked","rawtypes"})
+public class Loop extends AbstractDispatchLayer<JsonNode> {
+ public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
+ private static Logger logger = Logger.getLogger(Loop.class);
+
+ private JsonNode config = JsonNodeFactory.instance.objectNode();
+
+ protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
+ protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
+
+ /**
+ * Replace the configuration of this layer. The "runFirst" field of the
+ * given node is read when jobs are received.
+ */
+ @Override
+ public void configure(JsonNode config) {
+ this.config = config;
+ }
+
+ /**
+ * Forget all remembered incoming and outgoing events whose job identifier
+ * starts with the given owning process followed by "[".
+ */
+ @Override
+ public void finishedWith(String owningProcess) {
+ String keyPrefix = owningProcess + "[";
+ synchronized (outgoingJobs) {
+ ArrayList<String> finished = new ArrayList<>();
+ for (String jobId : outgoingJobs.keySet())
+ if (jobId.startsWith(keyPrefix))
+ finished.add(jobId);
+ outgoingJobs.keySet().removeAll(finished);
+ }
+ synchronized (incomingJobs) {
+ ArrayList<String> finished = new ArrayList<>();
+ for (String jobId : incomingJobs.keySet())
+ if (jobId.startsWith(keyPrefix))
+ finished.add(jobId);
+ incomingJobs.keySet().removeAll(finished);
+ }
+ }
+
+ /**
+ * @return the node last given to {@link #configure(JsonNode)}, or the
+ *         initial empty object node if it was never called
+ */
+ @Override
+ public JsonNode getConfiguration() {
+ return config;
+ }
+
+ /**
+ * Remember the incoming job, then either send it straight down the stack
+ * (runFirst, the default) or check the loop condition first.
+ */
+ @Override
+ public void receiveJob(DispatchJobEvent jobEvent) {
+ synchronized (incomingJobs) {
+ incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
+ }
+ /*
+ * path() never returns null, so a configuration without a "runFirst"
+ * field (such as the initial empty one) gives the documented default of
+ * true instead of a NullPointerException.
+ */
+ if (config.path("runFirst").asBoolean(true)) {
+ // We'll do the conditional in receiveResult instead
+ super.receiveJob(jobEvent);
+ return;
+ }
+ checkCondition(jobEvent);
+ }
+
+ /**
+ * Remember the incoming job queue, then either send it straight down the
+ * stack (runFirst, the default) or check the loop condition first.
+ */
+ @Override
+ public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+ synchronized (incomingJobs) {
+ incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
+ }
+ /*
+ * path() never returns null, so a configuration without a "runFirst"
+ * field (such as the initial empty one) gives the documented default of
+ * true instead of a NullPointerException.
+ */
+ if (config.path("runFirst").asBoolean(true)) {
+ // We'll do the conditional in receiveResult instead
+ super.receiveJobQueue(jobQueueEvent);
+ return;
+ }
+ checkCondition(jobQueueEvent);
+ }
+
+ /**
+ * Look up the condition activity of this loop.
+ * <p>
+ * NOTE(review): the lookup from the configuration is commented out, so
+ * this currently always returns <code>null</code>.
+ */
+ private Activity<?> getCondition() {
+ //return config.getCondition();
+ return null;
+ }
+
+ /**
+ * Without a condition the result is passed straight up. Otherwise it is
+ * remembered as the outgoing event of its job and the condition is checked.
+ */
+ @Override
+ public void receiveResult(DispatchResultEvent resultEvent) {
+ if (getCondition() != null) {
+ synchronized (outgoingJobs) {
+ outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
+ }
+ checkCondition(resultEvent);
+ } else {
+ super.receiveResult(resultEvent);
+ }
+ }
+
+ /**
+ * Without a condition the completion is passed straight up. Otherwise it
+ * is remembered as the outgoing event of its job and the condition is
+ * checked.
+ */
+ @Override
+ public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+ if (getCondition() != null) {
+ synchronized (outgoingJobs) {
+ outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
+ }
+ checkCondition(completionEvent);
+ } else {
+ super.receiveResultCompletion(completionEvent);
+ }
+ }
+
+ /**
+ * Invoke the condition activity for the job the given event belongs to.
+ * If there is no condition, or it is not an
+ * {@link AbstractAsynchronousActivity}, an error event is sent instead.
+ *
+ * @param event
+ *            the event that triggered the check
+ */
+ private void checkCondition(AbstractDispatchEvent event) {
+ Activity<?> condition = getCondition();
+ String problem = null;
+ if (condition == null)
+ problem = "Can't invoke condition service: null";
+ else if (!(condition instanceof AbstractAsynchronousActivity))
+ problem = "Can't invoke condition service "
+ + condition
+ + " is not an instance of AbstractAsynchronousActivity";
+ if (problem != null) {
+ super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
+ event.getIndex(), event.getContext(), problem, null,
+ DispatchErrorType.INVOCATION, condition));
+ return;
+ }
+ AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
+ String jobId = jobIdentifier(event);
+ Map<String, T2Reference> conditionInputs = prepareInputs(
+ asyncCondition, jobId);
+ asyncCondition.executeAsynch(conditionInputs, new ConditionCallBack(
+ jobId));
+ }
+
+ /**
+ * Collect the inputs for the condition activity. For each of its input
+ * ports the value of the same name is taken from the job's previous
+ * output if present there, else from the job's original input; ports found
+ * in neither are left unset.
+ *
+ * @param asyncCondition
+ *            the condition activity
+ * @param jobIdentifier
+ *            identifier of the job being looped
+ * @return the inputs keyed by condition port name
+ */
+ private Map<String, T2Reference> prepareInputs(
+ AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
+ Map<String, T2Reference> collected = new HashMap<>();
+ Map<String, T2Reference> originalInputs = getInData(jobIdentifier);
+ Map<String, T2Reference> previousOutputs = getOutData(jobIdentifier);
+
+ Set<ActivityInputPort> conditionPorts = asyncCondition.getInputPorts();
+ for (ActivityInputPort port : conditionPorts) {
+ String portName = port.getName();
+ Map<String, T2Reference> source = null;
+ if (previousOutputs.containsKey(portName))
+ // Previous output takes precedence
+ source = previousOutputs;
+ else if (originalInputs.containsKey(portName))
+ // Fall back to the original input
+ source = originalInputs;
+ if (source != null)
+ collected.put(portName, source.get(portName));
+ }
+ return collected;
+ }
+
+ /**
+ * @return the data of the remembered incoming event for the job if that
+ *         event is a {@link DispatchJobEvent}, otherwise a new empty map
+ */
+ private Map<String, T2Reference> getInData(String jobIdentifier) {
+ AbstractDispatchEvent remembered;
+ synchronized (incomingJobs) {
+ remembered = incomingJobs.get(jobIdentifier);
+ }
+ if (!(remembered instanceof DispatchJobEvent))
+ return new HashMap<>();
+ return ((DispatchJobEvent) remembered).getData();
+ }
+
+ /**
+ * @return the data of the remembered outgoing event for the job if that
+ *         event is a {@link DispatchResultEvent}, otherwise a new empty
+ *         map
+ */
+ private Map<String, T2Reference> getOutData(String jobIdentifier) {
+ AbstractDispatchEvent remembered;
+ synchronized (outgoingJobs) {
+ remembered = outgoingJobs.get(jobIdentifier);
+ }
+ if (!(remembered instanceof DispatchResultEvent))
+ return new HashMap<>();
+ return ((DispatchResultEvent) remembered).getData();
+ }
+
+ /**
+ * Build the key under which events of a job are remembered: the owning
+ * process followed by the index in {@link Arrays#toString(int[])} form.
+ */
+ private String jobIdentifier(AbstractDispatchEvent event) {
+ return event.getOwningProcess() + Arrays.toString(event.getIndex());
+ }
+
+ public static final String LOOP_PORT = "loop";
+
+ /**
+ * Callback given to the condition activity. Depending on the value the
+ * activity returns on its {@link Loop#LOOP_PORT} output, the remembered job
+ * is either sent down the stack again or its remembered result is passed
+ * up.
+ */
+ public class ConditionCallBack implements AsynchronousActivityCallback {
+ private InvocationContext context;
+ private final String jobIdentifier;
+ private String processId;
+
+ /**
+ * @param jobIdentifier
+ *            identifier of a job already stored in incomingJobs
+ */
+ public ConditionCallBack(String jobIdentifier) {
+ this.jobIdentifier = jobIdentifier;
+ AbstractDispatchEvent originalEvent;
+ synchronized (incomingJobs) {
+ originalEvent = incomingJobs.get(jobIdentifier);
+ }
+ context = originalEvent.getContext();
+ processId = originalEvent.getOwningProcess() + ":condition";
+ }
+
+ @Override
+ public void fail(String message) {
+ fail(message, null, DispatchErrorType.INVOCATION);
+ }
+
+ @Override
+ public void fail(String message, Throwable t) {
+ fail(message, t, DispatchErrorType.INVOCATION);
+ }
+
+ /**
+ * Log the failure and send an error event for the original job.
+ */
+ @Override
+ public void fail(String message, Throwable t,
+ DispatchErrorType errorType) {
+ logger.warn("Failed (" + errorType + ") invoking condition service "
+ + jobIdentifier + ":" + message, t);
+
+ AbstractDispatchEvent originalEvent;
+ synchronized (incomingJobs) {
+ originalEvent = incomingJobs.get(jobIdentifier);
+ }
+ receiveError(new DispatchErrorEvent(originalEvent
+ .getOwningProcess(), originalEvent.getIndex(),
+ originalEvent.getContext(),
+ "Can't invoke condition service ", t,
+ DispatchErrorType.INVOCATION, null));
+ }
+
+ @Override
+ public InvocationContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getParentProcessIdentifier() {
+ return processId;
+ }
+
+ @Override
+ public void receiveCompletion(int[] completionIndex) {
+ // Ignore streaming
+ }
+
+ /**
+ * Evaluate the condition's answer. Every failure path returns right
+ * after calling fail(), so exactly one error is reported and invalid
+ * data is never processed further.
+ */
+ @Override
+ public void receiveResult(Map<String, T2Reference> data, int[] index) {
+ if (index.length > 0) {
+ // Ignore streaming
+ return;
+ }
+ T2Reference loopRef = data.get(LOOP_PORT);
+ if (loopRef == null) {
+ fail("Condition service didn't contain output port " + LOOP_PORT);
+ return;
+ }
+ if (loopRef.containsErrors()) {
+ fail("Condition service failed: " + loopRef);
+ return;
+ }
+ if (loopRef.getDepth() != 0) {
+ fail("Condition service output " + LOOP_PORT
+ + " depth is not 0, but " + loopRef.getDepth());
+ // Don't try to render a collection as a string
+ return;
+ }
+ ReferenceService referenceService = context.getReferenceService();
+ String loop = (String) referenceService.renderIdentifier(loopRef,
+ String.class, context);
+
+ if (Boolean.parseBoolean(loop)) {
+ // Push it down again
+ AbstractDispatchEvent dispatchEvent;
+ synchronized (incomingJobs) {
+ dispatchEvent = incomingJobs.get(jobIdentifier);
+ }
+ if (dispatchEvent == null) {
+ fail("Unknown job identifier " + jobIdentifier);
+ // Avoid reporting a second, misleading error below
+ return;
+ }
+ if (dispatchEvent instanceof DispatchJobEvent) {
+ DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
+ dispatchEvent);
+ getBelow().receiveJob(newJobEvent);
+ } else if (dispatchEvent instanceof DispatchJobQueueEvent) {
+ getBelow().receiveJobQueue(
+ (DispatchJobQueueEvent) dispatchEvent);
+ } else {
+ fail("Unknown type of incoming event " + dispatchEvent);
+ }
+ return;
+
+ } else {
+ // We'll push it up, end of loop for now
+
+ AbstractDispatchEvent outgoingEvent;
+ synchronized (outgoingJobs) {
+ outgoingEvent = outgoingJobs.get(jobIdentifier);
+ }
+ /*
+ * path() never returns null, so a missing "runFirst" field
+ * gives the documented default of true
+ */
+ if (outgoingEvent == null
+ && !config.path("runFirst").asBoolean(true)) {
+ fail("Initial loop condition failed");
+ // Avoid reporting a second, misleading error below
+ return;
+ }
+ if (outgoingEvent instanceof DispatchCompletionEvent) {
+ getAbove().receiveResultCompletion(
+ (DispatchCompletionEvent) outgoingEvent);
+ } else if (outgoingEvent instanceof DispatchResultEvent) {
+ getAbove().receiveResult(
+ (DispatchResultEvent) outgoingEvent);
+ } else {
+ fail("Unknown type of outgoing event " + outgoingEvent);
+ }
+ }
+
+ }
+
+ /**
+ * Build the job for the next round: the original job's inputs,
+ * overridden by whatever the condition activity returned.
+ */
+ private DispatchJobEvent prepareNewJobEvent(
+ Map<String, T2Reference> data,
+ AbstractDispatchEvent dispatchEvent) {
+ DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
+ Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
+ dispatchJobEvent.getData());
+ newInputs.putAll(data);
+ DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
+ .getOwningProcess(), dispatchEvent.getIndex(),
+ dispatchEvent.getContext(), newInputs,
+ dispatchJobEvent.getActivities());
+ /*
+ * TODO: Should this be registered as an incomingJobs? If so the
+ * conditional could even feed to itself, and we should also keep a
+ * list of originalJobs.
+ */
+ return newJobEvent;
+ }
+
+ /**
+ * Run the given task in a new thread; uncaught exceptions are reported
+ * through {@link #fail(String, Throwable)}.
+ */
+ @Override
+ public void requestRun(Runnable runMe) {
+ String newThreadName = "Condition service "
+ + getParentProcessIdentifier();
+ Thread thread = new Thread(runMe, newThreadName);
+ thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ fail("Uncaught exception while invoking " + jobIdentifier,
+ e);
+ }
+ });
+ thread.start();
+ }
+ }
+
+ /**
+ * @return the processor of the dispatch stack this layer is part of, or
+ *         <code>null</code> if no dispatch stack has been set
+ */
+ @Override
+ public Processor getProcessor() {
+ return (dispatchStack == null) ? null : dispatchStack.getProcessor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
new file mode 100644
index 0000000..7cfa2a5
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
@@ -0,0 +1,75 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.Properties;
+
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean for the {@link Loop}.
+ * <p>
+ * Use {@link #setCondition(Activity)} to set an activity with an output port
+ * called "loop". The LoopLayer will re-send a job only if this port exists and
+ * its output can be dereferenced to a string equal to "true".
+ * </p>
+ * <p>
+ * If {@link #isRunFirst()} is false, the loop layer will check the condition
+ * before invoking the job for the first time, otherwise the condition will be
+ * invoked after the job has come back with successful results.
+ * </p>
+ * <p>
+ * {@link #getProperties()}, {@link #setProperties(Properties)} and
+ * {@link #clone()} synchronize on this instance; the condition and runFirst
+ * accessors do not.
+ * </p>
+ *
+ * @author Stian Soiland-Reyes
+ *
+ */
+@ConfigurationBean(uri = Loop.URI + "#Config")
+public class LoopConfiguration implements Cloneable {
+    private Activity<?> condition = null;
+    // null means "never set"; isRunFirst() then reports true
+    private Boolean runFirst;
+    // created lazily by getProperties()
+    private Properties properties;
+
+    /**
+     * Get the properties of this configuration, creating an empty set on
+     * first use.
+     *
+     * @return the live properties instance held by this bean, never
+     *         <code>null</code>
+     */
+    public synchronized Properties getProperties() {
+        if (properties == null)
+            properties = new Properties();
+        return properties;
+    }
+
+    /**
+     * Replace the properties of this configuration.
+     *
+     * @param properties
+     *            the new properties; if <code>null</code>, the next
+     *            {@link #getProperties()} call creates an empty set
+     */
+    public synchronized void setProperties(Properties properties) {
+        // Same lock as getProperties(), so the lazy initialisation there
+        // cannot interleave with a replacement here
+        this.properties = properties;
+    }
+
+    /**
+     * Copy this configuration. The condition activity is not carried over
+     * (the copy's condition is <code>null</code>), and the copy gets its own
+     * {@link Properties} instance so that later changes to either bean's
+     * properties do not show up in the other.
+     *
+     * @return the copy
+     */
+    @Override
+    public synchronized LoopConfiguration clone() {
+        LoopConfiguration clone;
+        try {
+            clone = (LoopConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Unexpected CloneNotSupportedException",
+                    e);
+        }
+        clone.condition = null;
+        // Object.clone() only copied the reference; Properties is mutable
+        if (properties != null)
+            clone.properties = (Properties) properties.clone();
+        return clone;
+    }
+
+    /**
+     * @return the condition activity, or <code>null</code> if none is set
+     */
+    public Activity<?> getCondition() {
+        return condition;
+    }
+
+    /**
+     * @return the configured runFirst value, or <code>true</code> if it was
+     *         never set
+     */
+    public boolean isRunFirst() {
+        if (runFirst == null)
+            return true;
+        return runFirst;
+    }
+
+    @ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
+    public void setCondition(Activity<?> activity) {
+        this.condition = activity;
+    }
+
+    @ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
+    public void setRunFirst(boolean runFirst) {
+        this.runFirst = runFirst;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
new file mode 100644
index 0000000..bd0e69a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
@@ -0,0 +1,463 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.sf.taverna.t2.invocation.Completion;
+import net.sf.taverna.t2.invocation.IterationInternalEvent;
+import net.sf.taverna.t2.monitor.MonitorManager;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.monitor.NoSuchPropertyException;
+import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.NotifiableLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobQueueReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Dispatch layer which consumes a queue of events and fires off a fixed number
+ * of simultaneous jobs to the layer below. It observes failure, data and
+ * completion events coming up and uses these to determine when to push more
+ * jobs downwards into the stack as well as when it can safely emit completion
+ * events from the queue.
+ *
+ * @author Tom Oinn
+ *
+ */
+@DispatchLayerErrorReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+ REMOVE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerJobQueueReaction(emits = { JOB }, relaysUnmodified = false, stateEffects = { CREATE_PROCESS_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+ REMOVE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {
+ REMOVE_PROCESS_STATE, NO_EFFECT })
+@SupportsStreamedResult
+public class Parallelize extends AbstractDispatchLayer<JsonNode>
+ implements NotifiableLayer,
+ PropertyContributingDispatchLayer<JsonNode> {
+ public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize";
+ private static Logger logger = Logger.getLogger(Parallelize.class);
+
+ private Map<String, StateModel> stateMap = new HashMap<>();
+ private JsonNode config = JsonNodeFactory.instance.objectNode();
+ int sentJobsCount = 0;
+ int completedJobsCount = 0;
+
+ /**
+  * Create a layer with an empty configuration; without a "maxJobs" entry
+  * one job at a time is used for each owning process.
+  */
+ public Parallelize() {
+ }
+
+ /**
+  * Test constructor, only used by unit tests, should probably not be public
+  * access here?
+  *
+  * @param maxJobs
+  *            value stored under "maxJobs" in this layer's configuration
+  */
+ public Parallelize(int maxJobs) {
+     ObjectNode settings = (ObjectNode) config;
+     settings.put("maxJobs", maxJobs);
+ }
+
+ /**
+  * Notification that an event was added to the queue of the given owning
+  * process; tries to pull more work from that queue.
+  *
+  * @throws WorkflowStructureException
+  *             if no state is held for the owning process
+  */
+ @Override
+ public void eventAdded(String owningProcess) {
+     StateModel model;
+     synchronized (stateMap) {
+         model = stateMap.get(owningProcess);
+     }
+     if (model != null) {
+         synchronized (model) {
+             model.fillFromQueue();
+         }
+         return;
+     }
+     /*
+      * Should never see this here, it means we've had duplicate
+      * completion events from upstream
+      */
+     throw new WorkflowStructureException("Unknown owning process "
+             + owningProcess);
+ }
+
+ @Override
+ public void receiveJobQueue(DispatchJobQueueEvent queueEvent) {
+ StateModel model = new StateModel(queueEvent,
+ config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
+ synchronized (stateMap) {
+ stateMap.put(queueEvent.getOwningProcess(), model);
+ }
+ model.fillFromQueue();
+ }
+
+ public void receiveJob(Job job, List<? extends Activity<?>> activities) {
+ throw new WorkflowStructureException(
+ "Parallelize layer cannot handle job events");
+ }
+
+ @Override
+ public void receiveError(DispatchErrorEvent errorEvent) {
+ StateModel model;
+ String owningProcess = errorEvent.getOwningProcess();
+ synchronized(stateMap) {
+ model = stateMap.get(owningProcess);
+ }
+ if (model == null) {
+ logger.warn("Error received for unknown owning process: " + owningProcess);
+ return;
+ }
+ model.finishWith(errorEvent.getIndex());
+ getAbove().receiveError(errorEvent);
+ }
+
+ @Override
+ public void receiveResult(DispatchResultEvent resultEvent) {
+ StateModel model;
+ String owningProcess = resultEvent.getOwningProcess();
+ synchronized(stateMap) {
+ model = stateMap.get(owningProcess);
+ }
+ if (model == null) {
+ logger.warn("Error received for unknown owning process: " + owningProcess);
+ return;
+ }
+ if (!resultEvent.isStreamingEvent()) {
+ MonitorManager.getInstance().registerNode(resultEvent,
+ owningProcess,
+ new HashSet<MonitorableProperty<?>>());
+ }
+ model.finishWith(resultEvent.getIndex());
+ getAbove().receiveResult(resultEvent);
+ }
+
+ /**
+ * Only going to receive this if the activity invocation was streaming, in
+ * which case we need to handle all completion events and pass them up the
+ * stack.
+ */
+ @Override
+ public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
+ StateModel model;
+ String owningProcess = completionEvent.getOwningProcess();
+ synchronized(stateMap) {
+ model = stateMap.get(owningProcess);
+ }
+ if (model == null) {
+ logger.warn("Error received for unknown owning process: " + owningProcess);
+ return;
+ }
+ model.finishWith(completionEvent.getIndex());
+ getAbove().receiveResultCompletion(completionEvent);
+ }
+
+ @Override
+ public void finishedWith(final String owningProcess) {
+ // Delay the removal of the state to give the monitor a chance to poll
+ cleanupTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ synchronized(stateMap) {
+ stateMap.remove(owningProcess);
+ }
+ }
+ }, CLEANUP_DELAY_MS);
+ }
+
+ /**
+ * Replace this layer's configuration with the given node. The node is
+ * stored as-is (no copy, no validation). Its "maxJobs" entry is read each
+ * time a new job queue is received, so state that already exists for an
+ * owning process keeps the limit it was created with.
+ */
+ @Override
+ public void configure(JsonNode config) {
+ this.config = config;
+ }
+
+ /**
+  * @return the configuration node currently held by this layer (the same
+  *         instance, not a copy)
+  */
+ @Override
+ public JsonNode getConfiguration() {
+     return config;
+ }
+
+ /**
+  * Injects the following properties into its parent processor's property set:
+  * <ul>
+  * <li><code>dispatch.parallelize.queuesize [Integer]</code><br/>The current
+  * size of the incoming job queue, or -1 if the state isn't defined for the
+  * registered process identifier (which will be the case if the process
+  * hasn't started or has had its state purged after a final completion of
+  * some kind).</li>
+  * <li><code>dispatch.parallelize.sentjobs [Integer]</code><br/>The number
+  * of jobs this layer has sent down the stack; the counter belongs to the
+  * layer, not to a single owning process.</li>
+  * <li><code>dispatch.parallelize.completedjobs [Integer]</code><br/>The
+  * number of jobs this layer has seen finish; likewise a layer-wide
+  * counter.</li>
+  * </ul>
+  * Every property reports the current time as its last modification date.
+  */
+ @Override
+ public void injectPropertiesFor(final String owningProcess) {
+     /*
+      * Queue depth, evaluates to -1 if there isn't a state model for this
+      * identifier (which will be the case if we haven't created the state
+      * yet or it has been removed again)
+      */
+     dispatchStack.receiveMonitorableProperty(
+             new MonitorableProperty<Integer>() {
+                 @Override
+                 public Date getLastModified() {
+                     return new Date();
+                 }
+
+                 @Override
+                 public String[] getName() {
+                     return new String[] { "dispatch", "parallelize", "queuesize" };
+                 }
+
+                 @Override
+                 public Integer getValue() throws NoSuchPropertyException {
+                     StateModel state;
+                     synchronized (stateMap) {
+                         state = stateMap.get(owningProcess);
+                     }
+                     if (state != null)
+                         return state.queueSize();
+                     return -1;
+                 }
+             }, owningProcess);
+
+     // Number of jobs sent to the layer below
+     dispatchStack.receiveMonitorableProperty(
+             new MonitorableProperty<Integer>() {
+                 @Override
+                 public Date getLastModified() {
+                     return new Date();
+                 }
+
+                 @Override
+                 public String[] getName() {
+                     return new String[] { "dispatch", "parallelize", "sentjobs" };
+                 }
+
+                 @Override
+                 public Integer getValue() throws NoSuchPropertyException {
+                     return sentJobsCount;
+                 }
+             }, owningProcess);
+
+     // Number of jobs that have finished
+     dispatchStack.receiveMonitorableProperty(
+             new MonitorableProperty<Integer>() {
+                 @Override
+                 public Date getLastModified() {
+                     return new Date();
+                 }
+
+                 @Override
+                 public String[] getName() {
+                     return new String[] { "dispatch", "parallelize",
+                             "completedjobs" };
+                 }
+
+                 @Override
+                 public Integer getValue() throws NoSuchPropertyException {
+                     return completedJobsCount;
+                 }
+             }, owningProcess);
+ }
+
+ /**
+ * Holds the state for a given owning process
+ *
+ * @author Tom Oinn
+ *
+ */
+ // suppressed to avoid jdk1.5 error messages caused by the declaration
+ // IterationInternalEvent<? extends IterationInternalEvent<?>> e
+ @SuppressWarnings("rawtypes")
+ class StateModel {
+ private DispatchJobQueueEvent queueEvent;
+ private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<>();
+ private int activeJobs = 0;
+ private int maximumJobs;
+
+ /**
+ * Construct state model for a particular owning process
+ *
+ * @param queueEvent
+ * the job queue event this state tracks; the queue that jobs
+ * are pulled from and the activities passed along with job
+ * events down into the stack below are both read from it
+ * @param maxJobs
+ * maximum number of concurrent jobs to keep 'hot' at any
+ * given point
+ */
+ protected StateModel(DispatchJobQueueEvent queueEvent, int maxJobs) {
+ this.queueEvent = queueEvent;
+ this.maximumJobs = maxJobs;
+ }
+
+ /**
+  * @return the number of events currently waiting in the incoming queue of
+  *         this state's job queue event
+  */
+ Integer queueSize() {
+     int waiting = queueEvent.getQueue().size();
+     return waiting;
+ }
+
+ /**
+  * Take events from the incoming queue for as long as the queue has a head
+  * element and fewer than maximumJobs jobs are active. For every event
+  * taken:
+  * <ul>
+  * <li>If it is a Completion and the pending events list is empty, it is
+  * sent to the layer above as a completion event, from a new thread
+  * <li>Otherwise it is added to the end of the pending events list
+  * <li>If it is a Job, activeJobs and the layer's sent jobs counter are
+  * incremented, the job is registered with the monitor and sent down the
+  * stack
+  * </ul>
+  * The whole operation holds the lock on this state model.
+  */
+ protected void fillFromQueue() {
+     synchronized (this) {
+         while (queueEvent.getQueue().peek() != null
+                 && activeJobs < maximumJobs) {
+             final IterationInternalEvent next = queueEvent.getQueue()
+                     .remove();
+
+             if (next instanceof Completion && pendingEvents.peek() == null) {
+                 // Nothing is pending before this completion, so it can go
+                 // up straight away
+                 Runnable relayCompletion = new Runnable() {
+                     @Override
+                     public void run() {
+                         getAbove().receiveResultCompletion(
+                                 new DispatchCompletionEvent(next
+                                         .getOwningProcess(), next
+                                         .getIndex(), next.getContext()));
+                     }
+                 };
+                 new Thread(relayCompletion, "Parallelize "
+                         + next.getOwningProcess()).start();
+             } else {
+                 pendingEvents.add(next);
+             }
+             if (!(next instanceof Job))
+                 continue;
+
+             // The lock on this state model is already held here
+             activeJobs++;
+             sentJobsCount++;
+
+             DispatchJobEvent jobEvent = new DispatchJobEvent(
+                     next.getOwningProcess(), next.getIndex(),
+                     next.getContext(), ((Job) next).getData(),
+                     queueEvent.getActivities());
+             // Register with the monitor
+             MonitorManager.getInstance().registerNode(jobEvent,
+                     next.getOwningProcess(),
+                     new HashSet<MonitorableProperty<?>>());
+
+             getBelow().receiveJob(jobEvent);
+         }
+     }
+ }
+
+ /**
+  * Mark the pending job with exactly the given index as finished.
+  * <p>
+  * Returns true if the index matched an existing Job exactly, if this
+  * method returns false then you have a partial completion event which
+  * should be sent up the stack without modification.
+  *
+  * @param index
+  *            index of the event that came up from the layer below
+  * @return true if a pending job with that index was found and removed
+  */
+ protected boolean finishWith(int[] index) {
+     synchronized (this) {
+         // Look for the first pending job carrying this index
+         Job finished = null;
+         for (IterationInternalEvent candidate : new ArrayList<>(pendingEvents)) {
+             if (candidate instanceof Job
+                     && arrayEquals(((Job) candidate).getIndex(), index)) {
+                 finished = (Job) candidate;
+                 break;
+             }
+         }
+         if (finished == null)
+             return false;
+
+         /*
+          * Remove the job from the pending events list and decrement the
+          * current count of active jobs
+          */
+         pendingEvents.remove(finished);
+         activeJobs--;
+         completedJobsCount++;
+         /*
+          * Completion events that have now reached the head of the pending
+          * list have had all the jobs queued before them processed, so
+          * they can be emitted
+          */
+         while (pendingEvents.peek() instanceof Completion) {
+             Completion done = (Completion) pendingEvents.remove();
+             getAbove().receiveResultCompletion(
+                     new DispatchCompletionEvent(done.getOwningProcess(),
+                             done.getIndex(), done.getContext()));
+         }
+         /*
+          * A worker slot has just been freed, so try to pull more work
+          * from the queue
+          */
+         fillFromQueue();
+         return true;
+     }
+ }
+
+ /**
+  * @return true if both arrays have the same length and the same value at
+  *         every position
+  */
+ private boolean arrayEquals(int[] a, int[] b) {
+     boolean same = a.length == b.length;
+     for (int pos = 0; same && pos < a.length; pos++)
+         same = a[pos] == b[pos];
+     return same;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
new file mode 100644
index 0000000..29d69d6
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean for the parallelize layer. It carries one setting: the
+ * number of jobs that may run concurrently in a processor instance for each
+ * owning process ID. The value is 1 until changed.
+ *
+ * @author Tom Oinn
+ */
+@ConfigurationBean(uri = Parallelize.URI + "#Config")
+public class ParallelizeConfig {
+    private int maxJobs = 1;
+
+    public ParallelizeConfig() {
+    }
+
+    /**
+     * @return the maximum number of jobs that can run in parallel
+     */
+    public int getMaximumJobs() {
+        return maxJobs;
+    }
+
+    @ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required = false)
+    public void setMaximumJobs(int maxJobs) {
+        this.maxJobs = maxJobs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
new file mode 100644
index 0000000..f2054d9
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
@@ -0,0 +1,180 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+
+/**
+ * Implements retry policy with delay between retries and exponential backoff
+ * <p>
+ * Default properties are as follows :
+ * <ul>
+ * <li>maxRetries = 0 (int)</li>
+ * <li>initialDelay = 1000 (milliseconds)</li>
+ * <li>maxDelay = 5000 (milliseconds)</li>
+ * <li>backoffFactor = 1.0 (double)</li>
+ * </ul>
+ *
+ * @author Tom Oinn
+ * @author David Withers
+ * @author Stian Soiland-Reyes
+ */
+@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
+        UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
+@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
+public class Retry extends AbstractErrorHandlerLayer<JsonNode> {
+    private static final String BACKOFF_FACTOR = "backoffFactor";
+    private static final String MAX_DELAY = "maxDelay";
+    private static final String MAX_RETRIES = "maxRetries";
+    private static final String INITIAL_DELAY = "initialDelay";
+    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry";
+
+    // The node last passed to configure(), with defaults filled in
+    private ObjectNode config;
+    // Values read from config by configure()
+    private int maxRetries;
+    private int initialDelay;
+    private int maxDelay;
+    private double backoffFactor;
+
+    /*
+     * One daemon timer shared by every Retry layer.
+     * NOTE(review): java.util.Timer stops accepting tasks once a task throws
+     * an unchecked exception, so a failure inside a retry task would affect
+     * all layers sharing this timer - confirm that receiveJob cannot throw.
+     */
+    private static final Timer retryTimer = new Timer("Retry timer", true);
+
+    /**
+     * Create a layer configured with the defaults from
+     * {@link #defaultConfig()}.
+     */
+    public Retry() {
+        super();
+        configure(JsonNodeFactory.instance.objectNode());
+    }
+
+    /**
+     * Create a layer with all four settings given explicitly.
+     *
+     * @param maxRetries
+     *            maximum number of retries, must not be negative
+     * @param initialDelay
+     *            delay in milliseconds before the first retry, must not be
+     *            negative
+     * @param maxDelay
+     *            ceiling in milliseconds for the delay, must not be less
+     *            than initialDelay
+     * @param backoffFactor
+     *            factor applied to the delay for each retry already made,
+     *            must not be negative
+     * @throws IllegalArgumentException
+     *             if any of the values is out of range
+     */
+    public Retry(int maxRetries, int initialDelay, int maxDelay,
+            double backoffFactor) {
+        super();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put(MAX_RETRIES, maxRetries);
+        conf.put(INITIAL_DELAY, initialDelay);
+        conf.put(MAX_DELAY, maxDelay);
+        conf.put(BACKOFF_FACTOR, backoffFactor);
+        configure(conf);
+    }
+
+    /**
+     * Per-job state that counts the retries made so far.
+     */
+    class RetryState extends JobState {
+        int currentRetryCount = 0;
+
+        public RetryState(DispatchJobEvent jobEvent) {
+            super(jobEvent);
+        }
+
+        /**
+         * Try to schedule a retry, returns true if a retry is scheduled, false
+         * if the retry count has already been reached (in which case no retry
+         * is scheduled).
+         * <p>
+         * The delay is initialDelay * backoffFactor ^ (retries so far),
+         * capped at maxDelay. The retry count is incremented when the timer
+         * task runs, not when it is scheduled.
+         *
+         * @return true if a retry was scheduled
+         */
+        @Override
+        public boolean handleError() {
+            if (currentRetryCount >= maxRetries)
+                return false;
+            int delay = (int) (initialDelay * Math.pow(backoffFactor, currentRetryCount));
+            delay = Math.min(delay, maxDelay);
+            TimerTask task = new TimerTask() {
+                @Override
+                public void run() {
+                    currentRetryCount++;
+                    getBelow().receiveJob(jobEvent);
+                }
+            };
+            retryTimer.schedule(task, delay);
+            return true;
+        }
+    }
+
+    @Override
+    protected JobState getStateObject(DispatchJobEvent jobEvent) {
+        return new RetryState(jobEvent);
+    }
+
+    /**
+     * Apply a configuration. Fields that are missing or null in the given
+     * node are filled in from {@link #defaultConfig()} <em>in the node that
+     * was passed in</em>, so the caller's object is modified. The values are
+     * then validated, and only if they are valid does the layer adopt them.
+     *
+     * @param config
+     *            the configuration; it is cast to {@link ObjectNode}
+     * @throws IllegalArgumentException
+     *             if maxRetries, initialDelay or backoffFactor is negative,
+     *             or maxDelay is less than initialDelay
+     */
+    @Override
+    public void configure(JsonNode config) {
+        ObjectNode defaultConfig = defaultConfig();
+        setAllMissingFields((ObjectNode) config, defaultConfig);
+        checkConfig((ObjectNode) config);
+        this.config = (ObjectNode) config;
+        maxRetries = config.get(MAX_RETRIES).intValue();
+        initialDelay = config.get(INITIAL_DELAY).intValue();
+        maxDelay = config.get(MAX_DELAY).intValue();
+        backoffFactor = config.get(BACKOFF_FACTOR).doubleValue();
+    }
+
+    /**
+     * Copy every field of defaults that is absent or null in config into
+     * config.
+     */
+    private void setAllMissingFields(ObjectNode config, ObjectNode defaults) {
+        for (String fieldName : forEach(defaults.fieldNames()))
+            if (! config.has(fieldName) || config.get(fieldName).isNull())
+                config.put(fieldName, defaults.get(fieldName));
+    }
+
+    /**
+     * Wrap an iterator so that it can be used in a for-each loop. The
+     * returned iterable hands out the same iterator every time, so it can
+     * only be iterated once.
+     */
+    private <T> Iterable<T> forEach(final Iterator<T> iterator) {
+        return new Iterable<T>() {
+            @Override
+            public Iterator<T> iterator() {
+                return iterator;
+            }
+        };
+    }
+
+    /**
+     * Validate the four settings of a configuration that already has all
+     * fields present.
+     *
+     * @throws IllegalArgumentException
+     *             naming the first rule that is violated
+     */
+    private void checkConfig(ObjectNode conf) {
+        if (conf.get(MAX_RETRIES).intValue() < 0)
+            throw new IllegalArgumentException("maxRetries < 0");
+        if (conf.get(INITIAL_DELAY).intValue() < 0)
+            throw new IllegalArgumentException("initialDelay < 0");
+        if (conf.get(MAX_DELAY).intValue() < conf.get(INITIAL_DELAY).intValue())
+            throw new IllegalArgumentException("maxDelay < initialDelay");
+        if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0)
+            throw new IllegalArgumentException("backoffFactor < 0.0");
+    }
+
+    /**
+     * @return a new configuration node holding the default values:
+     *         maxRetries 0, initialDelay 1000, maxDelay 5000 and
+     *         backoffFactor 1.0
+     */
+    public static ObjectNode defaultConfig() {
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put(MAX_RETRIES, 0);
+        conf.put(INITIAL_DELAY, 1000);
+        conf.put(MAX_DELAY, 5000);
+        conf.put(BACKOFF_FACTOR, 1.0);
+        return conf;
+    }
+
+    @Override
+    public JsonNode getConfiguration() {
+        return this.config;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
new file mode 100644
index 0000000..39dedbf
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
@@ -0,0 +1,97 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
+import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
+
+/**
+ * Configuration bean holding the four retry settings. Until changed through
+ * the setters the values are: backoff factor 1.0, initial delay 1000, maximum
+ * delay 5000 and maximum retries 0.
+ */
+@ConfigurationBean(uri = Retry.URI + "#Config")
+public class RetryConfig {
+    private static final float DEFAULT_BACKOFF_FACTOR = 1.0f;
+    private static final int DEFAULT_MAX_DELAY = 5000;
+    private static final int DEFAULT_INITIAL_DELAY = 1000;
+    private static final int DEFAULT_MAX_RETRIES = 0;
+
+    private float backoffFactor = DEFAULT_BACKOFF_FACTOR;
+    private int initialDelay = DEFAULT_INITIAL_DELAY;
+    private int maxDelay = DEFAULT_MAX_DELAY;
+    private int maxRetries = DEFAULT_MAX_RETRIES;
+
+    /**
+     * Factor by which the initial delay is multiplied for each retry after the
+     * first, this allows for exponential backoff of retry times up to a certain
+     * ceiling
+     *
+     * @return the backoff factor
+     */
+    public float getBackoffFactor() {
+        return backoffFactor;
+    }
+
+    @ConfigurationProperty(name = "backoffFactor", label = "Backoff Factor", description = "Factor by which the initial delay is multiplied for each retry after the first retry", required=false)
+    public void setBackoffFactor(float factor) {
+        backoffFactor = factor;
+    }
+
+    /**
+     * Delay in milliseconds between the initial failure message and the first
+     * attempt to retry the failed job
+     *
+     * @return the initial delay in milliseconds
+     */
+    public int getInitialDelay() {
+        return initialDelay;
+    }
+
+    @ConfigurationProperty(name = "initialDelay", label = "Initial Delay", description = "Delay in milliseconds between the initial failure message and the first attempt to retry the failed job", required=false)
+    public void setInitialDelay(int delay) {
+        initialDelay = delay;
+    }
+
+    /**
+     * Maximum delay in milliseconds between failure reception and retry. This
+     * acts as a ceiling for the exponential backoff factor allowing the retry
+     * delay to initially increase to a certain value then remain constant after
+     * that point rather than exploding to unreasonable levels.
+     *
+     * @return the maximum delay in milliseconds
+     */
+    public int getMaxDelay() {
+        return maxDelay;
+    }
+
+    @ConfigurationProperty(name = "maxDelay", label = "Maximum Delay", description = "Maximum delay in milliseconds between failure reception and retry", required=false)
+    public void setMaxDelay(int delay) {
+        maxDelay = delay;
+    }
+
+    /**
+     * Maximum number of retries for a failing process
+     *
+     * @return the maximum number of retries
+     */
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    @ConfigurationProperty(name = "maxRetries", label = "Maximum Retries", description = "Maximum number of retries for a failing process", required=false)
+    public void setMaxRetries(int max) {
+        maxRetries = max;
+    }
+}
[4/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Posted by st...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
deleted file mode 100644
index bd0e69a..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import net.sf.taverna.t2.invocation.Completion;
-import net.sf.taverna.t2.invocation.IterationInternalEvent;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.NotifiableLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobQueueReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Dispatch layer which consumes a queue of events and fires off a fixed number
- * of simultaneous jobs to the layer below. It observes failure, data and
- * completion events coming up and uses these to determine when to push more
- * jobs downwards into the stack as well as when it can safely emit completion
- * events from the queue.
- *
- * @author Tom Oinn
- *
- */
-@DispatchLayerErrorReaction(emits = {}, relaysUnmodified = true, stateEffects = {
- REMOVE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerJobQueueReaction(emits = { JOB }, relaysUnmodified = false, stateEffects = { CREATE_PROCESS_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {
- REMOVE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {
- REMOVE_PROCESS_STATE, NO_EFFECT })
-@SupportsStreamedResult
-public class Parallelize extends AbstractDispatchLayer<JsonNode>
- implements NotifiableLayer,
- PropertyContributingDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize";
- private static Logger logger = Logger.getLogger(Parallelize.class);
-
- private Map<String, StateModel> stateMap = new HashMap<>();
- private JsonNode config = JsonNodeFactory.instance.objectNode();
- int sentJobsCount = 0;
- int completedJobsCount = 0;
-
- public Parallelize() {
- super();
- }
-
- /**
- * Test constructor, only used by unit tests, should probably not be public
- * access here?
- *
- * @param maxJobs
- */
- public Parallelize(int maxJobs) {
- super();
- ((ObjectNode)config).put("maxJobs", maxJobs);
- }
-
- @Override
- public void eventAdded(String owningProcess) {
- StateModel stateModel;
- synchronized (stateMap) {
- stateModel = stateMap.get(owningProcess);
- }
- if (stateModel == null)
- /*
- * Should never see this here, it means we've had duplicate
- * completion events from upstream
- */
- throw new WorkflowStructureException(
- "Unknown owning process " + owningProcess);
- synchronized (stateModel) {
- stateModel.fillFromQueue();
- }
- }
-
- @Override
- public void receiveJobQueue(DispatchJobQueueEvent queueEvent) {
- StateModel model = new StateModel(queueEvent,
- config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
- synchronized (stateMap) {
- stateMap.put(queueEvent.getOwningProcess(), model);
- }
- model.fillFromQueue();
- }
-
- public void receiveJob(Job job, List<? extends Activity<?>> activities) {
- throw new WorkflowStructureException(
- "Parallelize layer cannot handle job events");
- }
-
- @Override
- public void receiveError(DispatchErrorEvent errorEvent) {
- StateModel model;
- String owningProcess = errorEvent.getOwningProcess();
- synchronized(stateMap) {
- model = stateMap.get(owningProcess);
- }
- if (model == null) {
- logger.warn("Error received for unknown owning process: " + owningProcess);
- return;
- }
- model.finishWith(errorEvent.getIndex());
- getAbove().receiveError(errorEvent);
- }
-
- @Override
- public void receiveResult(DispatchResultEvent resultEvent) {
- StateModel model;
- String owningProcess = resultEvent.getOwningProcess();
- synchronized(stateMap) {
- model = stateMap.get(owningProcess);
- }
- if (model == null) {
- logger.warn("Error received for unknown owning process: " + owningProcess);
- return;
- }
- if (!resultEvent.isStreamingEvent()) {
- MonitorManager.getInstance().registerNode(resultEvent,
- owningProcess,
- new HashSet<MonitorableProperty<?>>());
- }
- model.finishWith(resultEvent.getIndex());
- getAbove().receiveResult(resultEvent);
- }
-
- /**
- * Only going to receive this if the activity invocation was streaming, in
- * which case we need to handle all completion events and pass them up the
- * stack.
- */
- @Override
- public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- StateModel model;
- String owningProcess = completionEvent.getOwningProcess();
- synchronized(stateMap) {
- model = stateMap.get(owningProcess);
- }
- if (model == null) {
- logger.warn("Error received for unknown owning process: " + owningProcess);
- return;
- }
- model.finishWith(completionEvent.getIndex());
- getAbove().receiveResultCompletion(completionEvent);
- }
-
- @Override
- public void finishedWith(final String owningProcess) {
- // Delay the removal of the state to give the monitor a chance to poll
- cleanupTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- synchronized(stateMap) {
- stateMap.remove(owningProcess);
- }
- }
- }, CLEANUP_DELAY_MS);
- }
-
- @Override
- public void configure(JsonNode config) {
- this.config = config;
- }
-
- @Override
- public JsonNode getConfiguration() {
- return this.config;
- }
-
- /**
- * Injects the following properties into its parent processor's property set:
- * <ul>
- * <li><code>dispatch.parallelize.queuesize [Integer]</code><br/>The current
- * size of the incomming job queue, or -1 if the state isn't defined for the
- * registered process identifier (which will be the case if the process
- * hasn't started or has had its state purged after a final completion of
- * some kind.</li>
- * </ul>
- */
- @Override
- public void injectPropertiesFor(final String owningProcess) {
- /**
- * Property for the queue depth, will evaluate to -1 if there isn't a
- * queue in the state model for this identifier (which will be the case
- * if we haven't created the state yet or the queue has been collected)
- */
- MonitorableProperty<Integer> queueSizeProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "parallelize", "queuesize" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- StateModel model;
- synchronized(stateMap) {
- model = stateMap.get(owningProcess);
- }
- if (model == null)
- return -1;
- return model.queueSize();
- }
- };
- dispatchStack.receiveMonitorableProperty(queueSizeProperty,
- owningProcess);
-
- MonitorableProperty<Integer> sentJobsProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "parallelize", "sentjobs" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return sentJobsCount;
- }
- };
- dispatchStack.receiveMonitorableProperty(sentJobsProperty,
- owningProcess);
-
- MonitorableProperty<Integer> completedJobsProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "parallelize",
- "completedjobs" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return completedJobsCount;
- }
- };
- dispatchStack.receiveMonitorableProperty(completedJobsProperty,
- owningProcess);
- }
-
- /**
- * Holds the state for a given owning process
- *
- * @author Tom Oinn
- *
- */
- // suppressed to avoid jdk1.5 error messages caused by the declaration
- // IterationInternalEvent<? extends IterationInternalEvent<?>> e
- @SuppressWarnings("rawtypes")
- class StateModel {
- private DispatchJobQueueEvent queueEvent;
- private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<>();
- private int activeJobs = 0;
- private int maximumJobs;
-
- /**
- * Construct state model for a particular owning process
- *
- * @param owningProcess
- * Process to track parallel execution
- * @param queue
- * reference to the queue into which jobs are inserted by the
- * iteration strategy
- * @param activities
- * activities to pass along with job events down into the
- * stack below
- * @param maxJobs
- * maximum number of concurrent jobs to keep 'hot' at any
- * given point
- */
- protected StateModel(DispatchJobQueueEvent queueEvent, int maxJobs) {
- this.queueEvent = queueEvent;
- this.maximumJobs = maxJobs;
- }
-
- Integer queueSize() {
- return queueEvent.getQueue().size();
- }
-
- /**
- * Poll the queue repeatedly until either the queue is empty or we have
- * enough jobs pulled from it. The semantics for this are:
- * <ul>
- * <li>If the head of the queue is a Job and activeJobs < maximumJobs
- * then increment activeJobs, add the Job to the pending events list at
- * the end and send the message down the stack
- * <li>If the head of the queue is a Completion and the pending jobs
- * list is empty then send it to the layer above
- * <li>If the head of the queue is a Completion and the pending jobs
- * list is not empty then add the Completion to the end of the pending
- * jobs list and return
- * </ul>
- */
- protected void fillFromQueue() {
- synchronized (this) {
- while (queueEvent.getQueue().peek() != null
- && activeJobs < maximumJobs) {
- final IterationInternalEvent e = queueEvent.getQueue()
- .remove();
-
- if (e instanceof Completion && pendingEvents.peek() == null) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- getAbove().receiveResultCompletion(
- new DispatchCompletionEvent(e
- .getOwningProcess(), e
- .getIndex(), e.getContext()));
- }
- }, "Parallelize " + e.getOwningProcess()).start();
- // getAbove().receiveResultCompletion((Completion) e);
- } else {
- pendingEvents.add(e);
- }
- if (e instanceof Job) {
- synchronized (this) {
- activeJobs++;
- }
- sentJobsCount++;
-
- DispatchJobEvent dispatchJobEvent = new DispatchJobEvent(e
- .getOwningProcess(), e
- .getIndex(), e.getContext(),
- ((Job) e).getData(), queueEvent
- .getActivities());
- // Register with the monitor
- MonitorManager.getInstance().registerNode(dispatchJobEvent,
- e.getOwningProcess(),
- new HashSet<MonitorableProperty<?>>());
-
- getBelow().receiveJob(dispatchJobEvent);
- }
- }
- }
- }
-
- /**
- * Returns true if the index matched an existing Job exactly, if this
- * method returns false then you have a partial completion event which
- * should be sent up the stack without modification.
- *
- * @param index
- * @return
- */
- protected boolean finishWith(int[] index) {
- synchronized (this) {
- for (IterationInternalEvent e : new ArrayList<>(pendingEvents)) {
- if (!(e instanceof Job))
- continue;
- Job j = (Job) e;
- if (!arrayEquals(j.getIndex(), index))
- continue;
-
- /*
- * Found a job in the pending events list which has the
- * same index, remove it and decrement the current count
- * of active jobs
- */
- pendingEvents.remove(e);
- activeJobs--;
- completedJobsCount++;
- /*
- * Now pull any completion events that have reached the head
- * of the queue - this indicates that all the job events
- * which came in before them have been processed and we can
- * emit the completions
- */
- while (pendingEvents.peek() != null
- && pendingEvents.peek() instanceof Completion) {
- Completion c = (Completion) pendingEvents.remove();
- getAbove().receiveResultCompletion(
- new DispatchCompletionEvent(c
- .getOwningProcess(), c.getIndex(), c
- .getContext()));
- }
- /*
- * Refresh from the queue; as we've just decremented the
- * active job count there should be a worker available
- */
- fillFromQueue();
- /*
- * Return true to indicate that we removed a job event from
- * the queue, that is to say that the index wasn't that of a
- * partial completion.
- */
- return true;
- }
- }
- return false;
- }
-
- private boolean arrayEquals(int[] a, int[] b) {
- if (a.length != b.length)
- return false;
- for (int i = 0; i < a.length; i++)
- if (a[i] != b[i])
- return false;
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
deleted file mode 100644
index 29d69d6..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Bean to hold the configuration for the parallelize layer, specifically a
- * single int property defining the number of concurrent jobs in that processor
- * instance per owning process ID.
- *
- * @author Tom Oinn
- */
-@ConfigurationBean(uri = Parallelize.URI + "#Config")
-public class ParallelizeConfig {
- private int maxJobs;
-
- public ParallelizeConfig() {
- super();
- this.maxJobs = 1;
- }
-
- @ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required = false)
- public void setMaximumJobs(int maxJobs) {
- this.maxJobs = maxJobs;
- }
-
- public int getMaximumJobs() {
- return this.maxJobs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
deleted file mode 100644
index f2054d9..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-/**
- * Implements retry policy with delay between retries and exponential backoff
- * <p>
- * Default properties are as follows :
- * <ul>
- * <li>maxRetries = 0 (int)</li>
- * <li>initialDelay = 1000 (milliseconds)</li>
- * <li>maxDelay = 2000 (milliseconds)</li>
- * <li>backoffFactor = 1.0 (double)</li>
- * </ul>
- *
- * @author Tom Oinn
- * @author David Withers
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
- UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Retry extends AbstractErrorHandlerLayer<JsonNode> {
- private static final String BACKOFF_FACTOR = "backoffFactor";
- private static final String MAX_DELAY = "maxDelay";
- private static final String MAX_RETRIES = "maxRetries";
- private static final String INITIAL_DELAY = "initialDelay";
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry";
-
- private ObjectNode config;
- private int maxRetries;
- private int initialDelay;
- private int maxDelay;
- private double backoffFactor;
-
- private static Timer retryTimer = new Timer("Retry timer", true);
-
- public Retry() {
- super();
- configure(JsonNodeFactory.instance.objectNode());
- }
-
- public Retry(int maxRetries, int initialDelay, int maxDelay,
- double backoffFactor) {
- super();
- ObjectNode conf = JsonNodeFactory.instance.objectNode();
- conf.put(MAX_RETRIES, maxRetries);
- conf.put(INITIAL_DELAY, initialDelay);
- conf.put(MAX_DELAY, maxDelay);
- conf.put(BACKOFF_FACTOR, backoffFactor);
- configure(conf);
- }
-
- class RetryState extends JobState {
- int currentRetryCount = 0;
-
- public RetryState(DispatchJobEvent jobEvent) {
- super(jobEvent);
- }
-
- /**
- * Try to schedule a retry, returns true if a retry is scheduled, false
- * if the retry count has already been reached (in which case no retry
- * is scheduled
- *
- * @return
- */
- @Override
- public boolean handleError() {
- if (currentRetryCount >= maxRetries)
- return false;
- int delay = (int) (initialDelay * Math.pow(backoffFactor, currentRetryCount));
- delay = Math.min(delay, maxDelay);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- currentRetryCount++;
- getBelow().receiveJob(jobEvent);
- }
- };
- retryTimer.schedule(task, delay);
- return true;
- }
- }
-
- @Override
- protected JobState getStateObject(DispatchJobEvent jobEvent) {
- return new RetryState(jobEvent);
- }
-
- @Override
- public void configure(JsonNode config) {
- ObjectNode defaultConfig = defaultConfig();
- setAllMissingFields((ObjectNode) config, defaultConfig);
- checkConfig((ObjectNode)config);
- this.config = (ObjectNode) config;
- maxRetries = config.get(MAX_RETRIES).intValue();
- initialDelay = config.get(INITIAL_DELAY).intValue();
- maxDelay = config.get(MAX_DELAY).intValue();
- backoffFactor = config.get(BACKOFF_FACTOR).doubleValue();
- }
-
- private void setAllMissingFields(ObjectNode config, ObjectNode defaults) {
- for (String fieldName : forEach(defaults.fieldNames()))
- if (! config.has(fieldName) || config.get(fieldName).isNull())
- config.put(fieldName, defaults.get(fieldName));
- }
-
- private <T> Iterable<T> forEach(final Iterator<T> iterator) {
- return new Iterable<T>() {
- @Override
- public Iterator<T> iterator() {
- return iterator;
- }
- };
- }
-
- private void checkConfig(ObjectNode conf) {
- if (conf.get(MAX_RETRIES).intValue() < 0)
- throw new IllegalArgumentException("maxRetries < 0");
- if (conf.get(INITIAL_DELAY).intValue() < 0)
- throw new IllegalArgumentException("initialDelay < 0");
- if (conf.get(MAX_DELAY).intValue() < conf.get(INITIAL_DELAY).intValue())
- throw new IllegalArgumentException("maxDelay < initialDelay");
- if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0)
- throw new IllegalArgumentException("backoffFactor < 0.0");
- }
-
- public static ObjectNode defaultConfig() {
- ObjectNode conf = JsonNodeFactory.instance.objectNode();
- conf.put(MAX_RETRIES, 0);
- conf.put(INITIAL_DELAY, 1000);
- conf.put(MAX_DELAY, 5000);
- conf.put(BACKOFF_FACTOR, 1.0);
- return conf;
- }
-
- @Override
- public JsonNode getConfiguration() {
- return this.config;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
deleted file mode 100644
index 39dedbf..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-@ConfigurationBean(uri = Retry.URI + "#Config")
-public class RetryConfig {
- private static final float BACKOFF_FACTOR = 1.0f;
- private static final int MAX_DELAY = 5000;
- private static final int INITIAL_DELAY = 1000;
- private static final int MAX_RETRIES = 0;
-
- private float backoffFactor = BACKOFF_FACTOR;
- private int initialDelay = INITIAL_DELAY;
- private int maxDelay = MAX_DELAY;
- private int maxRetries = MAX_RETRIES;
-
- /**
- * Factor by which the initial delay is multiplied for each retry after the
- * first, this allows for exponential backoff of retry times up to a certain
- * ceiling
- *
- * @return
- */
- public float getBackoffFactor() {
- return this.backoffFactor;
- }
-
- /**
- * Delay in milliseconds between the initial failure message and the first
- * attempt to retry the failed job
- *
- * @return
- */
- public int getInitialDelay() {
- return this.initialDelay;
- }
-
- /**
- * Maximum delay in milliseconds between failure reception and retry. This
- * acts as a ceiling for the exponential backoff factor allowing the retry
- * delay to initially increase to a certain value then remain constant after
- * that point rather than exploding to unreasonable levels.
- */
- public int getMaxDelay() {
- return this.maxDelay;
- }
-
- /**
- * Maximum number of retries for a failing process
- *
- * @return
- */
- public int getMaxRetries() {
- return this.maxRetries;
- }
-
- @ConfigurationProperty(name = "backoffFactor", label = "Backoff Factor", description = "Factor by which the initial delay is multiplied for each retry after the first retry", required=false)
- public void setBackoffFactor(float factor) {
- this.backoffFactor = factor;
- }
-
- @ConfigurationProperty(name = "initialDelay", label = "Initial Delay", description = "Delay in milliseconds between the initial failure message and the first attempt to retry the failed job", required=false)
- public void setInitialDelay(int delay) {
- this.initialDelay = delay;
- }
-
- @ConfigurationProperty(name = "maxDelay", label = "Maximum Delay", description = "Maximum delay in milliseconds between failure reception and retry", required=false)
- public void setMaxDelay(int delay) {
- this.maxDelay = delay;
- }
-
- @ConfigurationProperty(name = "maxRetries", label = "Maximum Retries", description = "Maximum number of retries for a failing process", required=false)
- public void setMaxRetries(int max) {
- this.maxRetries = max;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
deleted file mode 100644
index 3169f8c..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- *
- */
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
-import net.sf.taverna.t2.workflowmodel.ConfigurationException;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * This layer allows for the cancellation, pausing and resuming of workflow
- * runs. It does so by intercepting jobs sent to the layer: jobs belonging to
- * a cancelled run are discarded, jobs belonging to a paused run are held
- * back until the run is resumed.
- * <p>
- * Run state is kept in static fields shared by every Stop instance. Those
- * collections are replaced copy-on-write while holding the class lock, and
- * the fields are <code>volatile</code> so that the unlocked look-ups in
- * {@link #receiveJob(DispatchJobEvent)} see the most recently published
- * copy.
- *
- * @author alanrw
- */
-public class Stop extends AbstractDispatchLayer<JsonNode> {
-    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
-    /**
-     * The set of ids of workflow runs that have been cancelled. Replaced
-     * (never mutated in place) by {@link #cancelWorkflow(String)}.
-     */
-    private static volatile Set<String> cancelledWorkflowRuns = new HashSet<>();
-    /**
-     * A map from workflow run ids to the set of Stop layers where jobs have
-     * been intercepted for that run. The map itself is replaced
-     * copy-on-write; the contained sets are only modified while holding the
-     * class lock.
-     */
-    private static volatile Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
-    /**
-     * A map for a given Stop from ids of suspended workflow runs to the jobs
-     * that have been intercepted. Only accessed while holding the class lock.
-     */
-    private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
-
-    @Override
-    public void configure(JsonNode conf) throws ConfigurationException {
-        // nothing
-    }
-
-    @Override
-    public JsonNode getConfiguration() {
-        return null;
-    }
-
-    /**
-     * Discards the job if its workflow run is cancelled, holds it back if
-     * the run is paused, and otherwise passes it on to the next layer.
-     *
-     * @param jobEvent
-     *            the job arriving at this layer
-     */
-    @Override
-    public void receiveJob(final DispatchJobEvent jobEvent) {
-        List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
-                WorkflowRunIdEntity.class);
-        if (entities != null && !entities.isEmpty()) {
-            final String wfRunId = entities.get(0).getWorkflowRunId();
-            // If the workflow run is cancelled then simply "eat" the jobEvent.
-            // This does a hard-cancel.
-            if (cancelledWorkflowRuns.contains(wfRunId))
-                return;
-            // If the workflow run is paused
-            if (pausedLayerMap.containsKey(wfRunId))
-                synchronized (Stop.class) {
-                    // double check as pausedLayerMap may have been changed
-                    // waiting for the lock
-                    if (pausedLayerMap.containsKey(wfRunId)) {
-                        // Remember that this Stop layer was affected by the
-                        // workflow pause
-                        pausedLayerMap.get(wfRunId).add(this);
-                        if (!suspendedJobEventMap.containsKey(wfRunId))
-                            suspendedJobEventMap.put(wfRunId,
-                                    new HashSet<DispatchJobEvent>());
-                        // Remember the suspended jobEvent
-                        suspendedJobEventMap.get(wfRunId).add(jobEvent);
-                        return;
-                    }
-                }
-        }
-        // By default pass the jobEvent down to the next layer
-        super.receiveJob(jobEvent);
-    }
-
-    @Override
-    public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-        super.receiveJobQueue(jobQueueEvent);
-    }
-
-    /**
-     * Cancel the workflow run with the specified id
-     *
-     * @param workflowRunId
-     *            The id of the workflow run to cancel
-     * @return If the workflow run was cancelled then true. If it was already
-     *         cancelled then false.
-     */
-    public static synchronized boolean cancelWorkflow(String workflowRunId) {
-        if (cancelledWorkflowRuns.contains(workflowRunId))
-            return false;
-        // Copy-on-write: build the new set, then publish it in one assignment
-        Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
-                cancelledWorkflowRuns);
-        cancelledWorkflowRunsCopy.add(workflowRunId);
-        cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
-        return true;
-    }
-
-    /**
-     * Pause the workflow run with the specified id
-     *
-     * @param workflowRunId
-     *            The id of the workflow run to pause
-     * @return If the workflow run was paused then true. If it was already
-     *         paused or cancelled then false.
-     */
-    public static synchronized boolean pauseWorkflow(String workflowRunId) {
-        if (cancelledWorkflowRuns.contains(workflowRunId))
-            return false;
-        if (pausedLayerMap.containsKey(workflowRunId))
-            return false;
-        Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
-        pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
-        pausedLayerMap = pausedLayerMapCopy;
-        return true;
-    }
-
-    /**
-     * Resume the workflow run with the specified id
-     *
-     * @param workflowRunId
-     *            The id of the workflow run to resume
-     * @return If the workflow run was resumed then true. If the workflow run
-     *         was not paused or it was cancelled, then false.
-     */
-    public static synchronized boolean resumeWorkflow(String workflowRunId) {
-        if (cancelledWorkflowRuns.contains(workflowRunId))
-            return false;
-        if (!pausedLayerMap.containsKey(workflowRunId))
-            return false;
-        Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>();
-        pausedLayerMapCopy.putAll(pausedLayerMap);
-        Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
-        // Publish the un-paused state before replaying, so the replayed jobs
-        // are no longer intercepted by receiveJob
-        pausedLayerMap = pausedLayerMapCopy;
-        for (Stop s : stops)
-            s.resumeLayerWorkflow(workflowRunId);
-        return true;
-    }
-
-    /**
-     * Resume the workflow run with the specified id on this Stop layer. This
-     * method processes any suspended job events.
-     *
-     * @param workflowRunId
-     *            The id of the workflow run to resume.
-     */
-    private void resumeLayerWorkflow(String workflowRunId) {
-        synchronized (Stop.class) {
-            Set<DispatchJobEvent> suspended = suspendedJobEventMap
-                    .remove(workflowRunId);
-            // Nothing was intercepted on this layer for the run
-            if (suspended == null)
-                return;
-            for (DispatchJobEvent dje : suspended)
-                receiveJob(dje);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
deleted file mode 100644
index fe6e73f..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
+++ /dev/null
@@ -1,4 +0,0 @@
-<body>
-Contains implementations of DispatchLayer defined by the core Taverna 2
-specification.
-</body>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
deleted file mode 100644
index dc55fc7..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:beans="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/osgi
- http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
-
- <service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
-
-</beans:beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
deleted file mode 100644
index 90ed75f..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd">
-
- <bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
deleted file mode 100644
index e202df1..0000000
--- a/taverna-workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests how the Retry layer exposes and validates its configuration.
- */
-public class TestRetry {
-
-    /** A Retry built with the no-argument constructor reports the defaults. */
-    @Test
-    public void defaultConfig() throws Exception {
-        Retry retry = new Retry();
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(0, configuration.get("maxRetries").intValue());
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    /** Values passed to the constructor are reflected in the configuration. */
-    @Test
-    public void customConfig() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(150, configuration.get("initialDelay").intValue());
-        assertEquals(1200, configuration.get("maxDelay").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    @Test
-    public void configureEmpty() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        JsonNode empty = JsonNodeFactory.instance.objectNode();
-        retry.configure(empty);
-        // We would expect missing values to be replaced with the
-        // DEFAULT values rather than the previous values
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(0, configuration.get("maxRetries").intValue());
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    @Test
-    public void configurePartly() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", 15);
-        conf.put("backoffFactor", 1.2);
-        retry.configure(conf);
-        // We would expect to see the new values
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-        // And the default values (not the previous values!)
-        assertEquals(1000, configuration.get("initialDelay").intValue());
-        assertEquals(5000, configuration.get("maxDelay").intValue());
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidMaxRetries() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", -15);
-        retry.configure(conf);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidInitialDelay() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("initialDelay", -15);
-        retry.configure(conf);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void invalidMaxDelay() throws Exception {
-        Retry retry = new Retry();
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxDelay", 150);
-        // Valid on its own, but less than the default initialDelay of 1000!
-        retry.configure(conf);
-    }
-
-
-    @Test
-    public void invalidConfigureRecovers() throws Exception {
-        Retry retry = new Retry(15, 150, 1200, 1.2);
-        ObjectNode conf = JsonNodeFactory.instance.objectNode();
-        conf.put("maxRetries", -15);
-        try {
-            retry.configure(conf);
-            // Without this the test would pass even if the invalid
-            // configuration was silently accepted
-            fail("configure() should have rejected a negative maxRetries");
-        } catch (IllegalArgumentException ex) {
-            // As expected
-        }
-        // We would expect the earlier values to persist
-        JsonNode configuration = retry.getConfiguration();
-        assertEquals(15, configuration.get("maxRetries").intValue());
-        assertEquals(150, configuration.get("initialDelay").intValue());
-        assertEquals(1200, configuration.get("maxDelay").intValue());
-        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
-    }
-
-    // TODO: Testing the Retry layer without making a big dispatch stack and job context
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/.gitignore
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/.gitignore b/taverna-workflowmodel-extensions/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/taverna-workflowmodel-extensions/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/pom.xml b/taverna-workflowmodel-extensions/pom.xml
new file mode 100644
index 0000000..4f1be7a
--- /dev/null
+++ b/taverna-workflowmodel-extensions/pom.xml
@@ -0,0 +1,30 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.taverna.engine</groupId>
+ <artifactId>taverna-engine</artifactId>
+ <version>3.1.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>taverna-workflowmodel-extensions</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Taverna Workflow Model Extension Points</name>
+ <description>Implementation of core extension points to the workflow model</description>
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>taverna-workflowmodel-api</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
new file mode 100644
index 0000000..0b11627
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright (C) 2011 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Factory for creating core dispatch layers.
+ *
+ * The core dispatch layers are:
+ * <ul>
+ * <li>ErrorBounce</li>
+ * <li>Parallelize</li>
+ * <li>Failover</li>
+ * <li>Retry</li>
+ * <li>Stop</li>
+ * <li>Invoke</li>
+ * <li>Loop</li>
+ * <li>IntermediateProvenance</li>
+ * </ul>
+ *
+ * @author David Withers
+ */
+public class CoreDispatchLayerFactory implements DispatchLayerFactory {
+    private static final URI parallelizeLayer = URI.create(Parallelize.URI);
+    private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
+    private static final URI failoverLayer = URI.create(Failover.URI);
+    private static final URI retryLayer = URI.create(Retry.URI);
+    private static final URI invokeLayer = URI.create(Invoke.URI);
+    private static final URI loopLayer = URI.create(Loop.URI);
+    private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
+    private static final URI stopLayer = URI.create(Stop.URI);
+
+    /**
+     * Read-only set of the layer URIs this factory can instantiate. It is
+     * shared by all instances, so it is wrapped to stop callers of
+     * {@link #getDispatchLayerTypes()} from modifying it.
+     */
+    private static final Set<URI> dispatchLayerURIs;
+
+    static {
+        Set<URI> uris = new HashSet<URI>();
+        uris.add(parallelizeLayer);
+        uris.add(errorBounceLayer);
+        uris.add(failoverLayer);
+        uris.add(retryLayer);
+        uris.add(invokeLayer);
+        uris.add(loopLayer);
+        uris.add(intermediateProvenanceLayer);
+        uris.add(stopLayer);
+        dispatchLayerURIs = Collections.unmodifiableSet(uris);
+    }
+
+    /**
+     * Creates a new, unconfigured dispatch layer for the given type URI.
+     *
+     * @param uri
+     *            the URI identifying the dispatch layer type
+     * @return a new layer instance, or <code>null</code> if the URI is not
+     *         one of the core dispatch layer types
+     */
+    @Override
+    public DispatchLayer<?> createDispatchLayer(URI uri) {
+        if (parallelizeLayer.equals(uri))
+            return new Parallelize();
+        else if (errorBounceLayer.equals(uri))
+            return new ErrorBounce();
+        else if (failoverLayer.equals(uri))
+            return new Failover();
+        else if (retryLayer.equals(uri))
+            return new Retry();
+        else if (invokeLayer.equals(uri))
+            return new Invoke();
+        else if (loopLayer.equals(uri))
+            return new Loop();
+        else if (intermediateProvenanceLayer.equals(uri))
+            return new IntermediateProvenance();
+        else if (stopLayer.equals(uri))
+            return new Stop();
+        return null;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always <code>null</code>
+     */
+    @Override
+    public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
+        // TODO
+        return null;
+    }
+
+    /**
+     * @return an unmodifiable set of the URIs accepted by
+     *         {@link #createDispatchLayer(URI)}
+     */
+    @Override
+    public Set<URI> getDispatchLayerTypes() {
+        return dispatchLayerURIs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
new file mode 100644
index 0000000..dfde240
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
@@ -0,0 +1,324 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import net.sf.taverna.t2.invocation.Event;
+import net.sf.taverna.t2.monitor.MonitorableProperty;
+import net.sf.taverna.t2.monitor.NoSuchPropertyException;
+import net.sf.taverna.t2.reference.ErrorDocument;
+import net.sf.taverna.t2.reference.ReferenceService;
+import net.sf.taverna.t2.reference.T2Reference;
+import net.sf.taverna.t2.workflowmodel.OutputPort;
+import net.sf.taverna.t2.workflowmodel.Processor;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
+
+/**
+ * Receives job events, checks to see whether any parameters in the job are
+ * error tokens or collections which contain errors. If so then sends a
+ * corresponding result message back where all outputs are error tokens having
+ * registered such with the invocation context's data manager. It also re-writes
+ * any failure messages as result messages containing error tokens at the
+ * appropriate depth - this means that it must be placed above any error
+ * handling layers in order for those to have an effect at all. In general this
+ * layer should be placed immediately below the parallelize layer in most
+ * default cases (this will guarantee the processor never sees a failure message
+ * though, which may or may not be desirable)
+ * <p>
+ * The error counters are incremented while holding a lock and are declared
+ * <code>volatile</code> so that the monitorable properties, which read them
+ * without locking, observe up-to-date values.
+ *
+ * @author Tom Oinn
+ *
+ */
+@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
+        CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
+@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
+        CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
+@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
+@SupportsStreamedResult
+public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
+        PropertyContributingDispatchLayer<JsonNode> {
+    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
+
+    /**
+     * Track the number of reflected and translated errors handled by this error
+     * bounce instance, keyed by owning process identifier
+     */
+    private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
+
+    // Written under synchronized (ErrorBounce.this); volatile so that the
+    // unlocked reads from the monitorable properties are not stale
+    private volatile int totalTranslatedErrors = 0;
+    private volatile int totalReflectedErrors = 0;
+
+    /**
+     * Returns the counter state for the given owning process, creating and
+     * registering it on first use.
+     */
+    private synchronized ErrorBounceState getState(String owningProcess) {
+        if (state.containsKey(owningProcess))
+            return state.get(owningProcess);
+        ErrorBounceState ebs = new ErrorBounceState();
+        state.put(owningProcess, ebs);
+        return ebs;
+    }
+
+    /**
+     * If the job contains errors, or collections which contain errors
+     * themselves then bounce a result message with error documents in back up
+     * to the layer above
+     */
+    @Override
+    public void receiveJob(DispatchJobEvent jobEvent) {
+        Set<T2Reference> errorReferences = new HashSet<>();
+        for (T2Reference ei : jobEvent.getData().values())
+            if (ei.containsErrors())
+                errorReferences.add(ei);
+        if (errorReferences.isEmpty())
+            // relay the message down...
+            getBelow().receiveJob(jobEvent);
+        else {
+            getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
+            sendErrorOutput(jobEvent, null, errorReferences);
+        }
+    }
+
+    /**
+     * Always send the error document job result on receiving a failure, at
+     * least for now! This should be configurable, in effect this is the part
+     * that ensures the processor never sees a top level failure.
+     */
+    @Override
+    public void receiveError(DispatchErrorEvent errorEvent) {
+        getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
+        sendErrorOutput(errorEvent, errorEvent.getCause(), null);
+    }
+
+    /**
+     * Construct and send a new result message with error documents in place of
+     * all outputs at the appropriate depth
+     *
+     * @param event
+     *            the job or error event being answered
+     * @param cause
+     *            the failure cause to register, or <code>null</code> to
+     *            register the error references instead
+     * @param errorReferences
+     *            the erroneous inputs; only used when cause is
+     *            <code>null</code>
+     */
+    private void sendErrorOutput(Event<?> event, Throwable cause,
+            Set<T2Reference> errorReferences) {
+        ReferenceService rs = event.getContext().getReferenceService();
+
+        Processor p = dispatchStack.getProcessor();
+        Map<String, T2Reference> outputDataMap = new HashMap<>();
+        // The last segment of the colon-separated owning process identifier
+        // is used as the processor name in the error message
+        String[] owningProcessArray = event.getOwningProcess().split(":");
+        String processor = owningProcessArray[owningProcessArray.length - 1];
+        for (OutputPort op : p.getOutputPorts()) {
+            String message = "Processor '" + processor + "' - Port '"
+                    + op.getName() + "'";
+            if (event instanceof DispatchErrorEvent)
+                message += ": " + ((DispatchErrorEvent) event).getMessage();
+            ErrorDocument ed;
+            if (cause != null)
+                ed = rs.getErrorDocumentService().registerError(message, cause,
+                        op.getDepth(), event.getContext());
+            else
+                ed = rs.getErrorDocumentService().registerError(message,
+                        errorReferences, op.getDepth(), event.getContext());
+            outputDataMap.put(op.getName(), ed.getId());
+        }
+        DispatchResultEvent dre = new DispatchResultEvent(
+                event.getOwningProcess(), event.getIndex(), event.getContext(),
+                outputDataMap, false);
+        getAbove().receiveResult(dre);
+    }
+
+    @Override
+    public void configure(JsonNode config) {
+        // Do nothing - no configuration required
+    }
+
+    @Override
+    public JsonNode getConfiguration() {
+        // Layer has no configuration associated
+        return null;
+    }
+
+    @Override
+    public void finishedWith(final String owningProcess) {
+        /*
+         * Delay the removal of the state to give the monitor a chance to poll
+         */
+        cleanupTimer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                state.remove(owningProcess);
+            }
+        }, CLEANUP_DELAY_MS);
+    }
+
+    /**
+     * Registers four properties with the dispatch stack.
+     * dispatch.errorbounce.reflected(integer) is the number of incoming jobs
+     * for the owning process which have been bounced back as results with
+     * errors, dispatch.errorbounce.translated(integer) is the number of
+     * failures from downstream in the stack that have been re-written as
+     * complete results containing error documents.
+     * dispatch.errorbounce.totalTranslated(integer) and
+     * dispatch.errorbounce.totalReflected(integer) are the corresponding
+     * totals for this layer instance across all owning processes.
+     */
+    @Override
+    public void injectPropertiesFor(final String owningProcess) {
+        MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
+            @Override
+            public Date getLastModified() {
+                return new Date();
+            }
+
+            @Override
+            public String[] getName() {
+                return new String[] { "dispatch", "errorbounce", "reflected" };
+            }
+
+            @Override
+            public Integer getValue() throws NoSuchPropertyException {
+                ErrorBounceState ebs = state.get(owningProcess);
+                if (ebs == null)
+                    return 0;
+                return ebs.getErrorsReflected();
+            }
+        };
+        dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
+                owningProcess);
+
+        MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
+            @Override
+            public Date getLastModified() {
+                return new Date();
+            }
+
+            @Override
+            public String[] getName() {
+                return new String[] { "dispatch", "errorbounce", "translated" };
+            }
+
+            @Override
+            public Integer getValue() throws NoSuchPropertyException {
+                ErrorBounceState ebs = state.get(owningProcess);
+                if (ebs == null)
+                    return 0;
+                return ebs.getErrorsTranslated();
+            }
+        };
+        dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
+                owningProcess);
+
+        MonitorableProperty<Integer> totalTranslatedProperty = new MonitorableProperty<Integer>() {
+            @Override
+            public Date getLastModified() {
+                return new Date();
+            }
+
+            @Override
+            public String[] getName() {
+                return new String[] { "dispatch", "errorbounce",
+                        "totalTranslated" };
+            }
+
+            @Override
+            public Integer getValue() throws NoSuchPropertyException {
+                return totalTranslatedErrors;
+            }
+        };
+        dispatchStack.receiveMonitorableProperty(totalTranslatedProperty,
+                owningProcess);
+
+        MonitorableProperty<Integer> totalReflectedProperty = new MonitorableProperty<Integer>() {
+            @Override
+            public Date getLastModified() {
+                return new Date();
+            }
+
+            @Override
+            public String[] getName() {
+                return new String[] { "dispatch", "errorbounce",
+                        "totalReflected" };
+            }
+
+            @Override
+            public Integer getValue() throws NoSuchPropertyException {
+                return totalReflectedErrors;
+            }
+        };
+        dispatchStack.receiveMonitorableProperty(totalReflectedProperty,
+                owningProcess);
+    }
+
+    /**
+     * Error counters for a single owning process. Incrementing a counter
+     * also increments the matching total on the enclosing ErrorBounce.
+     */
+    public class ErrorBounceState {
+        // Written under synchronized (this); volatile so the unlocked
+        // getters below return current values
+        private volatile int errorsReflected = 0;
+        private volatile int errorsTranslated = 0;
+
+        /**
+         * Number of times the bounce layer has converted an incoming job event
+         * where the input data contained error tokens into a result event
+         * containing all errors.
+         */
+        int getErrorsReflected() {
+            return this.errorsReflected;
+        }
+
+        /**
+         * Number of times the bounce layer has converted an incoming failure
+         * event into a result containing error tokens
+         */
+        int getErrorsTranslated() {
+            return errorsTranslated;
+        }
+
+        void incrementErrorsReflected() {
+            synchronized (this) {
+                errorsReflected++;
+            }
+            synchronized (ErrorBounce.this) {
+                totalReflectedErrors++;
+            }
+        }
+
+        void incrementErrorsTranslated() {
+            synchronized (this) {
+                errorsTranslated++;
+            }
+            synchronized (ErrorBounce.this) {
+                totalTranslatedErrors++;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
new file mode 100644
index 0000000..1c5ef03
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
+import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Failure handling dispatch layer. It consumes job events carrying multiple
+ * activities and emits the same job with only the first activity. On each
+ * failure the job is resent to the layer below with an activity list holding
+ * only the next activity from the original list. If a failure is received and
+ * there are no further activities to try, the job fails and the failure is
+ * sent back up to the layer above.
+ *
+ * @author Tom Oinn
+ * @author Stian Soiland-Reyes
+ */
+@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
+        UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
+@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
+@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
+public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
+    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
+
+    /**
+     * Creates the per-job state used to remember which activity is currently
+     * being tried for the given job.
+     *
+     * @param jobEvent
+     *            the job as originally received, with its full activity list
+     * @return a new {@link FailoverState} wrapping the job
+     */
+    @Override
+    protected JobState getStateObject(DispatchJobEvent jobEvent) {
+        return new FailoverState(jobEvent);
+    }
+
+    /**
+     * Receive a job from the layer above, store it in the state map then relay
+     * it to the layer below with a modified activity list containing only the
+     * activity at index 0.
+     *
+     * @param jobEvent
+     *            the incoming job; its activity list must not be empty
+     */
+    @Override
+    public void receiveJob(DispatchJobEvent jobEvent) {
+        addJobToStateList(jobEvent);
+        relayWithSingleActivity(jobEvent, 0);
+    }
+
+    /**
+     * Sends a copy of the job to the layer below whose activity list contains
+     * only the activity found at {@code activityIndex} in the original list.
+     * Owning process, index, context and data are copied unchanged.
+     *
+     * @param jobEvent
+     *            the original job event
+     * @param activityIndex
+     *            position, in the original activity list, of the activity to
+     *            try
+     */
+    private void relayWithSingleActivity(DispatchJobEvent jobEvent,
+            int activityIndex) {
+        // A fresh mutable list is handed over on every attempt, as before.
+        List<Activity<?>> singleActivity = new ArrayList<>();
+        singleActivity.add(jobEvent.getActivities().get(activityIndex));
+        getBelow().receiveJob(
+                new DispatchJobEvent(jobEvent.getOwningProcess(),
+                        jobEvent.getIndex(), jobEvent.getContext(),
+                        jobEvent.getData(), singleActivity));
+    }
+
+    /**
+     * Job state that tracks the index of the activity currently being tried.
+     */
+    class FailoverState extends JobState {
+        int currentActivityIndex = 0;
+
+        public FailoverState(DispatchJobEvent jobEvent) {
+            super(jobEvent);
+        }
+
+        /**
+         * Moves on to the next activity of the original job and, if there is
+         * one, resends the job to the layer below with just that activity.
+         *
+         * @return {@code true} if the job was resent with another activity,
+         *         {@code false} if every activity has already been tried
+         */
+        @Override
+        public boolean handleError() {
+            currentActivityIndex++;
+            // ">=" rather than "==": should this be invoked again after the
+            // list was exhausted, keep reporting failure instead of indexing
+            // past the end of the activity list.
+            if (currentActivityIndex >= jobEvent.getActivities().size())
+                return false;
+            relayWithSingleActivity(jobEvent, currentActivityIndex);
+            return true;
+        }
+    }
+
+    /**
+     * Accepts and ignores the configuration; this layer has nothing to
+     * configure.
+     */
+    @Override
+    public void configure(JsonNode config) {
+        // Do nothing - there is no configuration to do
+    }
+
+    /**
+     * @return always {@code null}, because this layer keeps no configuration
+     */
+    @Override
+    public JsonNode getConfiguration() {
+        return null;
+    }
+}