You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/27 16:26:49 UTC

[2/6] incubator-taverna-engine git commit: taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
new file mode 100644
index 0000000..3169f8c
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
@@ -0,0 +1,163 @@
+/**
+ *
+ */
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.sf.taverna.t2.reference.WorkflowRunIdEntity;
+import net.sf.taverna.t2.workflowmodel.ConfigurationException;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
+import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * This layer allows for the cancellation, pausing and resuming of workflow
+ * runs. It does so by intercepting jobs sent to the layer.
+ *
+ * @author alanrw
+ */
+public class Stop extends AbstractDispatchLayer<JsonNode> {
+	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
+	/**
+	 * The set of ids of workflow runs that have been cancelled.
+	 */
+	private static Set<String> cancelledWorkflowRuns = new HashSet<>();
+	/**
+	 * A map from workflow run ids to the set of Stop layers where jobs have
+	 * been intercepted for that run.
+	 */
+	private static Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
+	/**
+	 * A map for a given Stop from ids of suspended workflow runs to the jobs
+	 * that have been intercepted.
+	 */
+	private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
+
+	@Override
+	public void configure(JsonNode conf) throws ConfigurationException {
+		// nothing
+	}
+
+	@Override
+	public JsonNode getConfiguration() {
+		return null;
+	}
+
+	@Override
+	public void receiveJob(final DispatchJobEvent jobEvent) {
+		List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
+				WorkflowRunIdEntity.class);
+		if (entities != null && !entities.isEmpty()) {
+			final String wfRunId = entities.get(0).getWorkflowRunId();
+			// If the workflow run is cancelled then simply "eat" the jobEvent.
+			// This does a hard-cancel.
+			if (cancelledWorkflowRuns.contains(wfRunId))
+				return;
+			// If the workflow run is paused
+			if (pausedLayerMap.containsKey(wfRunId))
+				synchronized (Stop.class) {
+					// double check as pausedLayerMap may have been changed
+					// waiting for the lock
+					if (pausedLayerMap.containsKey(wfRunId)) {
+						// Remember that this Stop layer was affected by the
+						// workflow pause
+						pausedLayerMap.get(wfRunId).add(this);
+						if (!suspendedJobEventMap.containsKey(wfRunId))
+							suspendedJobEventMap.put(wfRunId,
+									new HashSet<DispatchJobEvent>());
+						// Remember the suspended jobEvent
+						suspendedJobEventMap.get(wfRunId).add(jobEvent);
+						return;
+					}
+				}
+		}
+		// By default pass the jobEvent down to the next layer
+		super.receiveJob(jobEvent);
+	}
+
+	@Override
+	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
+		super.receiveJobQueue(jobQueueEvent);
+	}
+
+	/**
+	 * Cancel the workflow run with the specified id
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to cancel
+	 * @return If the workflow run was cancelled then true. If it was already
+	 *         cancelled then false.
+	 */
+	public static synchronized boolean cancelWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
+				cancelledWorkflowRuns);
+		cancelledWorkflowRunsCopy.add(workflowRunId);
+		cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
+		return true;
+	}
+
+	/**
+	 * Pause the workflow run with the specified id
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to pause
+	 * @return If the workflow run was paused then true. If it was already
+	 *         paused or cancelled then false.
+	 */
+	public static synchronized boolean pauseWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		if (pausedLayerMap.containsKey(workflowRunId))
+			return false;
+		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
+		pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
+		pausedLayerMap = pausedLayerMapCopy;
+		return true;
+	}
+
+	/**
+	 * Resume the workflow run with the specified id
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to resume
+	 * @return If the workflow run was resumed then true. If the workflow run
+	 *         was not paused or it was cancelled, then false.
+	 */
+	public static synchronized boolean resumeWorkflow(String workflowRunId) {
+		if (cancelledWorkflowRuns.contains(workflowRunId))
+			return false;
+		if (!pausedLayerMap.containsKey(workflowRunId))
+			return false;
+		Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>();
+		pausedLayerMapCopy.putAll(pausedLayerMap);
+		Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
+		pausedLayerMap = pausedLayerMapCopy;
+		for (Stop s : stops)
+			s.resumeLayerWorkflow(workflowRunId);
+		return true;
+	}
+
+	/**
+	 * Resume the workflow run with the specified id on this Stop layer. This
+	 * method processes any suspended job events.
+	 *
+	 * @param workflowRunId
+	 *            The id of the workflow run to resume.
+	 */
+	private void resumeLayerWorkflow(String workflowRunId) {
+		synchronized (Stop.class) {
+			for (DispatchJobEvent dje : suspendedJobEventMap
+					.remove(workflowRunId))
+				receiveJob(dje);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
new file mode 100644
index 0000000..fe6e73f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/package.html
@@ -0,0 +1,4 @@
+<body>
+Contains implementations of DispatchLayer defined by the core Taverna 2
+specification.
+</body>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
new file mode 100644
index 0000000..dc55fc7
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context-osgi.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans:beans xmlns="http://www.springframework.org/schema/osgi" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:beans="http://www.springframework.org/schema/beans"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                                 http://www.springframework.org/schema/beans/spring-beans.xsd
+                                 http://www.springframework.org/schema/osgi 
+                                 http://www.springframework.org/schema/osgi/spring-osgi.xsd" >
+
+	<service ref="coreDispatchLayerFactory" interface="net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory" />
+
+</beans:beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
new file mode 100644
index 0000000..90ed75f
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/main/resources/META-INF/spring/workflowmodel-core-extensions-context.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                           http://www.springframework.org/schema/beans/spring-beans.xsd">
+                           
+	<bean id="coreDispatchLayerFactory" class="net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.CoreDispatchLayerFactory" />
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
new file mode 100644
index 0000000..e202df1
--- /dev/null
+++ b/taverna-workflowmodel-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
@@ -0,0 +1,110 @@
+package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import static org.junit.Assert.*;
+
+public class TestRetry {
+    
+    @Test
+    public void defaultConfig() throws Exception {
+        Retry retry = new Retry();
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    @Test
+    public void customConfig() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+    
+    @Test
+    public void configureEmpty() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        JsonNode empty = JsonNodeFactory.instance.objectNode();
+        retry.configure(empty);
+        // We would expect missing values to be replaced with the
+        // DEFAULT values rather than the previous values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(0, configuration.get("maxRetries").intValue());
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+        assertEquals(1.0, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+
+    @Test
+    public void configurePartly() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", 15);
+        conf.put("backoffFactor", 1.2);
+        retry.configure(conf);
+        // We would expect to see the new values
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+        // And the default values (not the previous values!)
+        assertEquals(1000, configuration.get("initialDelay").intValue());
+        assertEquals(5000, configuration.get("maxDelay").intValue());
+    }    
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxRetries() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        retry.configure(conf);
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidInitialDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("initialDelay", -15);
+        retry.configure(conf);
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void invalidMaxDelay() throws Exception {
+        Retry retry = new Retry();
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxDelay", 150);
+        // Valid on its own, but less than the default initialDelay of 1000!
+        retry.configure(conf);
+    }
+
+    
+    @Test
+    public void invalidConfigureRecovers() throws Exception {
+        Retry retry = new Retry(15, 150, 1200, 1.2);
+        ObjectNode conf = JsonNodeFactory.instance.objectNode();
+        conf.put("maxRetries", -15);
+        try { 
+            retry.configure(conf);
+        } catch (IllegalArgumentException ex) {
+            // As expected
+        }
+        // We would expect the earlier values to persist
+        JsonNode configuration = retry.getConfiguration();
+        assertEquals(15, configuration.get("maxRetries").intValue());
+        assertEquals(150, configuration.get("initialDelay").intValue());
+        assertEquals(1200, configuration.get("maxDelay").intValue());
+        assertEquals(1.2, configuration.get("backoffFactor").doubleValue(), 0.001);
+    }
+    
+    // TODO: Testing the Retry layer without making a big dispatch stack and job context
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-impl/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/pom.xml b/taverna-workflowmodel-impl/pom.xml
index 93dfea1..97367e5 100644
--- a/taverna-workflowmodel-impl/pom.xml
+++ b/taverna-workflowmodel-impl/pom.xml
@@ -50,7 +50,7 @@
 		</dependency>
 		<dependency>
 			<groupId>${project.parent.groupId}</groupId>
-			<artifactId>taverna-workflowmodel-core-extensions</artifactId>
+			<artifactId>taverna-workflowmodel-extensions</artifactId>
 			<version>${project.parent.version}</version>
 			<!--<scope>test</scope> -->
 		</dependency>