You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/27 16:26:49 UTC
[2/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
new file mode 100644
index 0000000..3169f8c
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
@@ -0,0 +1,163 @@
+/**
+ *
+ */
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
+import net.sf.taverna.t2.workflowmodel.ConfigurationException;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * This layer allows for the cancellation, pausing and resuming of workflow
+ * runs. It does so by intercepting jobs sent to the layer.
+ *
+ * @author alanrw
+ */
+public class Stop extends AbstractDispatchLayer<JsonNode> {
+    public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
+    /**
+     * The set of ids of workflow runs that have been cancelled. The set is
+     * replaced under the Stop.class lock but read without it in receiveJob,
+     * so the reference is volatile to make each replacement visible there.
+     */
+    private static volatile Set<String> cancelledWorkflowRuns = new HashSet<>();
+    /**
+     * A map from workflow run ids to the set of Stop layers where jobs have
+     * been intercepted for that run. Volatile for the same reason as
+     * cancelledWorkflowRuns; the sets inside are only used under the lock.
+     */
+    private static volatile Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
+    /**
+     * A map for a given Stop from ids of suspended workflow runs to the jobs
+     * that have been intercepted. Only accessed under the Stop.class lock.
+     */
+    private final Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
+
+    @Override
+    public void configure(JsonNode conf) throws ConfigurationException {
+        // This layer has no settings, so the configuration is ignored.
+    }
+
+    @Override
+    public JsonNode getConfiguration() {
+        return null; // this layer keeps no configuration to report
+    }
+
+    /**
+     * Drops the job if its workflow run is cancelled, holds it back if the run
+     * is paused, and otherwise (or if no run id is found) passes it on.
+     */
+    @Override
+    public void receiveJob(final DispatchJobEvent jobEvent) {
+        List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(WorkflowRunIdEntity.class);
+        if (entities != null && !entities.isEmpty()) {
+            final String wfRunId = entities.get(0).getWorkflowRunId();
+            // A cancelled run's jobEvent is simply "eaten": this is a hard cancel.
+            if (cancelledWorkflowRuns.contains(wfRunId))
+                return;
+            // Unlocked pre-check, so runs that are not paused never take the lock
+            if (pausedLayerMap.containsKey(wfRunId))
+                synchronized (Stop.class) {
+                    // Check again: the run may have been resumed while waiting
+                    Set<Stop> pausedLayers = pausedLayerMap.get(wfRunId);
+                    if (pausedLayers != null) {
+                        // Remember that this Stop layer was affected by the pause
+                        pausedLayers.add(this);
+                        Set<DispatchJobEvent> held = suspendedJobEventMap.get(wfRunId);
+                        if (held == null) {
+                            held = new HashSet<>();
+                            suspendedJobEventMap.put(wfRunId, held);
+                        }
+                        held.add(jobEvent);
+                        return;
+                    }
+                }
+        }
+        // By default pass the jobEvent down to the next layer
+        super.receiveJob(jobEvent);
+    }
+
+    @Override
+    public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+        super.receiveJobQueue(jobQueueEvent); // job queues are not intercepted
+    }
+
+    /**
+     * Cancel the workflow run with the specified id. Jobs suspended by an
+     * earlier pause stay held: resumeWorkflow refuses cancelled runs.
+     *
+     * @param workflowRunId The id of the workflow run to cancel
+     * @return true if this call cancelled the run, false if it already was
+     */
+    public static synchronized boolean cancelWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId))
+            return false;
+        Set<String> updated = new HashSet<>(cancelledWorkflowRuns);
+        updated.add(workflowRunId);
+        cancelledWorkflowRuns = updated;
+        return true;
+    }
+
+    /**
+     * Pause the workflow run with the specified id
+     *
+     * @param workflowRunId The id of the workflow run to pause
+     * @return If the workflow run was paused then true. If it was already
+     *         paused or cancelled then false.
+     */
+    public static synchronized boolean pauseWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId)
+                || pausedLayerMap.containsKey(workflowRunId))
+            return false;
+        Map<String, Set<Stop>> updated = new HashMap<>(pausedLayerMap);
+        updated.put(workflowRunId, new HashSet<Stop>());
+        pausedLayerMap = updated;
+        return true;
+    }
+
+    /**
+     * Resume the workflow run with the specified id
+     *
+     * @param workflowRunId The id of the workflow run to resume
+     * @return If the workflow run was resumed then true. If the workflow run
+     *         was not paused or it was cancelled, then false.
+     */
+    public static synchronized boolean resumeWorkflow(String workflowRunId) {
+        if (cancelledWorkflowRuns.contains(workflowRunId)
+                || !pausedLayerMap.containsKey(workflowRunId))
+            return false;
+        // Un-pause first by publishing a map without this run, so the replayed
+        // jobs below pass straight through receiveJob instead of being re-held.
+        Map<String, Set<Stop>> remaining = new HashMap<>(pausedLayerMap);
+        Set<Stop> stops = remaining.remove(workflowRunId);
+        pausedLayerMap = remaining;
+        for (Stop s : stops)
+            s.resumeLayerWorkflow(workflowRunId);
+        return true;
+    }
+
+    /**
+     * Resume the workflow run with the specified id on this Stop layer. This
+     * method processes any suspended job events.
+     *
+     * @param workflowRunId The id of the workflow run to resume.
+     */
+    private void resumeLayerWorkflow(String workflowRunId) {
+        synchronized (Stop.class) {
+            Set<DispatchJobEvent> held = suspendedJobEventMap.remove(workflowRunId);
+            if (held != null)
+                for (DispatchJobEvent dje : held)
+                    receiveJob(dje);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
new file mode 100644
index 0000000..fe6e73f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
@@ -0,0 +1,4 @@
+<body>
+Contains implementations of DispatchLayer defined by the core Taverna 2
+specification.
+</body>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
new file mode 100644
index 0000000..dc55fc7
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:beans="http://www.springframework.org/schema/beans"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/osgi
+ http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
+ <!-- Exports the coreDispatchLayerFactory bean as an OSGi service under the DispatchLayerFactory interface. -->
+ <service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
+
+</beans:beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
new file mode 100644
index 0000000..90ed75f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <!-- Declares the CoreDispatchLayerFactory instance under the bean id coreDispatchLayerFactory. -->
+ <bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
new file mode 100644
index 0000000..e202df1
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
@@ -0,0 +1,110 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import static org.junit.Assert.*;
+
+/** Tests how the Retry dispatch layer reports and accepts its configuration. */
+public class TestRetry {
+
+    @Test
+    public void defaultConfig() throws Exception {
+        Retry retry = new Retry();
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    @Test
+    public void customConfig() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    @Test
+    public void configureEmpty() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode empty = JsonNodeFactory.instance.objectNode();
+        retry.configure(empty);
+        // We would expect missing values to be replaced with the
+        // DEFAULT values rather than the previous values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    @Test
+    public void configurePartly() throws Exception {
+        // Initial values differ from the configured ones so the change is observable
+        Retry retry = new Retry(10, 150, 1200, 1.1);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", 15);
+        conf.put("backoffFactor", 1.2);
+        retry.configure(conf);
+        // We would expect to see the new values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+        // And the default values (not the previous values!)
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxRetries() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        retry.configure(conf);
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidInitialDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("initialDelay", -15);
+        retry.configure(conf);
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxDelay", 150);
+        // Valid on its own, but less than the default initialDelay of 1000!
+        retry.configure(conf);
+    }
+
+    @Test
+    public void invalidConfigureRecovers() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        try {
+            retry.configure(conf);
+            fail("Expected IllegalArgumentException for negative maxRetries");
+        } catch (IllegalArgumentException ex) {
+            // As expected
+        }
+        // We would expect the earlier values to persist
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    // TODO: Testing the Retry layer without making a big dispatch stack and job context
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 93dfea1..97367e5 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -50,7 +50,7 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
+ <artifactId>taverna-workflowmodel-extensions</artifactId>
<version>${project.parent.version}</version>
<!--<scope>test</scope> -->
</dependency>