Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:37:53 UTC

[09/51] [partial] incubator-taverna-engine git commit:

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-api/src/test/java/org/apache/taverna/workflowmodel/processor/iteration/TestIterationStrategyNodes.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-api/src/test/java/org/apache/taverna/workflowmodel/processor/iteration/TestIterationStrategyNodes.java b/taverna-workflowmodel-api/src/test/java/org/apache/taverna/workflowmodel/processor/iteration/TestIterationStrategyNodes.java
new file mode 100644
index 0000000..b6a7b19
--- /dev/null
+++ b/taverna-workflowmodel-api/src/test/java/org/apache/taverna/workflowmodel/processor/iteration/TestIterationStrategyNodes.java
@@ -0,0 +1,211 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.workflowmodel.processor.iteration;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Map;
+
+import javax.swing.tree.TreeNode;
+
+import org.apache.taverna.invocation.Completion;
+import org.apache.taverna.workflowmodel.processor.activity.Job;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test {@link AbstractIterationStrategyNode} implementations for
+ * {@link TreeNode} behaviour.
+ * 
+ * @author Stian Soiland-Reyes
+ * 
+ */
+public class TestIterationStrategyNodes {
+
+	TerminalNode root;
+	private NamedInputPortNode input1;
+	private NamedInputPortNode input2;
+	private CrossProduct crossProduct1;
+	private CrossProduct crossProduct2;
+	private DotProduct dotProduct1;
+	private DotProduct dotProduct2;
+
+	@Test
+	public void addSingleChildToTerminal() throws Exception {
+		assertNull(input1.getParent());
+		assertEquals(0, root.getChildCount());
+		root.insert(input1);
+		assertEquals(root, input1.getParent());
+		assertEquals(1, root.getChildCount());
+		assertEquals(input1, root.getChildAt(0));
+		assertEquals(Arrays.asList(input1), root.getChildren());
+
+		root.insert(input1);
+		assertEquals(1, root.getChildCount());
+
+		root.insert(input1, 0);
+		assertEquals(1, root.getChildCount());
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void cantAddSeveralChildrenToTerminal() throws Exception {
+		root.insert(input1);
+		root.insert(input2);
+	}
+
+	@Test
+	public void addCrossProduct() throws Exception {
+		assertNull(crossProduct1.getParent());
+		crossProduct1.setParent(root);
+		assertEquals(root, crossProduct1.getParent());
+		assertEquals(1, root.getChildCount());
+		assertEquals(crossProduct1, root.getChildAt(0));
+		assertEquals(Arrays.asList(crossProduct1), root.getChildren());
+		assertEquals(0, crossProduct1.getChildCount());
+
+		crossProduct1.insert(input1);
+		assertEquals(input1, crossProduct1.getChildAt(0));
+		crossProduct1.insert(input2, 0);
+		assertEquals(input2, crossProduct1.getChildAt(0));
+		assertEquals(input1, crossProduct1.getChildAt(1));
+		assertEquals(2, crossProduct1.getChildCount());
+		assertEquals(Arrays.asList(input2, input1), crossProduct1.getChildren());
+
+		// A re-insert should move it
+		crossProduct1.insert(input2, 2);
+		assertEquals(2, crossProduct1.getChildCount());
+		assertEquals(Arrays.asList(input1, input2), crossProduct1.getChildren());
+
+		crossProduct1.insert(input2, 0);
+		assertEquals(Arrays.asList(input2, input1), crossProduct1.getChildren());
+
+		crossProduct1.insert(input1, 1);
+		assertEquals(Arrays.asList(input2, input1), crossProduct1.getChildren());
+	}
+
+	@Test
+	public void addCrossProductMany() {
+		crossProduct1.insert(dotProduct1);
+		crossProduct1.insert(dotProduct2);
+		crossProduct1.insert(input1);
+		crossProduct1.insert(input2);
+		crossProduct1.insert(crossProduct2);
+		assertEquals(5, crossProduct1.getChildCount());
+		assertEquals(Arrays.asList(dotProduct1, dotProduct2, input1, input2,
+				crossProduct2), crossProduct1.getChildren());
+		Enumeration<IterationStrategyNode> enumeration = crossProduct1
+				.children();
+		assertTrue(enumeration.hasMoreElements());
+		assertEquals(dotProduct1, enumeration.nextElement());
+		assertEquals(dotProduct2, enumeration.nextElement());
+		assertEquals(input1, enumeration.nextElement());
+		assertEquals(input2, enumeration.nextElement());
+		assertEquals(crossProduct2, enumeration.nextElement());
+		assertFalse(enumeration.hasMoreElements());
+	}
+
+	@Test
+	public void moveNodeToDifferentParent() {
+		crossProduct1.setParent(root);
+		crossProduct1.insert(input1);
+		crossProduct1.insert(dotProduct1);
+		dotProduct1.insert(input2);
+		dotProduct1.insert(crossProduct2);
+
+		// Check tree
+		assertEquals(crossProduct2, root.getChildAt(0).getChildAt(1)
+				.getChildAt(1));
+		assertEquals(Arrays.asList(input2, crossProduct2), dotProduct1
+				.getChildren());
+
+		crossProduct1.insert(crossProduct2, 1);
+		assertEquals(Arrays.asList(input1, crossProduct2, dotProduct1),
+				crossProduct1.getChildren());
+		assertEquals(crossProduct1, crossProduct2.getParent());
+		// Should no longer be in dotProduct1
+		assertEquals(Arrays.asList(input2), dotProduct1.getChildren());
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void cantAddToNamedInput() throws Exception {
+		input1.insert(dotProduct1);
+	}
+
+	@Test
+	public void cantAddSelf() throws Exception {
+		dotProduct1.setParent(crossProduct1);
+		try {
+			dotProduct1.insert(dotProduct1);
+			fail("Didn't throw IllegalArgumentException");
+		} catch (IllegalArgumentException ex) {
+			// Make sure we didn't lose our old parent and
+			// end up in a funny state
+			assertEquals(crossProduct1, dotProduct1.getParent());
+			assertEquals(dotProduct1, crossProduct1.getChildAt(0));
+		}
+	}
+
+	@Test
+	public void cantSetSelfParent() throws Exception {
+		crossProduct1.insert(dotProduct1);
+		try {
+			dotProduct1.setParent(dotProduct1);
+			fail("Didn't throw IllegalArgumentException");
+		} catch (IllegalArgumentException ex) {
+			// Make sure we didn't lose our old parent and
+			// end up in a funny state
+			assertEquals(crossProduct1, dotProduct1.getParent());
+			assertEquals(dotProduct1, crossProduct1.getChildAt(0));
+		}
+	}
+
+	@Before
+	public void makeNodes() throws Exception {
+		root = new DummyTerminalNode();
+		input1 = new NamedInputPortNode("input1", 1);
+		input2 = new NamedInputPortNode("input2", 2);
+		crossProduct1 = new CrossProduct();
+		crossProduct2 = new CrossProduct();
+		dotProduct1 = new DotProduct();
+		dotProduct2 = new DotProduct();
+	}
+
+	@SuppressWarnings("serial")
+	protected final class DummyTerminalNode extends TerminalNode {
+
+		@Override
+		public int getIterationDepth(Map<String, Integer> inputDepths)
+				throws IterationTypeMismatchException {
+			return 0;
+		}
+
+		@Override
+		public void receiveCompletion(int inputIndex, Completion completion) {
+		}
+
+		@Override
+		public void receiveJob(int inputIndex, Job newJob) {
+		}
+	}
+
+}
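
For readers skimming the diff: the TreeNode-style contract the test above exercises (insert() adds or re-parents a child, setParent() registers the node with its new parent) is enough to assemble an iteration strategy tree by hand. A minimal illustrative sketch, using only constructors and methods that appear in the test, not code from this commit:

    TerminalNode root = new DummyTerminalNode();      // the test's stub terminal node
    CrossProduct cross = new CrossProduct();
    cross.setParent(root);                            // root now has cross as its only child
    cross.insert(new NamedInputPortNode("a", 0));     // leaves are named input port nodes
    cross.insert(new NamedInputPortNode("b", 1));
    // Tree: root -> cross -> [a, b]. Re-inserting a node under another parent
    // moves it there, which is the invariant moveNodeToDifferentParent() verifies.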

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-api/src/test/resources/META-INF/services/net.sf.taverna.t2.workflowmodel.health.HealthChecker
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-api/src/test/resources/META-INF/services/net.sf.taverna.t2.workflowmodel.health.HealthChecker b/taverna-workflowmodel-api/src/test/resources/META-INF/services/net.sf.taverna.t2.workflowmodel.health.HealthChecker
deleted file mode 100644
index 63bb2e4..0000000
--- a/taverna-workflowmodel-api/src/test/resources/META-INF/services/net.sf.taverna.t2.workflowmodel.health.HealthChecker
+++ /dev/null
@@ -1,3 +0,0 @@
-net.sf.taverna.t2.workflowmodel.health.StringHealthChecker
-net.sf.taverna.t2.workflowmodel.health.FloatHealthChecker
-net.sf.taverna.t2.workflowmodel.health.FloatHealthChecker2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-api/src/test/resources/META-INF/services/org.apache.taverna.workflowmodel.health.HealthChecker
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-api/src/test/resources/META-INF/services/org.apache.taverna.workflowmodel.health.HealthChecker b/taverna-workflowmodel-api/src/test/resources/META-INF/services/org.apache.taverna.workflowmodel.health.HealthChecker
new file mode 100644
index 0000000..1e0484d
--- /dev/null
+++ b/taverna-workflowmodel-api/src/test/resources/META-INF/services/org.apache.taverna.workflowmodel.health.HealthChecker
@@ -0,0 +1,3 @@
+org.apache.taverna.workflowmodel.health.StringHealthChecker
+org.apache.taverna.workflowmodel.health.FloatHealthChecker
+org.apache.taverna.workflowmodel.health.FloatHealthChecker2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
deleted file mode 100644
index 0b11627..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Factory for creating core dispatch layers.
- *
- * The core dispatch layers are :
- * <ul>
- * <li>ErrorBounce</li>
- * <li>Parallelize</li>
- * <li>Failover</li>
- * <li>Retry</li>
- * <li>Stop</li>
- * <li>Invoke</li>
- * <li>Loop</li>
- * <li>IntermediateProvenance</li>
- * </ul>
- *
- * @author David Withers
- */
-public class CoreDispatchLayerFactory implements DispatchLayerFactory {
-	private static final URI parallelizeLayer = URI.create(Parallelize.URI);
-	private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
-	private static final URI failoverLayer = URI.create(Failover.URI);
-	private static final URI retryLayer = URI.create(Retry.URI);
-	private static final URI invokeLayer = URI.create(Invoke.URI);
-	private static final URI loopLayer = URI.create(Loop.URI);
-	private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
-	private static final URI stopLayer = URI.create(Stop.URI);
-
-	private final static Set<URI> dispatchLayerURIs = new HashSet<URI>();
-
-	static {
-		dispatchLayerURIs.add(parallelizeLayer);
-		dispatchLayerURIs.add(errorBounceLayer);
-		dispatchLayerURIs.add(failoverLayer);
-		dispatchLayerURIs.add(retryLayer);
-		dispatchLayerURIs.add(invokeLayer);
-		dispatchLayerURIs.add(loopLayer);
-		dispatchLayerURIs.add(intermediateProvenanceLayer);
-		dispatchLayerURIs.add(stopLayer);
-	}
-
-	@Override
-	public DispatchLayer<?> createDispatchLayer(URI uri) {
-		if (parallelizeLayer.equals(uri))
-			return new Parallelize();
-		else if (errorBounceLayer.equals(uri))
-			return new ErrorBounce();
-		else if (failoverLayer.equals(uri))
-			return new Failover();
-		else if (retryLayer.equals(uri))
-			return new Retry();
-		else if (invokeLayer.equals(uri))
-			return new Invoke();
-		else if (loopLayer.equals(uri))
-			return new Loop();
-		else if (intermediateProvenanceLayer.equals(uri))
-			return new IntermediateProvenance();
-		else if (stopLayer.equals(uri))
-			return new Stop();
-		return null;
-	}
-
-	@Override
-	public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
-		// TODO
-		return null;
-	}
-
-	@Override
-	public Set<URI> getDispatchLayerTypes() {
-		return dispatchLayerURIs;
-	}
-}
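
The factory being removed here (like its org.apache.taverna successor) keys each core layer off its URI constant. A minimal usage sketch, assuming the same imports as the file above:

    DispatchLayerFactory factory = new CoreDispatchLayerFactory();
    Set<URI> known = factory.getDispatchLayerTypes();            // URIs of the eight core layers
    DispatchLayer<?> retry =
            factory.createDispatchLayer(URI.create(Retry.URI));  // returns null for unknown URIs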

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
deleted file mode 100644
index dfde240..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.reference.ErrorDocument;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-/**
- * Receives job events, checks to see whether any parameters in the job are
- * error tokens or collections which contain errors. If so then sends a
- * corresponding result message back where all outputs are error tokens having
- * registered such with the invocation context's data manager. It also re-writes
- * any failure messages as result messages containing error tokens at the
- * appropriate depth - this means that it must be placed above any error
- * handling layers in order for those to have an effect at all. In general this
- * layer should be placed immediately below the parallelize layer in most
- * default cases (this will guarantee the processor never sees a failure message
- * though, which may or may not be desirable)
- * 
- * @author Tom Oinn
- * 
- */
-@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
-		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
-@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
-		CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@SupportsStreamedResult
-public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
-		PropertyContributingDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
-
-	/**
-	 * Track the number of reflected and translated errors handled by this error
-	 * bounce instance
-	 */
-	private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
-
-	private int totalTranslatedErrors = 0;
-	private int totalReflectedErrors = 0;
-
-	private synchronized ErrorBounceState getState(String owningProcess) {
-		if (state.containsKey(owningProcess))
-			return state.get(owningProcess);
-		ErrorBounceState ebs = new ErrorBounceState();
-		state.put(owningProcess, ebs);
-		return ebs;
-	}
-
-	/**
-	 * If the job contains errors, or collections which contain errors
-	 * themselves then bounce a result message with error documents in back up
-	 * to the layer above
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		Set<T2Reference> errorReferences = new HashSet<>();
-		for (T2Reference ei : jobEvent.getData().values())
-			if (ei.containsErrors())
-				errorReferences.add(ei);
-		if (errorReferences.isEmpty())
-			// relay the message down...
-			getBelow().receiveJob(jobEvent);
-		else {
-			getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
-			sendErrorOutput(jobEvent, null, errorReferences);
-		}
-	}
-
-	/**
-	 * Always send the error document job result on receiving a failure, at
-	 * least for now! This should be configurable, in effect this is the part
-	 * that ensures the processor never sees a top level failure.
-	 */
-	@Override
-	public void receiveError(DispatchErrorEvent errorEvent) {
-		getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
-		sendErrorOutput(errorEvent, errorEvent.getCause(), null);
-	}
-
-	/**
-	 * Construct and send a new result message with error documents in place of
-	 * all outputs at the appropriate depth
-	 * 
-	 * @param event
-	 * @param cause
-	 * @param errorReferences
-	 */
-	private void sendErrorOutput(Event<?> event, Throwable cause,
-			Set<T2Reference> errorReferences) {
-		ReferenceService rs = event.getContext().getReferenceService();
-
-		Processor p = dispatchStack.getProcessor();
-		Map<String, T2Reference> outputDataMap = new HashMap<>();
-		String[] owningProcessArray = event.getOwningProcess().split(":");
-		String processor = owningProcessArray[owningProcessArray.length - 1];
-		for (OutputPort op : p.getOutputPorts()) {
-			String message = "Processor '" + processor + "' - Port '"
-					+ op.getName() + "'";
-			if (event instanceof DispatchErrorEvent)
-				message += ": " + ((DispatchErrorEvent) event).getMessage();
-			ErrorDocument ed;
-			if (cause != null)
-				ed = rs.getErrorDocumentService().registerError(message, cause,
-						op.getDepth(), event.getContext());
-			else
-				ed = rs.getErrorDocumentService().registerError(message,
-						errorReferences, op.getDepth(), event.getContext());
-			outputDataMap.put(op.getName(), ed.getId());
-		}
-		DispatchResultEvent dre = new DispatchResultEvent(
-				event.getOwningProcess(), event.getIndex(), event.getContext(),
-				outputDataMap, false);
-		getAbove().receiveResult(dre);
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// Do nothing - no configuration required
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		// Layer has no configuration associated
-		return null;
-	}
-
-	@Override
-	public void finishedWith(final String owningProcess) {
-		/*
-		 * Delay the removal of the state to give the monitor a chance to poll
-		 */
-		cleanupTimer.schedule(new TimerTask() {
-			@Override
-			public void run() {
-				state.remove(owningProcess);
-			}
-		}, CLEANUP_DELAY_MS);
-	}
-
-	/**
-	 * Two properties, dispatch.errorbounce.reflected(integer) is the number of
-	 * incoming jobs which have been bounced back as results with errors,
-	 * dispatch.errorbounce.translated(integer) is the number of failures from
-	 * downstream in the stack that have been re-written as complete results
-	 * containing error documents.
-	 */
-	@Override
-	public void injectPropertiesFor(final String owningProcess) {
-		MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce", "reflected" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				ErrorBounceState ebs = state.get(owningProcess);
-				if (ebs == null)
-					return 0;
-				return ebs.getErrorsReflected();
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce", "translated" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				ErrorBounceState ebs = state.get(owningProcess);
-				if (ebs == null)
-					return 0;
-				return ebs.getErrorsTranslated();
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
-				owningProcess);
-
-		MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce",
-						"totalTranslated" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return totalTranslatedErrors;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(
-				totalTranslatedTranslatedProperty, owningProcess);
-
-		MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
-			@Override
-			public Date getLastModified() {
-				return new Date();
-			}
-
-			@Override
-			public String[] getName() {
-				return new String[] { "dispatch", "errorbounce",
-						"totalReflected" };
-			}
-
-			@Override
-			public Integer getValue() throws NoSuchPropertyException {
-				return totalReflectedErrors;
-			}
-		};
-		dispatchStack.receiveMonitorableProperty(
-				totalReflectedTranslatedProperty, owningProcess);
-	}
-
-	public class ErrorBounceState {
-		private int errorsReflected = 0;
-		private int errorsTranslated = 0;
-
-		/**
-		 * Number of times the bounce layer has converted an incoming job event
-		 * where the input data contained error tokens into a result event
-		 * containing all errors.
-		 */
-		int getErrorsReflected() {
-			return this.errorsReflected;
-		}
-
-		/**
-		 * Number of times the bounce layer has converted an incoming failure
-		 * event into a result containing error tokens
-		 */
-		int getErrorsTranslated() {
-			return errorsTranslated;
-		}
-
-		void incrementErrorsReflected() {
-			synchronized (this) {
-				errorsReflected++;
-			}
-			synchronized (ErrorBounce.this) {
-				totalReflectedErrors++;
-			}
-		}
-
-		void incrementErrorsTranslated() {
-			synchronized (this) {
-				errorsTranslated++;
-			}
-			synchronized (ErrorBounce.this) {
-				totalTranslatedErrors++;
-			}
-		}
-	}
-}
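
The bounce described in the class Javadoc comes down to one decision: if any input reference carries errors, skip invocation and answer with a result whose outputs are all error documents. A simplified standalone model of that decision, using hypothetical names and plain java.util types rather than T2Reference and dispatch events:

    import java.util.*;

    class BounceSketch {
        /** Returns null when the job should simply be relayed downwards,
         *  otherwise a map giving an error token for every output port. */
        static Map<String, String> bounceIfErrors(Map<String, Boolean> inputHasErrors,
                                                  Set<String> outputPorts) {
            if (!inputHasErrors.containsValue(true))
                return null;                                   // clean inputs: relay the job
            Map<String, String> allErrors = new HashMap<>();
            for (String port : outputPorts)
                allErrors.put(port, "error document for '" + port + "'");
            return allErrors;                                  // bounced result: every output is an error
        }

        public static void main(String[] args) {
            // prints {out=error document for 'out'}
            System.out.println(bounceIfErrors(Map.of("in", true), Set.of("out")));
        }
    }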

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
deleted file mode 100644
index 1c5ef03..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Failure handling dispatch layer, consumes job events with multiple activities
- * and emits the same job but with only the first activity. On failures the job
- * is resent to the layer below with a new activity list containing the second
- * in the original list and so on. If a failure is received and there are no
- * further activities to use the job fails and the failure is sent back up to
- * the layer above.
- * 
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
-		UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
-
-	@Override
-	protected JobState getStateObject(DispatchJobEvent jobEvent) {
-		return new FailoverState(jobEvent);
-	}
-
-	/**
-	 * Receive a job from the layer above, store it in the state map then relay
-	 * it to the layer below with a modified activity list containing only the
-	 * activity at index 0
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		addJobToStateList(jobEvent);
-		List<Activity<?>> newActivityList = new ArrayList<>();
-		newActivityList.add(jobEvent.getActivities().get(0));
-		getBelow().receiveJob(
-				new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
-						.getIndex(), jobEvent.getContext(), jobEvent.getData(),
-						newActivityList));
-	}
-
-	class FailoverState extends JobState {
-		int currentActivityIndex = 0;
-
-		public FailoverState(DispatchJobEvent jobEvent) {
-			super(jobEvent);
-		}
-
-		@Override
-		public boolean handleError() {
-			currentActivityIndex++;
-			if (currentActivityIndex == jobEvent.getActivities().size())
-				return false;
-			List<Activity<?>> newActivityList = new ArrayList<>();
-			newActivityList.add(jobEvent.getActivities().get(
-					currentActivityIndex));
-			getBelow().receiveJob(
-					new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
-							.getIndex(), jobEvent.getContext(), jobEvent
-							.getData(), newActivityList));
-			return true;
-		}
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// Do nothing - there is no configuration to do
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return null;
-	}
-}
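
The behaviour described in the Javadoc (send activity 0, on failure resend with activity 1, and so on, failing only when the list is exhausted) can be modelled without the dispatch-stack machinery. A simplified, self-contained sketch with hypothetical names, not the Taverna API:

    import java.util.List;

    class FailoverSketch {
        interface Attempt { boolean run(); }                  // stand-in for invoking one Activity

        static boolean runWithFailover(List<Attempt> candidates) {
            for (Attempt a : candidates)                      // index 0 first, then 1, 2, ...
                if (a.run())
                    return true;                              // first success wins
            return false;                                     // all candidates failed: error goes upstream
        }

        public static void main(String[] args) {
            List<Attempt> attempts = List.of(() -> false, () -> true);
            System.out.println(runWithFailover(attempts));    // true: the second attempt succeeded
        }
    }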

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
deleted file mode 100644
index 718079a..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static java.lang.System.currentTimeMillis;
-
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-/**
- * Sits above the {@link Invoke} layer and collects information about the
- * current workflow run to be stored by the {@link ProvenanceConnector}.
- * 
- * @author Ian Dunlop
- * @author Stian Soiland-Reyes
- * 
- */
-public class IntermediateProvenance extends AbstractDispatchLayer<String> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
-	private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
-
-	private ProvenanceReporter reporter;
-	private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
-	private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
-	private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
-
-	// private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
-	// private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
-
-	private WorkflowProvenanceItem workflowItem;
-
-	@Override
-	public void configure(String o) {
-	}
-
-	/**
-	 * A set of provenance events for a particular owning process has been
-	 * finished with so you can remove all the {@link IterationProvenanceItem}s
-	 * from the map
-	 */
-	@Override
-	public void finishedWith(String owningProcess) {
-		processToIndexes.remove(owningProcess);
-	}
-
-	@Override
-	public String getConfiguration() {
-		return null;
-	}
-
-	protected Map<String, IterationProvenanceItem> getIndexesByProcess(
-			String owningProcess) {
-		synchronized (processToIndexes) {
-			Map<String, IterationProvenanceItem> indexes = processToIndexes
-					.get(owningProcess);
-			if (indexes == null) {
-				indexes = new HashMap<>();
-				processToIndexes.put(owningProcess, indexes);
-			}
-			return indexes;
-		}
-	}
-
-	protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
-		String owningProcess = event.getOwningProcess();
-		int[] originalIndex = event.getIndex();
-		int[] index = event.getIndex();
-		String indexStr = indexStr(index);
-		Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
-		IterationProvenanceItem iterationProvenanceItem = null;
-		synchronized (indexes) {
-			// find the iteration item for the int index eg [1]
-			iterationProvenanceItem = indexes.get(indexStr);
-			// if it is null then strip the index and look again
-
-			while (iterationProvenanceItem == null) {
-				try {
-					index = removeLastIndex(index);
-					iterationProvenanceItem = indexes.get(indexStr(index));
-					/*
-					 * if we have a 'parent' iteration then create a new
-					 * iteration for the original index and link it to the
-					 * activity and the input data
-					 * 
-					 * FIXME should this be linked to the parent iteration
-					 * instead?
-					 */
-					if (iterationProvenanceItem != null) {
-						// set the index to the one from the event
-						IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
-						iterationProvenanceItem1.setIteration(originalIndex);
-						iterationProvenanceItem1.setProcessId(owningProcess);
-						iterationProvenanceItem1.setIdentifier(UUID
-								.randomUUID().toString());
-						iterationProvenanceItem1.setWorkflowId(workflowItem.getParentId());
-						iterationProvenanceItem1.setParentIterationItem(iterationProvenanceItem);
-						iterationProvenanceItem1.setParentId(iterationProvenanceItem.getParentId());
-						iterationProvenanceItem1.setInputDataItem(iterationProvenanceItem.getInputDataItem());
-
-//						for (Entry<ActivityProvenanceItem, List<Object>> entrySet : activityProvenanceItemMap
-//								.entrySet()) {
-//							List<Object> value = entrySet.getValue();
-//							int[] newIndex = (int[]) value.get(0);
-//							String owner = (String) value.get(1);
-//							String indexString = indexStr(newIndex);
-//							String indexString2 = indexStr(index);
-//
-//							if (owningProcess.equalsIgnoreCase(owner)
-//									&& indexString
-//											.equalsIgnoreCase(indexString2))
-//								iterationProvenanceItem1.setParentId(entrySet
-//										.getKey().getIdentifier());
-//						}
-//						for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
-//								.entrySet()) {
-//							List<Object> value = entrySet.getValue();
-//							int[] newIndex = (int[]) value.get(0);
-//							String owner = (String) value.get(1);
-//							String indexString = indexStr(newIndex);
-//							String indexString2 = indexStr(index);
-//							if (owningProcess.equalsIgnoreCase(owner)
-//									&& indexString
-//											.equalsIgnoreCase(indexString2))
-//								iterationProvenanceItem1
-//										.setInputDataItem(entrySet.getKey());
-//						}
-
-						// for (ActivityProvenanceItem item :
-						// activityProvenanceItemList) {
-						// if (owningProcess.equalsIgnoreCase(item
-						// .getProcessId())) {
-						// iterationProvenanceItem1.setParentId(item
-						// .getIdentifier());
-						// }
-						// }
-						// for (InputDataProvenanceItem item :
-						// inputDataProvenanceItemList) {
-						// if (owningProcess.equalsIgnoreCase(item
-						// .getProcessId())) {
-						// iterationProvenanceItem1.setInputDataItem(item);
-						// }
-						// indexes.put(indexStr, iterationProvenanceItem1);
-						// return iterationProvenanceItem1;
-						// // }
-						// }
-
-						// add this new iteration item to the map
-						getIndexesByProcess(event.getOwningProcess()).put(
-								indexStr(event.getIndex()),
-								iterationProvenanceItem1);
-						return iterationProvenanceItem1;
-					}
-					/*
-					 * if we have not found an iteration items and the index is
-					 * [] then something is wrong remove the last index in the
-					 * int array before we go back through the while
-					 */
-				} catch (IllegalStateException e) {
-					logger
-							.warn("Cannot find a parent iteration with index [] for owning process: "
-									+ owningProcess
-									+ "Workflow invocation is in an illegal state");
-					throw e;
-				}
-			}
-
-			// if (iterationProvenanceItem == null) {
-			// logger.info("Iteration item was null for: "
-			// + event.getOwningProcess() + " " + event.getIndex());
-			// System.out.println("Iteration item was null for: "
-			// + event.getOwningProcess() + " " + event.getIndex());
-			// iterationProvenanceItem = new IterationProvenanceItem(index);
-			// iterationProvenanceItem.setProcessId(owningProcess);
-			// iterationProvenanceItem.setIdentifier(UUID.randomUUID()
-			// .toString());
-			// // for (ActivityProvenanceItem
-			// item:activityProvenanceItemList)
-			// // {
-			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
-			// // iterationProvenanceItem.setParentId(item.getIdentifier());
-			// // }
-			// // }
-			// // for (InputDataProvenanceItem item:
-			// // inputDataProvenanceItemList) {
-			// // if (owningProcess.equalsIgnoreCase(item.getProcessId())) {
-			// // iterationProvenanceItem.setInputDataItem(item);
-			// // }
-			// // }
-			// indexes.put(indexStr, iterationProvenanceItem);
-
-		}
-		return iterationProvenanceItem;
-	}
-
-	private String indexStr(int[] index) {
-		StringBuilder indexStr = new StringBuilder();
-		for (int ind : index)
-			indexStr.append(":").append(ind);
-		return indexStr.toString();
-	}
-
-	/**
-	 * Remove the last index of the int array in the form 1:2:3 etc
-	 * 
-	 * @param index
-	 * @return
-	 */
-	@SuppressWarnings("unused")
-	private String[] stripLastIndex(int[] index) {
-		// will be in form :1:2:3
-		return indexStr(index).split(":");
-	}
-
-	/**
-	 * Remove the last value in the int array
-	 * 
-	 * @param index
-	 * @return
-	 */
-	private int[] removeLastIndex(int[] index) {
-		if (index.length == 0)
-			throw new IllegalStateException(
-					"There is no parent iteration of index [] for this result");
-		int[] newIntArray = new int[index.length - 1];
-		for (int i = 0; i < index.length - 1; i++)
-			newIntArray[i] = index[i];
-		return newIntArray;
-	}
-
-	private static String uuid() {
-		return UUID.randomUUID().toString();
-	}
-
-	/**
-	 * Create an {@link ErrorProvenanceItem} and send across to the
-	 * {@link ProvenanceConnector}
-	 */
-	@Override
-	public void receiveError(DispatchErrorEvent errorEvent) {
-		IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
-		// get using errorEvent.getOwningProcess();
-		
-		ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
-		errorItem.setCause(errorEvent
-				.getCause());
-		errorItem.setErrorType(errorEvent
-				.getFailureType().toString());
-		errorItem.setMessage(errorEvent.getMessage());
-		
-		errorItem.setProcessId(errorEvent.getOwningProcess());
-		errorItem.setIdentifier(uuid());
-		errorItem.setParentId(iterationProvItem.getIdentifier());
-		// iterationProvItem.setErrorItem(errorItem);
-		// FIXME don't need to add to the processor item earlier
-		getReporter().addProvenanceItem(errorItem);
-		super.receiveError(errorEvent);
-	}
-
-	/**
-	 * Create the {@link ProvenanceItem}s and send them all across to the
-	 * {@link ProvenanceConnector} except for the
-	 * {@link IterationProvenanceItem}, this one is told what it's inputs are
-	 * but has to wait until the results are received before being sent across.
-	 * Each item has a unique identifier and also knows who its parent item is
-	 */
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-			try {
-			// FIXME do we need this ProcessProvenanceItem?
-			ProcessProvenanceItem provenanceItem;
-			String[] split = jobEvent.getOwningProcess().split(":");
-			provenanceItem = new ProcessProvenanceItem();
-			String parentDataflowId = workflowItem.getParentId();
-			provenanceItem.setWorkflowId(parentDataflowId);
-			provenanceItem.setFacadeID(split[0]);
-			provenanceItem.setDataflowID(split[1]);
-			provenanceItem.setProcessId(jobEvent.getOwningProcess());
-			provenanceItem.setIdentifier(uuid());
-			provenanceItem.setParentId(workflowItem.getIdentifier());
-			ProcessorProvenanceItem processorProvItem;
-			processorProvItem = new ProcessorProvenanceItem();
-			processorProvItem.setWorkflowId(parentDataflowId);
-			processorProvItem.setProcessId(jobEvent
-					.getOwningProcess());
-			processorProvItem.setIdentifier(uuid());
-			processorProvItem.setParentId(provenanceItem.getIdentifier());
-			provenanceItem.setProcessId(jobEvent.getOwningProcess());
-			getReporter().addProvenanceItem(provenanceItem);
-			getReporter().addProvenanceItem(processorProvItem);
-	
-			IterationProvenanceItem iterationProvItem = null;
-			iterationProvItem = new IterationProvenanceItem();
-			iterationProvItem.setWorkflowId(parentDataflowId);
-			iterationProvItem.setIteration(jobEvent.getIndex());
-			iterationProvItem.setIdentifier(uuid());
-			
-			ReferenceService referenceService = jobEvent.getContext()
-					.getReferenceService();
-	
-			InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
-			inputDataItem.setDataMap(jobEvent.getData());
-			inputDataItem.setReferenceService(referenceService);
-			inputDataItem.setIdentifier(uuid());
-			inputDataItem.setParentId(iterationProvItem.getIdentifier());
-			inputDataItem.setProcessId(jobEvent.getOwningProcess());
-	
-			List<Object> inputIndexOwnerList = new ArrayList<>();
-			inputIndexOwnerList.add(jobEvent.getIndex());
-			inputIndexOwnerList.add(jobEvent.getOwningProcess());
-			inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
-	
-			// inputDataProvenanceItemList.add(inputDataItem);
-			iterationProvItem.setInputDataItem(inputDataItem);
-			iterationProvItem.setIteration(jobEvent.getIndex());
-			iterationProvItem.setProcessId(jobEvent.getOwningProcess());
-	
-			for (Activity<?> activity : jobEvent.getActivities())
-				if (activity instanceof AsynchronousActivity) {
-					ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
-					activityProvItem.setWorkflowId(parentDataflowId);
-					activityProvItem.setIdentifier(uuid());
-					iterationProvItem.setParentId(activityProvItem.getIdentifier());
-					// getConnector().addProvenanceItem(iterationProvItem);
-					activityProvItem.setParentId(processorProvItem.getIdentifier());
-					// processorProvItem.setActivityProvenanceItem(activityProvItem);
-					activityProvItem.setProcessId(jobEvent.getOwningProcess());
-					List<Object> activityIndexOwnerList = new ArrayList<>();
-					activityIndexOwnerList.add(jobEvent.getOwningProcess());
-					activityIndexOwnerList.add(jobEvent.getIndex());
-					activityProvenanceItemMap.put(activityProvItem,
-							inputIndexOwnerList);
-					// activityProvenanceItemList.add(activityProvItem);
-					// activityProvItem.setIterationProvenanceItem(iterationProvItem);
-					getReporter().addProvenanceItem(activityProvItem);
-					break;
-				}
-			getIndexesByProcess(jobEvent.getOwningProcess()).put(
-					indexStr(jobEvent.getIndex()), iterationProvItem);
-			iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
-			getReporter().addProvenanceItem(iterationProvItem);
-		} catch (RuntimeException ex) {
-			logger.error("Could not store provenance for " + jobEvent, ex);
-		}
-		
-		super.receiveJob(jobEvent);
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-		super.receiveJobQueue(jobQueueEvent);
-	}
-
-	/**
-	 * Populate an {@link OutputDataProvenanceItem} with the results and attach
-	 * it to the appropriate {@link IterationProvenanceItem}. Then send the
-	 * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
-	 */
-	@Override
-	public void receiveResult(DispatchResultEvent resultEvent) {
-		try {
-			// FIXME use the connector from the result event context
-			IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
-			iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
-			
-			ReferenceService referenceService = resultEvent.getContext()
-					.getReferenceService();
-
-			OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
-			outputDataItem.setDataMap(resultEvent.getData());
-			outputDataItem.setReferenceService(referenceService);
-			outputDataItem.setIdentifier(uuid());
-			outputDataItem.setProcessId(resultEvent.getOwningProcess());
-			outputDataItem.setParentId(iterationProvItem.getIdentifier());
-			iterationProvItem.setOutputDataItem(outputDataItem);
-			
-			getReporter().addProvenanceItem(iterationProvItem);
-			// getConnector().addProvenanceItem(outputDataItem);
-	
-			// PM -- testing
-			// add xencoding of data value here??
-	//		Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
-	//		for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
-	//			// create a simpler bean that we can serialize?
-	//			
-	//			T2Reference ref = entry.getValue();
-	//			
-	//			SimplerT2Reference t2RefBean = new SimplerT2Reference();
-	//			t2RefBean.setReferenceType(ref.getReferenceType());
-	//			t2RefBean.setDepth(ref.getDepth());
-	//			t2RefBean.setLocalPart(ref.getLocalPart());
-	//			t2RefBean.setNamespacePart(ref.getNamespacePart());
-	//						
-	//			System.out.println("data ref: "+ref);
-	//			String serializedInput = SerializeParam(t2RefBean);
-	//			System.out.println("serialized reference:" + serializedInput);
-	//			
-	//			System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
-//		}
-		} catch (Exception ex) {
-			logger.error("Could not store provenance for "
-					+ resultEvent.getOwningProcess() + " "
-					+ Arrays.toString(resultEvent.getIndex()), ex);
-			// But don't break super.receiveResult() !!
-		}
-		super.receiveResult(resultEvent);
-	}
-
-	@Override
-	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
-		super.receiveResultCompletion(completionEvent);
-	}
-
-	/**
-	 * Tell this layer what {@link ProvenanceConnector} implementation is being
-	 * used to capture the {@link ProvenanceItem}s. NOTE: should probably use
-	 * the connector from the result events context where possible
-	 * 
-	 * @param connector
-	 */
-	public void setReporter(ProvenanceReporter connector) {
-		this.reporter = connector;
-	}
-
-	public ProvenanceReporter getReporter() {
-		return reporter;
-	}
-
-	/**
-	 * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
-	 * enacted this layer has to know about the {@link WorkflowProvenanceItem}
-	 * 
-	 * @param workflowItem
-	 */
-	public void setWorkflow(WorkflowProvenanceItem workflowItem) {
-		this.workflowItem = workflowItem;
-	}
-
-	// TODO is this unused?
-	public static String SerializeParam(Object ParamValue) {
-		ByteArrayOutputStream BStream = new ByteArrayOutputStream();
-		XMLEncoder encoder = new XMLEncoder(BStream);
-		encoder.writeObject(ParamValue);
-		encoder.close();
-		return BStream.toString();
-	}
-
-	// TODO is this unused?
-	public static Object DeserializeParam(String SerializedParam) {
-		InputStream IStream = new ByteArrayInputStream(
-				SerializedParam.getBytes());
-		XMLDecoder decoder = new XMLDecoder(IStream);
-		Object output = decoder.readObject();
-		decoder.close();
-		return output;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
deleted file mode 100644
index f8403df..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.sql.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.ControlBoundary;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Context-free invoker layer; it does not pass index arrays of jobs into
- * activity instances.
- * <p>
- * This layer will invoke the first invokable activity in the activity list, so
- * any sane dispatch stack will have narrowed this down to a single-item list
- * by this point, e.g. by the insertion of a failover layer.
- * <p>
- * Currently only handles activities implementing {@link AsynchronousActivity}.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- *
- */
-@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
-@ControlBoundary
-public class Invoke extends AbstractDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
-	private static Logger logger = Logger.getLogger(Invoke.class);
-	private static Long invocationCount = 0L;
-
-	private MonitorManager monMan;
-
-	private static String getNextProcessID() {
-		long count;
-		// Synchronize on a fixed monitor: the boxed Long counter is reassigned on ++
-		synchronized (Invoke.class) {
-			count = ++invocationCount;
-		}
-		return "invocation" + count;
-	}
-
-	public Invoke() {
-		super();
-		monMan = MonitorManager.getInstance();
-	}
-
-	@Override
-	public void configure(JsonNode config) {
-		// No configuration, do nothing
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return null;
-	}
-
-	/**
-	 * Receive a job from the layer above and pick the first concrete activity
-	 * from the list to invoke. Invoke this activity, creating a callback which
-	 * will wrap up the result messages at the appropriate collection depth
-	 * before sending them on (in general, activities are not aware of their
-	 * invocation context and should not be responsible for providing correct
-	 * index arrays for results).
-	 * <p>
-	 * This layer will invoke the first invokable activity in the activity list,
-	 * so any sane dispatch stack will have narrowed this down to a single-item
-	 * list by this point, e.g. by the insertion of a failover layer.
-	 */
-	@Override
-	public void receiveJob(final DispatchJobEvent jobEvent) {
-		for (Activity<?> activity : jobEvent.getActivities())
-			if (activity instanceof AsynchronousActivity) {
-				invoke(jobEvent, (AsynchronousActivity<?>) activity);
-				break;
-			}
-	}
-
-	protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
-		// Register with the monitor
-		final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
-				getNextProcessID()).getOwningProcess();
-		monMan.registerNode(activity, invocationProcessIdentifier,
-				new HashSet<MonitorableProperty<?>>());
-		monMan.registerNode(jobEvent, invocationProcessIdentifier,
-				new HashSet<MonitorableProperty<?>>());
-
-		/*
-		 * The activity is an AsynchronousActivity so we invoke it with an
-		 * AsynchronousActivityCallback object containing appropriate callback
-		 * methods to push results, completions and failures back to the
-		 * invocation layer.
-		 * 
-		 * Get the registered DataManager for this process. In most cases this
-		 * will just be a single DataManager for the entire workflow system but
-		 * it never hurts to generalize
-		 */
-
-		InvocationContext context = jobEvent.getContext();
-		final ReferenceService refService = context.getReferenceService();
-
-		InvocationStartedProvenanceItem invocationItem = null;
-		ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
-		if (provenanceReporter != null) {
-			IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
-			if (intermediateProvenance != null) {
-				invocationItem = new InvocationStartedProvenanceItem();
-				IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
-				invocationItem.setIdentifier(UUID.randomUUID().toString());
-				invocationItem.setActivity(activity);
-				invocationItem.setProcessId(jobEvent.getOwningProcess());
-				invocationItem.setInvocationProcessId(invocationProcessIdentifier);
-				invocationItem.setParentId(parentItem.getIdentifier());
-				invocationItem.setWorkflowId(parentItem.getWorkflowId());
-				invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
-				provenanceReporter.addProvenanceItem(invocationItem);
-			}
-		}
-
-		/*
-		 * Create a Map of EntityIdentifiers named appropriately given the
-		 * activity mapping
-		 */
-		Map<String, T2Reference> inputData = new HashMap<>();
-		for (String inputName : jobEvent.getData().keySet()) {
-			String activityInputName = activity
-					.getInputPortMapping().get(inputName);
-			if (activityInputName != null)
-				inputData.put(activityInputName, jobEvent.getData()
-						.get(inputName));
-		}
-
-		/*
-		 * Create a callback object to receive events, completions and failure
-		 * notifications from the activity
-		 */
-		AsynchronousActivityCallback callback = new InvokeCallBack(
-				jobEvent, refService, invocationProcessIdentifier,
-				activity);
-
-		if (activity instanceof MonitorableAsynchronousActivity<?>) {
-			/*
-			 * Monitorable activity so get the monitorable properties and push
-			 * them into the state tree after launching the job
-			 */
-			MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) activity;
-			Set<MonitorableProperty<?>> props = maa
-					.executeAsynchWithMonitoring(inputData, callback);
-			monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
-		} else {
-			/*
-			 * Run the job, passing in the callback we've just created along
-			 * with the (possibly renamed) input data map
-			 */
-			activity.executeAsynch(inputData, callback);
-		}
-	}
-
-	protected IntermediateProvenance findIntermediateProvenance() {
-		for (DispatchLayer<?> layer : getProcessor().getDispatchStack()
-				.getLayers())
-			if (layer instanceof IntermediateProvenance)
-				return (IntermediateProvenance) layer;
-		return null;
-	}
-
-	protected class InvokeCallBack implements AsynchronousActivityCallback {
-		protected final AsynchronousActivity<?> activity;
-		protected final String invocationProcessIdentifier;
-		protected final DispatchJobEvent jobEvent;
-		protected final ReferenceService refService;
-		protected boolean sentJob = false;
-
-		protected InvokeCallBack(DispatchJobEvent jobEvent,
-				ReferenceService refService,
-				String invocationProcessIdentifier,
-				AsynchronousActivity<?> asyncActivity) {
-			this.jobEvent = jobEvent;
-			this.refService = refService;
-			this.invocationProcessIdentifier = invocationProcessIdentifier;
-			this.activity = asyncActivity;
-		}
-
-		@Override
-		public void fail(String message) {
-			fail(message, null);
-		}
-
-		@Override
-		public void fail(String message, Throwable t) {
-			fail(message, t, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t,
-				DispatchErrorType errorType) {
-			logger.warn("Failed (" + errorType + ") invoking " + activity
-					+ " for job " + jobEvent + ": " + message, t);
-			monMan.deregisterNode(invocationProcessIdentifier);
-			getAbove().receiveError(
-					new DispatchErrorEvent(jobEvent.getOwningProcess(),
-							jobEvent.getIndex(), jobEvent.getContext(),
-							message, t, errorType, activity));
-		}
-
-		@Override
-		public InvocationContext getContext() {
-			return jobEvent.getContext();
-		}
-
-		@Override
-		public String getParentProcessIdentifier() {
-			return invocationProcessIdentifier;
-		}
-
-		@Override
-		public void receiveCompletion(int[] completionIndex) {
-			if (completionIndex.length == 0)
-				// Final result, clean up monitor state
-				monMan.deregisterNode(invocationProcessIdentifier);
-			if (sentJob) {
-				int[] newIndex;
-				if (completionIndex.length == 0)
-					newIndex = jobEvent.getIndex();
-				else {
-					newIndex = new int[jobEvent.getIndex().length
-							+ completionIndex.length];
-					int i = 0;
-					for (int indexValue : jobEvent.getIndex())
-						newIndex[i++] = indexValue;
-					for (int indexValue : completionIndex)
-						newIndex[i++] = indexValue;
-				}
-				DispatchCompletionEvent c = new DispatchCompletionEvent(
-						jobEvent.getOwningProcess(), newIndex, jobEvent
-								.getContext());
-				getAbove().receiveResultCompletion(c);
-			} else {
-				/*
-				 * We haven't sent any 'real' data prior to completing a stream.
-				 * This in effect means we're sending an empty top level
-				 * collection so we need to register empty collections for each
-				 * output port with appropriate depth (by definition if we're
-				 * streaming all outputs are collection types of some kind)
-				 */
-				Map<String, T2Reference> emptyListMap = new HashMap<>();
-				for (OutputPort op : activity.getOutputPorts()) {
-					String portName = op.getName();
-					int portDepth = op.getDepth();
-					emptyListMap.put(portName, refService.getListService()
-							.registerEmptyList(portDepth, jobEvent.getContext()).getId());
-				}
-				receiveResult(emptyListMap, new int[0]);
-			}
-		}
-
-		@Override
-		public void receiveResult(Map<String, T2Reference> data, int[] index) {
-			/*
-			 * Construct a new result map using the activity mapping (activity
-			 * output name to processor output name)
-			 */
-			Map<String, T2Reference> resultMap = new HashMap<>();
-			for (String outputName : data.keySet()) {
-				String processorOutputName = activity
-						.getOutputPortMapping().get(outputName);
-				if (processorOutputName != null)
-					resultMap.put(processorOutputName, data.get(outputName));
-			}
-			/*
-			 * Construct a new index array if the specified index is non zero
-			 * length, otherwise just use the original job's index array (means
-			 * we're not streaming)
-			 */
-			int[] newIndex;
-			boolean streaming = false;
-			if (index.length == 0)
-				newIndex = jobEvent.getIndex();
-			else {
-				streaming = true;
-				newIndex = new int[jobEvent.getIndex().length + index.length];
-				int i = 0;
-				for (int indexValue : jobEvent.getIndex())
-					newIndex[i++] = indexValue;
-				for (int indexValue : index)
-					newIndex[i++] = indexValue;
-			}
-			DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
-					.getOwningProcess(), newIndex, jobEvent.getContext(),
-					resultMap, streaming);
-			if (!streaming) {
-				monMan.registerNode(resultEvent, invocationProcessIdentifier,
-						new HashSet<MonitorableProperty<?>>());
-				// Final result, clean up monitor state
-				monMan.deregisterNode(invocationProcessIdentifier);
-			}
-			// Push the modified data to the layer above in the dispatch stack
-			getAbove().receiveResult(resultEvent);
-
-			sentJob = true;
-		}
-
-		@Override
-		public void requestRun(Runnable runMe) {
-			String newThreadName = jobEvent.toString();
-			Thread thread = new Thread(runMe, newThreadName);
-			thread.setContextClassLoader(activity.getClass()
-					.getClassLoader());
-			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-				@Override
-				public void uncaughtException(Thread t, Throwable e) {
-					fail("Uncaught exception while invoking " + activity, e);
-				}
-			});
-			thread.start();
-		}
-	}
-}

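The Invoke layer above hinges on the asynchronous callback pattern: the activity's executeAsynch returns immediately and results or failures arrive later through the callback. The following standalone sketch illustrates that flow with deliberately simplified stand-ins; SimpleCallback, SimpleAsyncActivity and InvokeSketch are invented for illustration and are not part of the Taverna API, whose real interfaces exchange T2Reference maps and index arrays as shown in the deleted class.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for AsynchronousActivityCallback: receives results
// (or failures) some time after executeAsynch() has returned.
interface SimpleCallback {
	void receiveResult(Map<String, String> data, int[] index);
	void fail(String message, Throwable t);
}

// Simplified stand-in for an AsynchronousActivity: it must not block the
// caller, so the work runs on a separate thread and reports via the callback.
class SimpleAsyncActivity {
	public void executeAsynch(Map<String, String> inputs, SimpleCallback callback) {
		Thread worker = new Thread(() -> {
			try {
				Map<String, String> outputs = new HashMap<>();
				outputs.put("out", inputs.getOrDefault("in", "").toUpperCase());
				// An empty index array means "this is the complete result"
				callback.receiveResult(outputs, new int[0]);
			} catch (RuntimeException e) {
				callback.fail("Activity invocation failed", e);
			}
		});
		worker.start();
	}
}

public class InvokeSketch {
	public static void main(String[] args) {
		Map<String, String> inputs = new HashMap<>();
		inputs.put("in", "hello");
		new SimpleAsyncActivity().executeAsynch(inputs, new SimpleCallback() {
			@Override
			public void receiveResult(Map<String, String> data, int[] index) {
				System.out.println("Result: " + data);
			}
			@Override
			public void fail(String message, Throwable t) {
				System.err.println(message);
			}
		});
	}
}
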
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
deleted file mode 100644
index d5077a4..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2008 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-
-/**
- * A layer that allows while-style loops.
- * <p>
- * The layer is configured with a {@link LoopConfiguration}, where an activity
- * has been set as the
- * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
- * condition}.
- * <p>
- * After a job has been successful further down the dispatch stack, the loop
- * layer will invoke the conditional activity to determine if the job will be
- * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
- * will be performed even before the first invocation. (The default
- * runFirst=true is equivalent to a do..while construct, while runFirst=false is
- * equivalent to a while.. construct.)
- * <p>
- * A job will be resent down the dispatch stack only if the conditional activity
- * returns a reference to a string equal to "true" on its output port "loop".
- * <p>
- * If a job or the conditional activity fails, the while-loop is interrupted and
- * the error is sent further up.
- * <p>
- * Note that the LoopLayer will be invoked for each item in an iteration. If
- * you want to loop over the whole collection (i.e. re-iterate if the loop
- * condition fails after processing the full list), create a nested workflow
- * with the desired depths on its input ports and insert this LoopLayer in the
- * dispatch stack of the nested workflow's processor in the parent workflow.
- * <p>
- * It is recommended that the LoopLayer be inserted after the
- * {@link ErrorBounce} layer, as that layer is needed for registering errors
- * produced by the LoopLayer. If the user requires {@link Retry retries} and
- * {@link Failover failovers} before the while condition is checked, such
- * layers should be below the LoopLayer.
- *
- * @author Stian Soiland-Reyes
- */
-// FIXME Doesn't work
-@SuppressWarnings({"unchecked","rawtypes"})
-public class Loop extends AbstractDispatchLayer<JsonNode> {
-	public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
-	private static Logger logger = Logger.getLogger(Loop.class);
-
-	private JsonNode config = JsonNodeFactory.instance.objectNode();
-
-	protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
-	protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
-
-	@Override
-	public void configure(JsonNode config) {
-		this.config = config;
-	}
-
-	@Override
-	public void finishedWith(String owningProcess) {
-		String prefix = owningProcess + "[";
-		synchronized (outgoingJobs) {
-			for (String key : new ArrayList<>(outgoingJobs.keySet()))
-				if (key.startsWith(prefix))
-					outgoingJobs.remove(key);
-		}
-		synchronized (incomingJobs) {
-			for (String key : new ArrayList<>(incomingJobs.keySet()))
-				if (key.startsWith(prefix))
-					incomingJobs.remove(key);
-		}
-	}
-
-	@Override
-	public JsonNode getConfiguration() {
-		return config;
-	}
-
-	@Override
-	public void receiveJob(DispatchJobEvent jobEvent) {
-		synchronized (incomingJobs) {
-			incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
-		}
-		if (config.get("runFirst").asBoolean()) {
-			// We'll do the conditional in receiveResult instead
-			super.receiveJob(jobEvent);
-			return;
-		}
-		checkCondition(jobEvent);
-	}
-
-	@Override
-	public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-		synchronized (incomingJobs) {
-			incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
-		}
-		if (config.get("runFirst").asBoolean()) {
-			// We'll do the conditional in receiveResult instead
-			super.receiveJobQueue(jobQueueEvent);
-			return;
-		}
-		checkCondition(jobQueueEvent);
-	}
-	
-	private Activity<?> getCondition() {
-		//return config.getCondition();
-		return null;
-	}
-
-	@Override
-	public void receiveResult(DispatchResultEvent resultEvent) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveResult(resultEvent);
-			return;
-		}
-		synchronized (outgoingJobs) {
-			outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
-		}
-		checkCondition(resultEvent);
-	}
-
-	@Override
-	public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveResultCompletion(completionEvent);
-			return;
-		}
-		synchronized (outgoingJobs) {
-			outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
-		}
-		checkCondition(completionEvent);
-	}
-
-	private void checkCondition(AbstractDispatchEvent event) {
-		Activity<?> condition = getCondition();
-		if (condition == null) {
-			super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
-					event.getIndex(), event.getContext(),
-					"Can't invoke condition service: null", null,
-					DispatchErrorType.INVOCATION, condition));
-			return;
-		}
-		if (!(condition instanceof AbstractAsynchronousActivity)) {
-			DispatchErrorEvent errorEvent = new DispatchErrorEvent(
-					event.getOwningProcess(),
-					event.getIndex(),
-					event.getContext(),
-					"Can't invoke condition service "
-							+ condition
-							+ " is not an instance of AbstractAsynchronousActivity",
-					null, DispatchErrorType.INVOCATION, condition);
-			super.receiveError(errorEvent);
-			return;
-		}
-		AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
-		String jobIdentifier = jobIdentifier(event);
-		Map<String, T2Reference> inputs = prepareInputs(asyncCondition,
-				jobIdentifier);
-		AsynchronousActivityCallback callback = new ConditionCallBack(
-				jobIdentifier);
-		asyncCondition.executeAsynch(inputs, callback);
-	}
-
-	private Map<String, T2Reference> prepareInputs(
-			AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
-		Map<String, T2Reference> inputs = new HashMap<>();
-		Map<String, T2Reference> inData = getInData(jobIdentifier);
-		Map<String, T2Reference> outData = getOutData(jobIdentifier);
-
-		Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
-		for (ActivityInputPort conditionIn : inputPorts) {
-			String conditionPort = conditionIn.getName();
-			if (outData.containsKey(conditionPort))
-				// Copy from previous output
-				inputs.put(conditionPort, outData.get(conditionPort));
-			else if (inData.containsKey(conditionPort))
-				// Copy from original input
-				inputs.put(conditionPort, inData.get(conditionPort));
-		}
-		return inputs;
-	}
-
-	private Map<String, T2Reference> getInData(String jobIdentifier) {
-		AbstractDispatchEvent inEvent;
-		synchronized (incomingJobs) {
-			inEvent = incomingJobs.get(jobIdentifier);
-		}
-		Map<String, T2Reference> inData = new HashMap<>();
-		if (inEvent instanceof DispatchJobEvent)
-			inData = ((DispatchJobEvent) inEvent).getData();
-		return inData;
-	}
-
-	private Map<String, T2Reference> getOutData(String jobIdentifier) {
-		AbstractDispatchEvent outEvent;
-		synchronized (outgoingJobs) {
-			outEvent = outgoingJobs.get(jobIdentifier);
-		}
-		Map<String, T2Reference> outData = new HashMap<>();
-		if (outEvent instanceof DispatchResultEvent)
-			outData = ((DispatchResultEvent) outEvent).getData();
-		return outData;
-	}
-
-	private String jobIdentifier(AbstractDispatchEvent event) {
-		String jobId = event.getOwningProcess()
-				+ Arrays.toString(event.getIndex());
-		return jobId;
-	}
-
-	public static final String LOOP_PORT = "loop";
-
-	public class ConditionCallBack implements AsynchronousActivityCallback {
-		private InvocationContext context;
-		private final String jobIdentifier;
-		private String processId;
-
-		public ConditionCallBack(String jobIdentifier) {
-			this.jobIdentifier = jobIdentifier;
-			AbstractDispatchEvent originalEvent;
-			synchronized (incomingJobs) {
-				originalEvent = incomingJobs.get(jobIdentifier);
-			}
-			context = originalEvent.getContext();
-			processId = originalEvent.getOwningProcess() + ":condition";
-		}
-
-		@Override
-		public void fail(String message) {
-			fail(message, null, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t) {
-			fail(message, t, DispatchErrorType.INVOCATION);
-		}
-
-		@Override
-		public void fail(String message, Throwable t,
-				DispatchErrorType errorType) {
-			logger.warn("Failed (" + errorType + ") invoking condition service "
-					+ jobIdentifier + ":" + message, t);
-
-			AbstractDispatchEvent originalEvent;
-			synchronized (incomingJobs) {
-				originalEvent = incomingJobs.get(jobIdentifier);
-			}
-			receiveError(new DispatchErrorEvent(originalEvent
-					.getOwningProcess(), originalEvent.getIndex(),
-					originalEvent.getContext(),
-					"Can't invoke condition service ", t,
-					DispatchErrorType.INVOCATION, null));
-		}
-
-		@Override
-		public InvocationContext getContext() {
-			return context;
-		}
-
-		@Override
-		public String getParentProcessIdentifier() {
-			return processId;
-		}
-
-		@Override
-		public void receiveCompletion(int[] completionIndex) {
-			// Ignore streaming
-		}
-
-		@Override
-		public void receiveResult(Map<String, T2Reference> data, int[] index) {
-			if (index.length > 0) {
-				// Ignore streaming
-				return;
-			}
-			T2Reference loopRef = data.get(LOOP_PORT);
-			if (loopRef == null) {
-				fail("Condition service didn't contain output port " + LOOP_PORT);
-				return;
-			}
-			if (loopRef.containsErrors()) {
-				fail("Condition service failed: " + loopRef);
-				return;
-			}
-			if (loopRef.getDepth() != 0) {
-				fail("Condition service output " + LOOP_PORT
-						+ " depth is not 0, but " + loopRef.getDepth());
-				// Don't attempt to render an invalid reference below
-				return;
-			}
-			ReferenceService referenceService = context.getReferenceService();
-			String loop = (String) referenceService.renderIdentifier(loopRef,
-					String.class, context);
-
-			if (Boolean.parseBoolean(loop)) {
-				// Push it down again
-				AbstractDispatchEvent dispatchEvent;
-				synchronized (incomingJobs) {
-					dispatchEvent = incomingJobs.get(jobIdentifier);
-				}
-				if (dispatchEvent == null) {
-					fail("Unknown job identifier " + jobIdentifier);
-					// Nothing to resend down the stack
-					return;
-				}
-				if (dispatchEvent instanceof DispatchJobEvent) {
-					DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
-							dispatchEvent);
-					getBelow().receiveJob(newJobEvent);
-				} else if (dispatchEvent instanceof DispatchJobQueueEvent) {
-					getBelow().receiveJobQueue(
-							(DispatchJobQueueEvent) dispatchEvent);
-				} else {
-					fail("Unknown type of incoming event " + dispatchEvent);
-				}
-				return;
-
-			} else {
-				// We'll push it up, end of loop for now
-
-				AbstractDispatchEvent outgoingEvent;
-				synchronized (outgoingJobs) {
-					outgoingEvent = outgoingJobs.get(jobIdentifier);
-				}
-				if (outgoingEvent == null && !config.get("runFirst").asBoolean()) {
-					fail("Initial loop condition failed");
-					// There is no earlier result to push up the stack
-					return;
-				}
-				if (outgoingEvent instanceof DispatchCompletionEvent) {
-					getAbove().receiveResultCompletion(
-							(DispatchCompletionEvent) outgoingEvent);
-				} else if (outgoingEvent instanceof DispatchResultEvent) {
-					getAbove().receiveResult(
-							(DispatchResultEvent) outgoingEvent);
-				} else {
-					fail("Unknown type of outgoing event " + outgoingEvent);
-				}
-			}
-
-		}
-
-		private DispatchJobEvent prepareNewJobEvent(
-				Map<String, T2Reference> data,
-				AbstractDispatchEvent dispatchEvent) {
-			DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
-			Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
-					dispatchJobEvent.getData());
-			newInputs.putAll(data);
-			DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
-					.getOwningProcess(), dispatchEvent.getIndex(),
-					dispatchEvent.getContext(), newInputs,
-					((DispatchJobEvent) dispatchEvent).getActivities());
-			/*
-			 * TODO: Should this be registered as an incomingJobs? If so the
-			 * conditional could even feed to itself, and we should also keep a
-			 * list of originalJobs.
-			 */
-			return newJobEvent;
-		}
-
-		@Override
-		public void requestRun(Runnable runMe) {
-			String newThreadName = "Condition service "
-					+ getParentProcessIdentifier();
-			Thread thread = new Thread(runMe, newThreadName);
-			thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-				@Override
-				public void uncaughtException(Thread t, Throwable e) {
-					fail("Uncaught exception while invoking " + jobIdentifier,
-							e);
-				}
-			});
-			thread.start();
-		}
-	}
-
-	@Override
-	public Processor getProcessor() {
-		if (dispatchStack == null)
-			return null;
-		return dispatchStack.getProcessor();
-	}
-}

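The runFirst flag described in the Loop javadoc above boils down to the difference between do..while and while semantics. Below is a minimal, synchronous sketch of that control flow; invokeJob and evaluateCondition are hypothetical placeholders, since the real layer reacts to dispatch events asynchronously rather than blocking in a loop:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, synchronous illustration of the Loop layer's intent.
// The real implementation reacts to DispatchJobEvent/DispatchResultEvent
// messages instead of looping in a single thread.
public class LoopSketch {
	public static void main(String[] args) {
		boolean runFirst = true;           // default: do..while semantics
		AtomicInteger counter = new AtomicInteger();

		if (runFirst) {
			do {
				invokeJob(counter);
			} while (evaluateCondition(counter));   // condition checked after each run
		} else {
			while (evaluateCondition(counter)) {    // condition checked before first run
				invokeJob(counter);
			}
		}
	}

	// Placeholder for sending the job down the dispatch stack
	private static void invokeJob(AtomicInteger counter) {
		System.out.println("Invocation " + counter.incrementAndGet());
	}

	// Placeholder for invoking the condition activity and reading its
	// "loop" output port as a boolean string
	private static boolean evaluateCondition(AtomicInteger counter) {
		return counter.get() < 3;
	}
}
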
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
deleted file mode 100644
index 7cfa2a5..0000000
--- a/taverna-workflowmodel-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.Properties;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Configuration bean for the {@link Loop}.
- * <p>
- * Set the {@link #setCondition(Activity) condition} to an activity with an
- * output port called "loop". The LoopLayer will re-send a job only if this
- * port exists and its output can be dereferenced to a string equal to "true".
- * </p>
- * <p>
- * If {@link #isRunFirst()} is false, the loop layer will check the condition
- * before invoking the job for the first time; otherwise the condition will be
- * invoked only after the job has come back with successful results.
- * </p>
- * 
- * @author Stian Soiland-Reyes
- * 
- */
-@ConfigurationBean(uri = Loop.URI + "#Config")
-public class LoopConfiguration implements Cloneable {
-	private Activity<?> condition = null;
-	private Boolean runFirst;
-	private Properties properties;
-
-	public Properties getProperties() {
-		synchronized (this) {
-			if (properties == null)
-				properties = new Properties();
-		}
-		return properties;
-	}
-
-	public void setProperties(Properties properties) {
-		this.properties = properties;
-	}
-
-	@Override
-	public LoopConfiguration clone() {
-		LoopConfiguration clone;
-		try {
-			clone = (LoopConfiguration) super.clone();
-			clone.condition = null;
-		} catch (CloneNotSupportedException e) {
-			throw new RuntimeException("Unexpected CloneNotSupportedException",
-					e);
-		}
-		return clone;
-	}
-
-	public Activity<?> getCondition() {
-		return condition;
-	}
-
-	public boolean isRunFirst() {
-		if (runFirst == null)
-			return true;
-		return runFirst;
-	}
-
-	@ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
-	public void setCondition(Activity<?> activity) {
-		this.condition = activity;
-	}
-
-	@ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
-	public void setRunFirst(boolean runFirst) {
-		this.runFirst = runFirst;
-	}
-}
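
Assuming the deleted classes above are still on the classpath, wiring up the bean might have looked roughly like the sketch below; conditionActivity stands for some pre-existing Activity whose "loop" output port renders to the string "true" while iteration should continue, and the surrounding class is invented for illustration:

import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.LoopConfiguration;

public class LoopConfigurationExample {
	// conditionActivity: an existing Activity with an output port named "loop"
	public static LoopConfiguration configure(Activity<?> conditionActivity) {
		LoopConfiguration config = new LoopConfiguration();
		config.setCondition(conditionActivity);
		// false: check the condition before the first invocation (while..);
		// true (the default): check only after each successful run (do..while)
		config.setRunFirst(false);
		return config;
	}
}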