You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/27 16:26:52 UTC
[5/6] incubator-taverna-engine git commit:
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
taverna-workflowmodel-core-extensions -> taverna-workflowmodel-extensions
Project: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/commit/4252fa90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/tree/4252fa90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/diff/4252fa90
Branch: refs/heads/master
Commit: 4252fa90112703905bed89dc4b46f59a77b87e9c
Parents: 46cc153
Author: Stian Soiland-Reyes <st...@apache.org>
Authored: Mon Feb 23 15:48:10 2015 +0000
Committer: Stian Soiland-Reyes <st...@apache.org>
Committed: Mon Feb 23 15:48:10 2015 +0000
----------------------------------------------------------------------
pom.xml | 2 +-
taverna-platform-integration-tests/pom.xml | 2 +-
.../.gitignore | 1 -
taverna-workflowmodel-core-extensions/pom.xml | 30 --
.../layers/CoreDispatchLayerFactory.java | 103 ----
.../processor/dispatch/layers/ErrorBounce.java | 324 ------------
.../processor/dispatch/layers/Failover.java | 111 ----
.../dispatch/layers/IntermediateProvenance.java | 508 -------------------
.../processor/dispatch/layers/Invoke.java | 369 --------------
.../processor/dispatch/layers/Loop.java | 424 ----------------
.../dispatch/layers/LoopConfiguration.java | 75 ---
.../processor/dispatch/layers/Parallelize.java | 463 -----------------
.../dispatch/layers/ParallelizeConfig.java | 50 --
.../processor/dispatch/layers/Retry.java | 180 -------
.../processor/dispatch/layers/RetryConfig.java | 97 ----
.../processor/dispatch/layers/Stop.java | 163 ------
.../processor/dispatch/layers/package.html | 4 -
...rkflowmodel-core-extensions-context-osgi.xml | 11 -
.../workflowmodel-core-extensions-context.xml | 9 -
.../processor/dispatch/layers/TestRetry.java | 110 ----
taverna-workflowmodel-extensions/.gitignore | 1 +
taverna-workflowmodel-extensions/pom.xml | 30 ++
.../layers/CoreDispatchLayerFactory.java | 103 ++++
.../processor/dispatch/layers/ErrorBounce.java | 324 ++++++++++++
.../processor/dispatch/layers/Failover.java | 111 ++++
.../dispatch/layers/IntermediateProvenance.java | 508 +++++++++++++++++++
.../processor/dispatch/layers/Invoke.java | 369 ++++++++++++++
.../processor/dispatch/layers/Loop.java | 424 ++++++++++++++++
.../dispatch/layers/LoopConfiguration.java | 75 +++
.../processor/dispatch/layers/Parallelize.java | 463 +++++++++++++++++
.../dispatch/layers/ParallelizeConfig.java | 50 ++
.../processor/dispatch/layers/Retry.java | 180 +++++++
.../processor/dispatch/layers/RetryConfig.java | 97 ++++
.../processor/dispatch/layers/Stop.java | 163 ++++++
.../processor/dispatch/layers/package.html | 4 +
...rkflowmodel-core-extensions-context-osgi.xml | 11 +
.../workflowmodel-core-extensions-context.xml | 9 +
.../processor/dispatch/layers/TestRetry.java | 110 ++++
taverna-workflowmodel-impl/pom.xml | 2 +-
39 files changed, 3035 insertions(+), 3035 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index daef4fd..001d17c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<module>taverna-services-impl</module>
<module>taverna-stringconstant-activity</module>
<module>taverna-workflowmodel-api</module>
- <module>taverna-workflowmodel-core-extensions</module>
+ <module>taverna-workflowmodel-extensions</module>
<module>taverna-workflowmodel-impl</module>
</modules>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-platform-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-platform-integration-tests/pom.xml b/taverna-platform-integration-tests/pom.xml
index 362fbdc..a958d1c 100644
--- a/taverna-platform-integration-tests/pom.xml
+++ b/taverna-platform-integration-tests/pom.xml
@@ -246,7 +246,7 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
+ <artifactId>taverna-workflowmodel-extensions</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/.gitignore
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/.gitignore b/taverna-workflowmodel-core-extensions/.gitignore
deleted file mode 100644
index b83d222..0000000
--- a/taverna-workflowmodel-core-extensions/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target/
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/pom.xml b/taverna-workflowmodel-core-extensions/pom.xml
deleted file mode 100644
index 700e56d..0000000
--- a/taverna-workflowmodel-core-extensions/pom.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.taverna.engine</groupId>
- <artifactId>taverna-engine</artifactId>
- <version>3.1.0-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>taverna-workflowmodel-core-extensions</artifactId>
- <packaging>bundle</packaging>
- <name>Apache Taverna Workflow Model Extension Points</name>
- <description>Implementation of core extension points to the workflow model</description>
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>taverna-workflowmodel-api</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
deleted file mode 100644
index 0b11627..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayerFactory;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Factory for creating core dispatch layers.
- *
- * The core dispatch layers are :
- * <ul>
- * <li>ErrorBounce</li>
- * <li>Parallelize</li>
- * <li>Failover</li>
- * <li>Retry</li>
- * <li>Stop</li>
- * <li>Invoke</li>
- * <li>Loop</li>
- * <li>IntermediateProvenance</li>
- * </ul>
- *
- * @author David Withers
- */
-public class CoreDispatchLayerFactory implements DispatchLayerFactory {
- private static final URI parallelizeLayer = URI.create(Parallelize.URI);
- private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
- private static final URI failoverLayer = URI.create(Failover.URI);
- private static final URI retryLayer = URI.create(Retry.URI);
- private static final URI invokeLayer = URI.create(Invoke.URI);
- private static final URI loopLayer = URI.create(Loop.URI);
- private static final URI intermediateProvenanceLayer = URI.create(IntermediateProvenance.URI);
- private static final URI stopLayer = URI.create(Stop.URI);
-
- private final static Set<URI> dispatchLayerURIs = new HashSet<URI>();
-
- static {
- dispatchLayerURIs.add(parallelizeLayer);
- dispatchLayerURIs.add(errorBounceLayer);
- dispatchLayerURIs.add(failoverLayer);
- dispatchLayerURIs.add(retryLayer);
- dispatchLayerURIs.add(invokeLayer);
- dispatchLayerURIs.add(loopLayer);
- dispatchLayerURIs.add(intermediateProvenanceLayer);
- dispatchLayerURIs.add(stopLayer);
- }
-
- @Override
- public DispatchLayer<?> createDispatchLayer(URI uri) {
- if (parallelizeLayer.equals(uri))
- return new Parallelize();
- else if (errorBounceLayer.equals(uri))
- return new ErrorBounce();
- else if (failoverLayer.equals(uri))
- return new Failover();
- else if (retryLayer.equals(uri))
- return new Retry();
- else if (invokeLayer.equals(uri))
- return new Invoke();
- else if (loopLayer.equals(uri))
- return new Loop();
- else if (intermediateProvenanceLayer.equals(uri))
- return new IntermediateProvenance();
- else if (stopLayer.equals(uri))
- return new Stop();
- return null;
- }
-
- @Override
- public JsonNode getDispatchLayerConfigurationSchema(URI uri) {
- // TODO
- return null;
- }
-
- @Override
- public Set<URI> getDispatchLayerTypes() {
- return dispatchLayerURIs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
deleted file mode 100644
index dfde240..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.NO_EFFECT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_PROCESS_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.reference.ErrorDocument;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultCompletionReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.SupportsStreamedResult;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-/**
- * Receives job events, checks to see whether any parameters in the job are
- * error tokens or collections which contain errors. If so then sends a
- * corresponding result message back where all outputs are error tokens having
- * registered such with the invocation context's data manager. It also re-writes
- * any failure messages as result messages containing error tokens at the
- * appropriate depth - this means that it must be placed above any error
- * handling layers in order for those to have an effect at all. In general this
- * layer should be placed immediately below the parallelize layer in most
- * default cases (this will guarantee the processor never sees a failure message
- * though, which may or may not be desirable)
- *
- * @author Tom Oinn
- *
- */
-@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
- CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
-@DispatchLayerJobReaction(emits = { RESULT }, relaysUnmodified = true, stateEffects = {
- CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE, NO_EFFECT })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@DispatchLayerResultCompletionReaction(emits = {}, relaysUnmodified = true, stateEffects = {})
-@SupportsStreamedResult
-public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
- PropertyContributingDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
-
- /**
- * Track the number of reflected and translated errors handled by this error
- * bounce instance
- */
- private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
-
- private int totalTranslatedErrors = 0;
- private int totalReflectedErrors = 0;
-
- private synchronized ErrorBounceState getState(String owningProcess) {
- if (state.containsKey(owningProcess))
- return state.get(owningProcess);
- ErrorBounceState ebs = new ErrorBounceState();
- state.put(owningProcess, ebs);
- return ebs;
- }
-
- /**
- * If the job contains errors, or collections which contain errors
- * themselves then bounce a result message with error documents in back up
- * to the layer above
- */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- Set<T2Reference> errorReferences = new HashSet<>();
- for (T2Reference ei : jobEvent.getData().values())
- if (ei.containsErrors())
- errorReferences.add(ei);
- if (errorReferences.isEmpty())
- // relay the message down...
- getBelow().receiveJob(jobEvent);
- else {
- getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
- sendErrorOutput(jobEvent, null, errorReferences);
- }
- }
-
- /**
- * Always send the error document job result on receiving a failure, at
- * least for now! This should be configurable, in effect this is the part
- * that ensures the processor never sees a top level failure.
- */
- @Override
- public void receiveError(DispatchErrorEvent errorEvent) {
- getState(errorEvent.getOwningProcess()).incrementErrorsTranslated();
- sendErrorOutput(errorEvent, errorEvent.getCause(), null);
- }
-
- /**
- * Construct and send a new result message with error documents in place of
- * all outputs at the appropriate depth
- *
- * @param event
- * @param cause
- * @param errorReferences
- */
- private void sendErrorOutput(Event<?> event, Throwable cause,
- Set<T2Reference> errorReferences) {
- ReferenceService rs = event.getContext().getReferenceService();
-
- Processor p = dispatchStack.getProcessor();
- Map<String, T2Reference> outputDataMap = new HashMap<>();
- String[] owningProcessArray = event.getOwningProcess().split(":");
- String processor = owningProcessArray[owningProcessArray.length - 1];
- for (OutputPort op : p.getOutputPorts()) {
- String message = "Processor '" + processor + "' - Port '"
- + op.getName() + "'";
- if (event instanceof DispatchErrorEvent)
- message += ": " + ((DispatchErrorEvent) event).getMessage();
- ErrorDocument ed;
- if (cause != null)
- ed = rs.getErrorDocumentService().registerError(message, cause,
- op.getDepth(), event.getContext());
- else
- ed = rs.getErrorDocumentService().registerError(message,
- errorReferences, op.getDepth(), event.getContext());
- outputDataMap.put(op.getName(), ed.getId());
- }
- DispatchResultEvent dre = new DispatchResultEvent(
- event.getOwningProcess(), event.getIndex(), event.getContext(),
- outputDataMap, false);
- getAbove().receiveResult(dre);
- }
-
- @Override
- public void configure(JsonNode config) {
- // Do nothing - no configuration required
- }
-
- @Override
- public JsonNode getConfiguration() {
- // Layer has no configuration associated
- return null;
- }
-
- @Override
- public void finishedWith(final String owningProcess) {
- /*
- * Delay the removal of the state to give the monitor a chance to poll
- */
- cleanupTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- state.remove(owningProcess);
- }
- }, CLEANUP_DELAY_MS);
- }
-
- /**
- * Two properties, dispatch.errorbounce.reflected(integer) is the number of
- * incoming jobs which have been bounced back as results with errors,
- * dispatch.errorbounce.translated(integer) is the number of failures from
- * downstream in the stack that have been re-written as complete results
- * containing error documents.
- */
- @Override
- public void injectPropertiesFor(final String owningProcess) {
- MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "reflected" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null)
- return 0;
- return ebs.getErrorsReflected();
- }
- };
- dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
- owningProcess);
-
- MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "translated" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null)
- return 0;
- return ebs.getErrorsTranslated();
- }
- };
- dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
- owningProcess);
-
- MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce",
- "totalTranslated" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return totalTranslatedErrors;
- }
- };
- dispatchStack.receiveMonitorableProperty(
- totalTranslatedTranslatedProperty, owningProcess);
-
- MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
- @Override
- public Date getLastModified() {
- return new Date();
- }
-
- @Override
- public String[] getName() {
- return new String[] { "dispatch", "errorbounce",
- "totalReflected" };
- }
-
- @Override
- public Integer getValue() throws NoSuchPropertyException {
- return totalReflectedErrors;
- }
- };
- dispatchStack.receiveMonitorableProperty(
- totalReflectedTranslatedProperty, owningProcess);
- }
-
- public class ErrorBounceState {
- private int errorsReflected = 0;
- private int errorsTranslated = 0;
-
- /**
- * Number of times the bounce layer has converted an incoming job event
- * where the input data contained error tokens into a result event
- * containing all errors.
- */
- int getErrorsReflected() {
- return this.errorsReflected;
- }
-
- /**
- * Number of times the bounce layer has converted an incoming failure
- * event into a result containing error tokens
- */
- int getErrorsTranslated() {
- return errorsTranslated;
- }
-
- void incrementErrorsReflected() {
- synchronized (this) {
- errorsReflected++;
- }
- synchronized (ErrorBounce.this) {
- totalReflectedErrors++;
- }
- }
-
- void incrementErrorsTranslated() {
- synchronized (this) {
- errorsTranslated++;
- }
- synchronized (ErrorBounce.this) {
- totalTranslatedErrors++;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
deleted file mode 100644
index 1c5ef03..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.CREATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.REMOVE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerStateEffect.UPDATE_LOCAL_STATE;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.JOB;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractErrorHandlerLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerErrorReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerResultReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Failure handling dispatch layer, consumes job events with multiple activities
- * and emits the same job but with only the first activity. On failures the job
- * is resent to the layer below with a new activity list containing the second
- * in the original list and so on. If a failure is received and there are no
- * further activities to use the job fails and the failure is sent back up to
- * the layer above.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- */
-@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
- UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
-@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
-@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
-public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
-
- @Override
- protected JobState getStateObject(DispatchJobEvent jobEvent) {
- return new FailoverState(jobEvent);
- }
-
- /**
- * Receive a job from the layer above, store it in the state map then relay
- * it to the layer below with a modified activity list containing only the
- * activity at index 0
- */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- addJobToStateList(jobEvent);
- List<Activity<?>> newActivityList = new ArrayList<>();
- newActivityList.add(jobEvent.getActivities().get(0));
- getBelow().receiveJob(
- new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
- .getIndex(), jobEvent.getContext(), jobEvent.getData(),
- newActivityList));
- }
-
- class FailoverState extends JobState {
- int currentActivityIndex = 0;
-
- public FailoverState(DispatchJobEvent jobEvent) {
- super(jobEvent);
- }
-
- @Override
- public boolean handleError() {
- currentActivityIndex++;
- if (currentActivityIndex == jobEvent.getActivities().size())
- return false;
- List<Activity<?>> newActivityList = new ArrayList<>();
- newActivityList.add(jobEvent.getActivities().get(
- currentActivityIndex));
- getBelow().receiveJob(
- new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
- .getIndex(), jobEvent.getContext(), jobEvent
- .getData(), newActivityList));
- return true;
- }
- }
-
- @Override
- public void configure(JsonNode config) {
- // Do nothing - there is no configuration to do
- }
-
- @Override
- public JsonNode getConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
deleted file mode 100644
index 718079a..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static java.lang.System.currentTimeMillis;
-
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.Event;
-import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-/**
- * Sits above the {@link Invoke} layer and collects information about the
- * current workflow run to be stored by the {@link ProvenanceConnector}.
- *
- * @author Ian Dunlop
- * @author Stian Soiland-Reyes
- *
- */
-public class IntermediateProvenance extends AbstractDispatchLayer<String> {
- // URI constant for this layer (presumably its SCUFL2 type identifier - TODO confirm)
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
- private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
-
- // Receives every provenance item built by this layer; assigned through setReporter
- private ProvenanceReporter reporter;
- // Owning process id -> (index rendered by indexStr -> iteration item).
- // getIndexesByProcess synchronizes on this map when reading or adding entries.
- private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
- // The two maps below are only written (in receiveJob); no live code in this
- // class reads or clears them, so they grow for as long as the layer is used.
- private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
- private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
-
- // private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
- // private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
-
- // Workflow-level provenance item supplying the workflow and parent ids; assigned through setWorkflow
- private WorkflowProvenanceItem workflowItem;
-
- /**
- * Does nothing; the supplied value is ignored.
- *
- * @param o
- *            not used
- */
- @Override
- public void configure(String o) {
- }
-
- /**
-  * A set of provenance events for a particular owning process has been
-  * finished with, so all the {@link IterationProvenanceItem}s recorded for
-  * it are removed from the map.
-  *
-  * @param owningProcess
-  *            identifier of the process whose iteration items are dropped
-  */
- @Override
- public void finishedWith(String owningProcess) {
-     /*
-      * processToIndexes is a plain HashMap that getIndexesByProcess reads
-      * and updates while holding the map's monitor; take the same monitor
-      * here so a removal cannot race with those accesses.
-      */
-     synchronized (processToIndexes) {
-         processToIndexes.remove(owningProcess);
-     }
- }
-
- /**
- * @return always {@code null}; {@code configure} stores nothing
- */
- @Override
- public String getConfiguration() {
- return null;
- }
-
- /**
-  * Looks up the index-to-iteration map of an owning process, creating and
-  * registering an empty one on first use. The lookup and the registration
-  * happen while holding the monitor of {@code processToIndexes}.
-  *
-  * @param owningProcess
-  *            identifier of the owning process
-  * @return the map registered for that process, never {@code null}
-  */
- protected Map<String, IterationProvenanceItem> getIndexesByProcess(
-         String owningProcess) {
-     synchronized (processToIndexes) {
-         Map<String, IterationProvenanceItem> known = processToIndexes
-                 .get(owningProcess);
-         if (known != null)
-             return known;
-         Map<String, IterationProvenanceItem> created = new HashMap<>();
-         processToIndexes.put(owningProcess, created);
-         return created;
-     }
- }
-
- /**
-  * Finds, or derives, the {@link IterationProvenanceItem} for an event.
-  * <p>
-  * If an item is registered for the event's exact index it is returned.
-  * Otherwise trailing index positions are dropped one at a time until a
-  * registered ancestor iteration is found; a new item carrying the event's
-  * own index is then created as a child of that ancestor, registered under
-  * the event's index and returned.
-  *
-  * @param event
-  *            the event whose owning process and index select the iteration
-  * @return the registered or newly created iteration item, never
-  *         {@code null}
-  * @throws IllegalStateException
-  *             if no iteration is registered for the index or for any
-  *             shorter prefix of it, down to the empty index
-  */
- protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
-     String owningProcess = event.getOwningProcess();
-     int[] originalIndex = event.getIndex();
-     Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
-     synchronized (indexes) {
-         // find the iteration item for the exact index, e.g. [1]
-         IterationProvenanceItem exactMatch = indexes.get(indexStr(originalIndex));
-         if (exactMatch != null)
-             return exactMatch;
-
-         // otherwise strip the index one position at a time and look again
-         int[] index = originalIndex;
-         while (true) {
-             try {
-                 index = removeLastIndex(index);
-             } catch (IllegalStateException e) {
-                 // even the empty index [] had no iteration registered
-                 logger.warn("Cannot find a parent iteration with index [] for owning process: "
-                         + owningProcess
-                         + ". Workflow invocation is in an illegal state");
-                 throw e;
-             }
-             IterationProvenanceItem parent = indexes.get(indexStr(index));
-             if (parent == null)
-                 continue;
-
-             /*
-              * We have a 'parent' iteration: create a new iteration for the
-              * original index and give it the parent's activity (parent id)
-              * and input data.
-              *
-              * FIXME should this be linked to the parent iteration instead?
-              */
-             IterationProvenanceItem child = new IterationProvenanceItem();
-             child.setIteration(originalIndex);
-             child.setProcessId(owningProcess);
-             child.setIdentifier(UUID.randomUUID().toString());
-             child.setWorkflowId(workflowItem.getParentId());
-             child.setParentIterationItem(parent);
-             child.setParentId(parent.getParentId());
-             child.setInputDataItem(parent.getInputDataItem());
-
-             // add this new iteration item to the map
-             getIndexesByProcess(owningProcess).put(
-                     indexStr(originalIndex), child);
-             return child;
-         }
-     }
- }
-
- /**
-  * Renders an index array as text in which every element is preceded by a
-  * colon, e.g. {1, 2, 3} becomes ":1:2:3"; an empty array gives "".
-  *
-  * @param index
-  *            the index array to render
-  * @return the colon-prefixed rendering
-  */
- private String indexStr(int[] index) {
-     StringBuilder rendered = new StringBuilder();
-     for (int position = 0; position < index.length; position++) {
-         rendered.append(":");
-         rendered.append(index[position]);
-     }
-     return rendered.toString();
- }
-
- /**
- * Splits the {@code indexStr} rendering of an index array on ':'.
- * <p>
- * NOTE(review): despite the name, nothing is removed here; all elements
- * are returned. Because the rendering starts with ':', the first array
- * element is an empty string. The method is not called anywhere in this
- * class (hence the suppressed warning).
- *
- * @param index
- *            the index array to render and split
- * @return the ':'-separated tokens of the rendered index
- */
- @SuppressWarnings("unused")
- private String[] stripLastIndex(int[] index) {
- // will be in form :1:2:3
- return indexStr(index).split(":");
- }
-
- /**
-  * Returns a copy of an index array without its final element.
-  *
-  * @param index
-  *            the index array to shorten; it is not modified
-  * @return a new array holding all but the last value of {@code index}
-  * @throws IllegalStateException
-  *             if {@code index} is empty
-  */
- private int[] removeLastIndex(int[] index) {
-     int parentLength = index.length - 1;
-     if (parentLength < 0)
-         throw new IllegalStateException(
-                 "There is no parent iteration of index [] for this result");
-     // copyOf truncates to the requested length
-     return Arrays.copyOf(index, parentLength);
- }
-
- /**
- * @return the string form of a freshly generated random {@link UUID}
- */
- private static String uuid() {
- return UUID.randomUUID().toString();
- }
-
- /**
- * Create an {@link ErrorProvenanceItem} and send across to the
- * {@link ProvenanceConnector}
- */
- @Override
- public void receiveError(DispatchErrorEvent errorEvent) {
- IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
- // get using errorEvent.getOwningProcess();
-
- ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
- errorItem.setCause(errorEvent
- .getCause());
- errorItem.setErrorType(errorEvent
- .getFailureType().toString());
- errorItem.setMessage(errorEvent.getMessage());
-
- errorItem.setProcessId(errorEvent.getOwningProcess());
- errorItem.setIdentifier(uuid());
- errorItem.setParentId(iterationProvItem.getIdentifier());
- // iterationProvItem.setErrorItem(errorItem);
- // FIXME don't need to add to the processor item earlier
- getReporter().addProvenanceItem(errorItem);
- super.receiveError(errorEvent);
- }
-
- /**
-  * Creates the {@link ProvenanceItem}s describing a job (process,
-  * processor, activity, iteration and input data) and sends them to the
-  * reporter. Each item has a unique identifier and knows its parent item.
-  * <p>
-  * The {@link IterationProvenanceItem} is told what its inputs are, stamped
-  * with the enactment start time, reported, and registered under the job's
-  * index so that {@code getIterationProvItem} can find it again when a
-  * result or error arrives.
-  * <p>
-  * Provenance capture is best effort: any {@link RuntimeException} is
-  * logged and the job is still passed on to the superclass.
-  *
-  * @param jobEvent
-  *            the job travelling down the dispatch stack
-  */
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
-     try {
-         String owningProcess = jobEvent.getOwningProcess();
-
-         // FIXME do we need this ProcessProvenanceItem?
-         // the first two ':'-separated parts of the process id are used
-         // as facade id and dataflow id
-         String[] split = owningProcess.split(":");
-         ProcessProvenanceItem provenanceItem = new ProcessProvenanceItem();
-         String parentDataflowId = workflowItem.getParentId();
-         provenanceItem.setWorkflowId(parentDataflowId);
-         provenanceItem.setFacadeID(split[0]);
-         provenanceItem.setDataflowID(split[1]);
-         provenanceItem.setProcessId(owningProcess);
-         provenanceItem.setIdentifier(uuid());
-         provenanceItem.setParentId(workflowItem.getIdentifier());
-
-         ProcessorProvenanceItem processorProvItem = new ProcessorProvenanceItem();
-         processorProvItem.setWorkflowId(parentDataflowId);
-         processorProvItem.setProcessId(owningProcess);
-         processorProvItem.setIdentifier(uuid());
-         processorProvItem.setParentId(provenanceItem.getIdentifier());
-         getReporter().addProvenanceItem(provenanceItem);
-         getReporter().addProvenanceItem(processorProvItem);
-
-         IterationProvenanceItem iterationProvItem = new IterationProvenanceItem();
-         iterationProvItem.setWorkflowId(parentDataflowId);
-         iterationProvItem.setIteration(jobEvent.getIndex());
-         iterationProvItem.setIdentifier(uuid());
-
-         ReferenceService referenceService = jobEvent.getContext()
-                 .getReferenceService();
-
-         InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
-         inputDataItem.setDataMap(jobEvent.getData());
-         inputDataItem.setReferenceService(referenceService);
-         inputDataItem.setIdentifier(uuid());
-         inputDataItem.setParentId(iterationProvItem.getIdentifier());
-         inputDataItem.setProcessId(owningProcess);
-
-         // [index, owning process] pair recorded for both the input data
-         // item and the activity item of this job
-         List<Object> inputIndexOwnerList = new ArrayList<>();
-         inputIndexOwnerList.add(jobEvent.getIndex());
-         inputIndexOwnerList.add(owningProcess);
-         inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
-
-         iterationProvItem.setInputDataItem(inputDataItem);
-         iterationProvItem.setProcessId(owningProcess);
-
-         // only the first AsynchronousActivity in the list gets an item
-         for (Activity<?> activity : jobEvent.getActivities())
-             if (activity instanceof AsynchronousActivity) {
-                 ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
-                 activityProvItem.setWorkflowId(parentDataflowId);
-                 activityProvItem.setIdentifier(uuid());
-                 iterationProvItem.setParentId(activityProvItem.getIdentifier());
-                 activityProvItem.setParentId(processorProvItem.getIdentifier());
-                 activityProvItem.setProcessId(owningProcess);
-                 activityProvenanceItemMap.put(activityProvItem,
-                         inputIndexOwnerList);
-                 getReporter().addProvenanceItem(activityProvItem);
-                 break;
-             }
-         getIndexesByProcess(owningProcess).put(
-                 indexStr(jobEvent.getIndex()), iterationProvItem);
-         iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
-         getReporter().addProvenanceItem(iterationProvItem);
-     } catch (RuntimeException ex) {
-         logger.error("Could not store provenance for " + jobEvent, ex);
-     }
-
-     super.receiveJob(jobEvent);
- }
-
- /**
- * Passes the job queue event to the superclass implementation unchanged;
- * nothing is recorded here.
- */
- @Override
- public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
- super.receiveJobQueue(jobQueueEvent);
- }
-
- /**
- * Populate an {@link OutputDataProvenanceItem} with the results and attach
- * it to the appropriate {@link IterationProvenanceItem}. Then send the
- * {@link IterationProvenanceItem} across to the {@link ProvenanceConnector}
- */
- @Override
- public void receiveResult(DispatchResultEvent resultEvent) {
- try {
- // FIXME use the connector from the result event context
- IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
- iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
-
- ReferenceService referenceService = resultEvent.getContext()
- .getReferenceService();
-
- OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
- outputDataItem.setDataMap(resultEvent.getData());
- outputDataItem.setReferenceService(referenceService);
- outputDataItem.setIdentifier(uuid());
- outputDataItem.setProcessId(resultEvent.getOwningProcess());
- outputDataItem.setParentId(iterationProvItem.getIdentifier());
- iterationProvItem.setOutputDataItem(outputDataItem);
-
- getReporter().addProvenanceItem(iterationProvItem);
- // getConnector().addProvenanceItem(outputDataItem);
-
- // PM -- testing
- // add xencoding of data value here??
- // Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
- // for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
- // // create a simpler bean that we can serialize?
- //
- // T2Reference ref = entry.getValue();
- //
- // SimplerT2Reference t2RefBean = new SimplerT2Reference();
- // t2RefBean.setReferenceType(ref.getReferenceType());
- // t2RefBean.setDepth(ref.getDepth());
- // t2RefBean.setLocalPart(ref.getLocalPart());
- // t2RefBean.setNamespacePart(ref.getNamespacePart());
- //
- // System.out.println("data ref: "+ref);
- // String serializedInput = SerializeParam(t2RefBean);
- // System.out.println("serialized reference:" + serializedInput);
- //
- // System.out.println(referenceService.renderIdentifier(entry.getValue(), String.class, resultEvent.getContext()));
-// }
- } catch (Exception ex) {
- logger.error("Could not store provenance for "
- + resultEvent.getOwningProcess() + " "
- + Arrays.toString(resultEvent.getIndex()), ex);
- // But don't break super.receiveResult() !!
- }
- super.receiveResult(resultEvent);
- }
-
- /**
- * Passes the completion event to the superclass implementation unchanged;
- * nothing is recorded here.
- */
- @Override
- public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- super.receiveResultCompletion(completionEvent);
- }
-
- /**
- * Tell this layer which {@link ProvenanceReporter} receives the
- * {@link ProvenanceItem}s it builds. NOTE: should probably use the
- * reporter from the result event's context where possible
- *
- * @param connector
- *            the reporter to store; returned afterwards by
- *            {@code getReporter}
- */
- public void setReporter(ProvenanceReporter connector) {
- this.reporter = connector;
- }
-
- /**
- * @return the reporter stored by {@code setReporter}, or {@code null} if
- *         none has been set
- */
- public ProvenanceReporter getReporter() {
- return reporter;
- }
-
- /**
- * So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
- * enacted this layer has to know about the {@link WorkflowProvenanceItem}.
- * Its parent id and identifier are read when provenance items are built,
- * so it has to be set before the first job arrives.
- *
- * @param workflowItem
- *            the workflow-level provenance item to store
- */
- public void setWorkflow(WorkflowProvenanceItem workflowItem) {
- this.workflowItem = workflowItem;
- }
-
- // TODO is this unused?
- /**
-  * Serializes a JavaBean to the XML text produced by {@link XMLEncoder}.
-  * <p>
-  * The encoder writes UTF-8 bytes, so they are decoded as UTF-8 rather than
-  * with the platform default charset.
-  *
-  * @param ParamValue
-  *            the bean to serialize
-  * @return the XML document as a string
-  */
- public static String SerializeParam(Object ParamValue) {
-     ByteArrayOutputStream BStream = new ByteArrayOutputStream();
-     // try-with-resources closes (and thereby flushes) the encoder even if
-     // writeObject throws
-     try (XMLEncoder encoder = new XMLEncoder(BStream)) {
-         encoder.writeObject(ParamValue);
-     }
-     return new String(BStream.toByteArray(),
-             java.nio.charset.StandardCharsets.UTF_8);
- }
-
- // TODO is this unused?
- /**
-  * Reads back an object from XML text as produced by
-  * {@link #SerializeParam(Object)}.
-  * <p>
-  * NOTE(review): {@link XMLDecoder} can instantiate arbitrary classes and
-  * call arbitrary methods named in the document - only feed it trusted
-  * input.
-  *
-  * @param SerializedParam
-  *            the XML document
-  * @return the first object read from the document
-  */
- public static Object DeserializeParam(String SerializedParam) {
-     // encode as UTF-8 to match what SerializeParam decodes
-     InputStream IStream = new ByteArrayInputStream(
-             SerializedParam.getBytes(java.nio.charset.StandardCharsets.UTF_8));
-     try (XMLDecoder decoder = new XMLDecoder(IStream)) {
-         return decoder.readObject();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
deleted file mode 100644
index f8403df..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT;
-import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.sql.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.ControlBoundary;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Context free invoker layer, does not pass index arrays of jobs into activity
- * instances.
- * <p>
- * This layer will invoke the first invokable activity in the activity list, so
- * any sane dispatch stack will have narrowed this down to a single item list by
- * this point, i.e. by the insertion of a failover layer.
- * <p>
- * Currently only handles activities implementing {@link AsynchronousActivity}.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- *
- */
-@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
-@ControlBoundary
-public class Invoke extends AbstractDispatchLayer<JsonNode> {
- // URI constant for this layer (presumably its SCUFL2 type identifier - TODO confirm)
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
- private static Logger logger = Logger.getLogger(Invoke.class);
- // Running number of invocations; incremented by getNextProcessID()
- private static Long invocationCount = 0L;
-
- // Obtained from MonitorManager.getInstance() in the constructor; used to
- // register and deregister invocation nodes
- private MonitorManager monMan;
-
- /**
-  * Produces the next invocation process name: the text "invocation"
-  * followed by the incremented invocation counter.
-  * <p>
-  * The method is synchronized on the class instead of on
-  * {@code invocationCount}: that field is assigned a different
-  * {@link Long} object on every increment, so it cannot serve as a stable
-  * lock and two threads could otherwise obtain the same number.
-  *
-  * @return a process name that differs on every call
-  */
- private static synchronized String getNextProcessID() {
-     invocationCount = invocationCount + 1;
-     return "invocation" + invocationCount;
- }
-
- /**
- * Creates the layer and stores the instance returned by
- * {@code MonitorManager.getInstance()} for later node registration.
- */
- public Invoke() {
- super();
- monMan = MonitorManager.getInstance();
- }
-
- /**
- * Accepts a configuration node and ignores it.
- *
- * @param config
- *            not used by this method
- */
- @Override
- public void configure(JsonNode config) {
- // No configuration, do nothing
- }
-
- /**
- * @return always {@code null}; {@code configure} stores nothing
- */
- @Override
- public JsonNode getConfiguration() {
- return null;
- }
-
- /**
- * Receive a job from the layer above and pick the first concrete activity
- * from the list to invoke. Invoke this activity, creating a callback which
- * will wrap up the result messages in the appropriate collection depth
- * before sending them on (in general activities are not aware of their
- * invocation context and should not be responsible for providing correct
- * index arrays for results)
- * <p>
- * This layer will invoke the first invokable activity in the activity list,
- * so any sane dispatch stack will have narrowed this down to a single item
- * list by this point, i.e. by the insertion of a failover layer.
- */
- @Override
- public void receiveJob(final DispatchJobEvent jobEvent) {
- for (Activity<?> activity : jobEvent.getActivities())
- if (activity instanceof AsynchronousActivity) {
- invoke(jobEvent, (AsynchronousActivity<?>) activity);
- break;
- }
- }
-
- /**
-  * Runs one asynchronous activity for a job: registers the invocation with
-  * the monitor, reports an {@link InvocationStartedProvenanceItem} when
-  * provenance is being collected, renames the job's inputs to the
-  * activity's port names and starts the activity with a callback that
-  * feeds results, completions and failures back into this layer.
-  *
-  * @param jobEvent
-  *            the job supplying owning process, index, context and data
-  * @param activity
-  *            the activity to execute
-  */
- protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
-     // Register with the monitor, under a process id one level below the job's
-     final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
-             getNextProcessID()).getOwningProcess();
-     monMan.registerNode(activity, invocationProcessIdentifier,
-             new HashSet<MonitorableProperty<?>>());
-     monMan.registerNode(jobEvent, invocationProcessIdentifier,
-             new HashSet<MonitorableProperty<?>>());
-
-     InvocationContext context = jobEvent.getContext();
-     final ReferenceService refService = context.getReferenceService();
-
-     // Provenance is recorded only if the context has a reporter AND the
-     // dispatch stack contains an IntermediateProvenance layer
-     ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
-     IntermediateProvenance provenanceLayer = null;
-     if (provenanceReporter != null)
-         provenanceLayer = findIntermediateProvenance();
-     if (provenanceLayer != null) {
-         InvocationStartedProvenanceItem started = new InvocationStartedProvenanceItem();
-         IterationProvenanceItem parentItem = provenanceLayer.getIterationProvItem(jobEvent);
-         started.setIdentifier(UUID.randomUUID().toString());
-         started.setActivity(activity);
-         started.setProcessId(jobEvent.getOwningProcess());
-         started.setInvocationProcessId(invocationProcessIdentifier);
-         started.setParentId(parentItem.getIdentifier());
-         started.setWorkflowId(parentItem.getWorkflowId());
-         started.setInvocationStarted(new Date(System.currentTimeMillis()));
-         provenanceReporter.addProvenanceItem(started);
-     }
-
-     /*
-      * Re-key the job's data by activity input port name; inputs without a
-      * mapping for this activity are left out
-      */
-     Map<String, T2Reference> inputData = new HashMap<>();
-     for (Map.Entry<String, ? extends T2Reference> input : jobEvent
-             .getData().entrySet()) {
-         String activityInputName = activity.getInputPortMapping().get(
-                 input.getKey());
-         if (activityInputName != null)
-             inputData.put(activityInputName, input.getValue());
-     }
-
-     // Receives results, completions and failures from the activity
-     AsynchronousActivityCallback callback = new InvokeCallBack(jobEvent,
-             refService, invocationProcessIdentifier, activity);
-
-     if (!(activity instanceof MonitorableAsynchronousActivity<?>)) {
-         // Plain activity: run it with the (possibly renamed) input data
-         activity.executeAsynch(inputData, callback);
-         return;
-     }
-
-     /*
-      * Monitorable activity: launch the job, then push the monitorable
-      * properties it returns into the state tree
-      */
-     MonitorableAsynchronousActivity<?> monitorable = (MonitorableAsynchronousActivity<?>) activity;
-     Set<MonitorableProperty<?>> props = monitorable
-             .executeAsynchWithMonitoring(inputData, callback);
-     monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
- }
-
- /**
-  * Searches the dispatch stack of this layer's processor for an
-  * {@link IntermediateProvenance} layer.
-  *
-  * @return the first such layer in the stack, or {@code null} if there is
-  *         none
-  */
- protected IntermediateProvenance findIntermediateProvenance() {
-     IntermediateProvenance found = null;
-     for (DispatchLayer<?> candidate : getProcessor().getDispatchStack()
-             .getLayers()) {
-         if (candidate instanceof IntermediateProvenance) {
-             found = (IntermediateProvenance) candidate;
-             break;
-         }
-     }
-     return found;
- }
-
- protected class InvokeCallBack implements AsynchronousActivityCallback {
- // The activity being invoked; used for port mappings, output ports and log messages
- protected final AsynchronousActivity<?> activity;
- // Process id under which this invocation is registered with the monitor
- protected final String invocationProcessIdentifier;
- // The job that triggered the invocation; supplies owning process, index and context
- protected final DispatchJobEvent jobEvent;
- // Used to register empty lists when a stream completes without any result
- protected final ReferenceService refService;
- // Set to true at the end of receiveResult; read by receiveCompletion
- protected boolean sentJob = false;
-
- /**
- * Stores the four collaborators of the callback; nothing else happens
- * here.
- */
- protected InvokeCallBack(DispatchJobEvent jobEvent,
- ReferenceService refService,
- String invocationProcessIdentifier,
- AsynchronousActivity<?> asyncActivity) {
- this.jobEvent = jobEvent;
- this.refService = refService;
- this.invocationProcessIdentifier = invocationProcessIdentifier;
- this.activity = asyncActivity;
- }
-
- /**
- * Reports a failure without a cause; delegates to
- * {@code fail(message, null)}.
- */
- @Override
- public void fail(String message) {
- fail(message, null);
- }
-
- /**
- * Reports a failure with a cause, using
- * {@code DispatchErrorType.INVOCATION} as the error type.
- */
- @Override
- public void fail(String message, Throwable t) {
- fail(message, t, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t,
- DispatchErrorType errorType) {
- logger.warn("Failed (" + errorType + ") invoking " + activity
- + " for job " + jobEvent + ": " + message, t);
- monMan.deregisterNode(
- invocationProcessIdentifier);
- getAbove().receiveError(
- new DispatchErrorEvent(jobEvent.getOwningProcess(),
- jobEvent.getIndex(), jobEvent.getContext(),
- message, t, errorType, activity));
- }
-
- /**
- * @return the invocation context of the job this callback was created for
- */
- @Override
- public InvocationContext getContext() {
- return jobEvent.getContext();
- }
-
- /**
- * @return the invocation process identifier passed to the constructor
- */
- @Override
- public String getParentProcessIdentifier() {
- return invocationProcessIdentifier;
- }
-
- /**
-  * Handles a completion reported by the activity.
-  * <p>
-  * An empty {@code completionIndex} deregisters the invocation from the
-  * monitor. If a result has already been sent, a
-  * {@link DispatchCompletionEvent} is passed to the layer above with the
-  * job's index followed by {@code completionIndex}. If no result has been
-  * sent yet, an empty list of the port's depth is registered for every
-  * output port and delivered through {@code receiveResult} instead.
-  *
-  * @param completionIndex
-  *            index of the completed part, relative to the job's index
-  */
- @Override
- public void receiveCompletion(int[] completionIndex) {
-     boolean wholeJobDone = completionIndex.length == 0;
-     if (wholeJobDone)
-         // Final result, clean up monitor state
-         monMan.deregisterNode(invocationProcessIdentifier);
-
-     if (!sentJob) {
-         /*
-          * No 'real' data was sent before the stream completed, which in
-          * effect means an empty top level collection: register an empty
-          * collection of the appropriate depth for each output port (by
-          * definition all outputs of a streaming activity are collection
-          * types of some kind)
-          */
-         Map<String, T2Reference> emptyListMap = new HashMap<>();
-         for (OutputPort port : activity.getOutputPorts()) {
-             String name = port.getName();
-             int depth = port.getDepth();
-             emptyListMap.put(name, refService.getListService()
-                     .registerEmptyList(depth, jobEvent.getContext()).getId());
-         }
-         receiveResult(emptyListMap, new int[0]);
-         return;
-     }
-
-     int[] newIndex;
-     if (wholeJobDone)
-         newIndex = jobEvent.getIndex();
-     else {
-         // job index followed by the completion index
-         int[] jobIndex = jobEvent.getIndex();
-         newIndex = new int[jobIndex.length + completionIndex.length];
-         System.arraycopy(jobIndex, 0, newIndex, 0, jobIndex.length);
-         System.arraycopy(completionIndex, 0, newIndex, jobIndex.length,
-                 completionIndex.length);
-     }
-     DispatchCompletionEvent completion = new DispatchCompletionEvent(
-             jobEvent.getOwningProcess(), newIndex, jobEvent.getContext());
-     getAbove().receiveResultCompletion(completion);
- }
-
- @Override
- public void receiveResult(Map<String, T2Reference> data, int[] index) {
- /*
- * Construct a new result map using the activity mapping (activity
- * output name to processor output name)
- */
- Map<String, T2Reference> resultMap = new HashMap<>();
- for (String outputName : data.keySet()) {
- String processorOutputName = activity
- .getOutputPortMapping().get(outputName);
- if (processorOutputName != null)
- resultMap.put(processorOutputName, data.get(outputName));
- }
- /*
- * Construct a new index array if the specified index is non zero
- * length, otherwise just use the original job's index array (means
- * we're not streaming)
- */
- int[] newIndex;
- boolean streaming = false;
- if (index.length == 0)
- newIndex = jobEvent.getIndex();
- else {
- streaming = true;
- newIndex = new int[jobEvent.getIndex().length + index.length];
- int i = 0;
- for (int indexValue : jobEvent.getIndex())
- newIndex[i++] = indexValue;
- for (int indexValue : index)
- newIndex[i++] = indexValue;
- }
- DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
- .getOwningProcess(), newIndex, jobEvent.getContext(),
- resultMap, streaming);
- if (!streaming) {
- monMan.registerNode(resultEvent, invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
- // Final result, clean up monitor state
- monMan.deregisterNode(invocationProcessIdentifier);
- }
- // Push the modified data to the layer above in the dispatch stack
- getAbove().receiveResult(resultEvent);
-
- sentJob = true;
- }
-
- /**
-  * Runs the given task on a new thread that is named after the job, uses
-  * the class loader of the activity's class as its context class loader,
-  * and reports any uncaught exception through {@code fail}.
-  *
-  * @param runMe
-  *            the task to run
-  */
- @Override
- public void requestRun(Runnable runMe) {
-     UncaughtExceptionHandler reportAsFailure = new UncaughtExceptionHandler() {
-         @Override
-         public void uncaughtException(Thread t, Throwable e) {
-             fail("Uncaught exception while invoking " + activity, e);
-         }
-     };
-     Thread worker = new Thread(runMe, jobEvent.toString());
-     worker.setContextClassLoader(activity.getClass().getClassLoader());
-     worker.setUncaughtExceptionHandler(reportAsFailure);
-     worker.start();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
deleted file mode 100644
index d5077a4..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2008 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-
-/**
- * A layer that allows while-style loops.
- * <p>
- * The layer is configured with a {@link LoopConfiguration}, where an activity
- * has been set as the
- * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
- * condition}.
- * <p>
- * After a job has been successful further down the dispatch stack, the loop
- * layer will invoke the conditional activity to determine if the job will be
- * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test
- * will be performed even before the first invocation. (The default
- * runFirst=true is equivalent to a do..while construct, while runFirst=false is
- * equivalent to a while.. construct.)
- * <p>
- * A job will be resent down the dispatch stack only if the conditional activity
- * returns a reference to a string equal to "true" on its output port "loop".
- * <p>
- * If a job or the conditional activity fails, the while-loop is interrupted and
- * the error is sent further up.
- * <p>
- * Note that the LoopLayer will be invoked for each item in an iteration, if you
- * want to do the loop for the whole collection (ie. re-iterating if the
- * loop-condition fails after processing the full list) - create a nested
- * workflow with the desired depths on it's input ports and insert this
- * LoopLayer in the stack of the nested workflow's processor in parent workflow.
- * <p>
- * It is recommended that the LoopLayer is to be inserted after the
- * {@link ErrorBounce} layer, as this layer is needed for registering errors
- * produced by the LoopLayer. If the user requires {@link Retry retries} and
- * {@link Failover failovers} before checking the while condition, such layers
- * should be below LoopLayer.
- *
- * @author Stian Soiland-Reyes
- */
-// FIXME Doesn't work
-@SuppressWarnings({"unchecked","rawtypes"})
-public class Loop extends AbstractDispatchLayer<JsonNode> {
- public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
- private static Logger logger = Logger.getLogger(Loop.class);
-
- private JsonNode config = JsonNodeFactory.instance.objectNode();
-
- protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
- protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
-
- @Override
- public void configure(JsonNode config) {
- this.config = config;
- }
-
- @Override
- public void finishedWith(String owningProcess) {
- String prefix = owningProcess + "[";
- synchronized (outgoingJobs) {
- for (String key : new ArrayList<>(outgoingJobs.keySet()))
- if (key.startsWith(prefix))
- outgoingJobs.remove(key);
- }
- synchronized (incomingJobs) {
- for (String key : new ArrayList<>(incomingJobs.keySet()))
- if (key.startsWith(prefix))
- incomingJobs.remove(key);
- }
- }
-
- @Override
- public JsonNode getConfiguration() {
- return config;
- }
-
- @Override
- public void receiveJob(DispatchJobEvent jobEvent) {
- synchronized (incomingJobs) {
- incomingJobs.put(jobIdentifier(jobEvent), jobEvent);
- }
- if (config.get("runFirst").asBoolean()) {
- // We'll do the conditional in receiveResult instead
- super.receiveJob(jobEvent);
- return;
- }
- checkCondition(jobEvent);
- }
-
- @Override
- public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
- synchronized (incomingJobs) {
- incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent);
- }
- if (config.get("runFirst").asBoolean()) {
- // We'll do the conditional in receiveResult instead
- super.receiveJobQueue(jobQueueEvent);
- return;
- }
- checkCondition(jobQueueEvent);
- }
-
- private Activity<?> getCondition() {
- //return config.getCondition();
- return null;
- }
-
- @Override
- public void receiveResult(DispatchResultEvent resultEvent) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveResult(resultEvent);
- return;
- }
- synchronized (outgoingJobs) {
- outgoingJobs.put(jobIdentifier(resultEvent), resultEvent);
- }
- checkCondition(resultEvent);
- }
-
- @Override
- public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveResultCompletion(completionEvent);
- return;
- }
- synchronized (outgoingJobs) {
- outgoingJobs.put(jobIdentifier(completionEvent), completionEvent);
- }
- checkCondition(completionEvent);
- }
-
- private void checkCondition(AbstractDispatchEvent event) {
- Activity<?> condition = getCondition();
- if (condition == null) {
- super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
- event.getIndex(), event.getContext(),
- "Can't invoke condition service: null", null,
- DispatchErrorType.INVOCATION, condition));
- return;
- }
- if (!(condition instanceof AbstractAsynchronousActivity)) {
- DispatchErrorEvent errorEvent = new DispatchErrorEvent(
- event.getOwningProcess(),
- event.getIndex(),
- event.getContext(),
- "Can't invoke condition service "
- + condition
- + " is not an instance of AbstractAsynchronousActivity",
- null, DispatchErrorType.INVOCATION, condition);
- super.receiveError(errorEvent);
- return;
- }
- AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition;
- String jobIdentifier = jobIdentifier(event);
- Map<String, T2Reference> inputs = prepareInputs(asyncCondition,
- jobIdentifier);
- AsynchronousActivityCallback callback = new ConditionCallBack(
- jobIdentifier);
- asyncCondition.executeAsynch(inputs, callback);
- }
-
- private Map<String, T2Reference> prepareInputs(
- AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
- Map<String, T2Reference> inputs = new HashMap<>();
- Map<String, T2Reference> inData = getInData(jobIdentifier);
- Map<String, T2Reference> outData = getOutData(jobIdentifier);
-
- Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
- for (ActivityInputPort conditionIn : inputPorts) {
- String conditionPort = conditionIn.getName();
- if (outData.containsKey(conditionPort))
- // Copy from previous output
- inputs.put(conditionPort, outData.get(conditionPort));
- else if (inData.containsKey(conditionPort))
- // Copy from original input
- inputs.put(conditionPort, inData.get(conditionPort));
- }
- return inputs;
- }
-
- private Map<String, T2Reference> getInData(String jobIdentifier) {
- AbstractDispatchEvent inEvent;
- synchronized (incomingJobs) {
- inEvent = incomingJobs.get(jobIdentifier);
- }
- Map<String, T2Reference> inData = new HashMap<>();
- if (inEvent instanceof DispatchJobEvent)
- inData = ((DispatchJobEvent) inEvent).getData();
- return inData;
- }
-
- private Map<String, T2Reference> getOutData(String jobIdentifier) {
- AbstractDispatchEvent outEvent;
- synchronized (outgoingJobs) {
- outEvent = outgoingJobs.get(jobIdentifier);
- }
- Map<String, T2Reference> outData = new HashMap<>();
- if (outEvent instanceof DispatchResultEvent)
- outData = ((DispatchResultEvent) outEvent).getData();
- return outData;
- }
-
- private String jobIdentifier(AbstractDispatchEvent event) {
- String jobId = event.getOwningProcess()
- + Arrays.toString(event.getIndex());
- return jobId;
- }
-
- public static final String LOOP_PORT = "loop";
-
- public class ConditionCallBack implements AsynchronousActivityCallback {
- private InvocationContext context;
- private final String jobIdentifier;
- private String processId;
-
- public ConditionCallBack(String jobIdentifier) {
- this.jobIdentifier = jobIdentifier;
- AbstractDispatchEvent originalEvent;
- synchronized (incomingJobs) {
- originalEvent = incomingJobs.get(jobIdentifier);
- }
- context = originalEvent.getContext();
- processId = originalEvent.getOwningProcess() + ":condition";
- }
-
- @Override
- public void fail(String message) {
- fail(message, null, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t) {
- fail(message, t, DispatchErrorType.INVOCATION);
- }
-
- @Override
- public void fail(String message, Throwable t,
- DispatchErrorType errorType) {
- logger.warn("Failed (" + errorType + ") invoking condition service "
- + jobIdentifier + ":" + message, t);
-
- AbstractDispatchEvent originalEvent;
- synchronized (incomingJobs) {
- originalEvent = incomingJobs.get(jobIdentifier);
- }
- receiveError(new DispatchErrorEvent(originalEvent
- .getOwningProcess(), originalEvent.getIndex(),
- originalEvent.getContext(),
- "Can't invoke condition service ", t,
- DispatchErrorType.INVOCATION, null));
- }
-
- @Override
- public InvocationContext getContext() {
- return context;
- }
-
- @Override
- public String getParentProcessIdentifier() {
- return processId;
- }
-
- @Override
- public void receiveCompletion(int[] completionIndex) {
- // Ignore streaming
- }
-
- @Override
- public void receiveResult(Map<String, T2Reference> data, int[] index) {
- if (index.length > 0) {
- // Ignore streaming
- return;
- }
- T2Reference loopRef = data.get(LOOP_PORT);
- if (loopRef == null) {
- fail("Condition service didn't contain output port " + LOOP_PORT);
- return;
- }
- if (loopRef.containsErrors()) {
- fail("Condition service failed: " + loopRef);
- return;
- }
- if (loopRef.getDepth() != 0) {
- fail("Condition service output " + LOOP_PORT
- + " depth is not 0, but " + loopRef.getDepth());
- }
- ReferenceService referenceService = context.getReferenceService();
- String loop = (String) referenceService.renderIdentifier(loopRef,
- String.class, context);
-
- if (Boolean.parseBoolean(loop)) {
- // Push it down again
- AbstractDispatchEvent dispatchEvent;
- synchronized (incomingJobs) {
- dispatchEvent = incomingJobs.get(jobIdentifier);
- }
- if (dispatchEvent == null) {
- fail("Unknown job identifier " + jobIdentifier);
- }
- if (dispatchEvent instanceof DispatchJobEvent) {
- DispatchJobEvent newJobEvent = prepareNewJobEvent(data,
- dispatchEvent);
- getBelow().receiveJob(newJobEvent);
- } else if (dispatchEvent instanceof DispatchJobQueueEvent) {
- getBelow().receiveJobQueue(
- (DispatchJobQueueEvent) dispatchEvent);
- } else {
- fail("Unknown type of incoming event " + dispatchEvent);
- }
- return;
-
- } else {
- // We'll push it up, end of loop for now
-
- AbstractDispatchEvent outgoingEvent;
- synchronized (outgoingJobs) {
- outgoingEvent = outgoingJobs.get(jobIdentifier);
- }
- if (outgoingEvent == null && !config.get("runFirst").asBoolean()) {
- fail("Initial loop condition failed");
- }
- if (outgoingEvent instanceof DispatchCompletionEvent) {
- getAbove().receiveResultCompletion(
- (DispatchCompletionEvent) outgoingEvent);
- } else if (outgoingEvent instanceof DispatchResultEvent) {
- getAbove().receiveResult(
- (DispatchResultEvent) outgoingEvent);
- } else {
- fail("Unknown type of outgoing event " + outgoingEvent);
- }
- }
-
- }
-
- private DispatchJobEvent prepareNewJobEvent(
- Map<String, T2Reference> data,
- AbstractDispatchEvent dispatchEvent) {
- DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent;
- Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>(
- dispatchJobEvent.getData());
- newInputs.putAll(data);
- DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent
- .getOwningProcess(), dispatchEvent.getIndex(),
- dispatchEvent.getContext(), newInputs,
- ((DispatchJobEvent) dispatchEvent).getActivities());
- /*
- * TODO: Should this be registered as an incomingJobs? If so the
- * conditional could even feed to itself, and we should also keep a
- * list of originalJobs.
- */
- return newJobEvent;
- }
-
- @Override
- public void requestRun(Runnable runMe) {
- String newThreadName = "Condition service "
- + getParentProcessIdentifier();
- Thread thread = new Thread(runMe, newThreadName);
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- fail("Uncaught exception while invoking " + jobIdentifier,
- e);
- }
- });
- thread.start();
- }
- }
-
- @Override
- public Processor getProcessor() {
- if (dispatchStack == null)
- return null;
- return dispatchStack.getProcessor();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/4252fa90/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
deleted file mode 100644
index 7cfa2a5..0000000
--- a/taverna-workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-
-import java.util.Properties;
-
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
-import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-
-/**
- * Configuration bean for the {@link Loop}.
- * <p>
- * Set the {@link #setCondition(Activity)} for an activity with an output port
- * called "loop". The LoopLayer will re-send a job only if this port exists and
- * its output can be dereferenced to a string equal to "true".
- * </p>
- * <p>
- * If {@link #isRunFirst()} is false, the loop layer will check the condition
- * before invoking the job for the first time, otherwise the condition will be
- * invoked after the job has come back with successful results.
- * </p>
- *
- * @author Stian Soiland-Reyes
- *
- */
-@ConfigurationBean(uri = Loop.URI + "#Config")
-public class LoopConfiguration implements Cloneable {
-    private Activity<?> condition = null;
-    // null means "not set"; isRunFirst() then reports the default, true
-    private Boolean runFirst;
-    private Properties properties;
-
-    /**
-     * The properties of this configuration, created on first access so this
-     * never returns <code>null</code> unless explicitly set to null afterwards.
-     */
-    public synchronized Properties getProperties() {
-        if (properties == null) {
-            properties = new Properties();
-        }
-        return properties;
-    }
-
-    public synchronized void setProperties(Properties properties) {
-        this.properties = properties;
-    }
-
-    /**
-     * Copy this configuration. The copy has no condition activity, and gets
-     * its own copy of the properties so that later changes to one bean do not
-     * leak into the other.
-     */
-    @Override
-    public LoopConfiguration clone() {
-        try {
-            LoopConfiguration copy = (LoopConfiguration) super.clone();
-            copy.condition = null;
-            synchronized (this) {
-                // Object.clone() is shallow, so without this the mutable
-                // Properties instance would be shared with the copy
-                if (properties != null) {
-                    copy.properties = (Properties) properties.clone();
-                }
-            }
-            return copy;
-        } catch (CloneNotSupportedException e) {
-            throw new RuntimeException("Unexpected CloneNotSupportedException",
-                    e);
-        }
-    }
-
-    public Activity<?> getCondition() {
-        return condition;
-    }
-
-    /**
-     * Whether the job is run before the condition is checked for the first
-     * time. Defaults to <code>true</code> when not explicitly set.
-     */
-    public boolean isRunFirst() {
-        return runFirst == null || runFirst;
-    }
-
-    @ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
-    public void setCondition(Activity<?> activity) {
-        this.condition = activity;
-    }
-
-    @ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
-    public void setRunFirst(boolean runFirst) {
-        this.runFirst = runFirst;
-    }
-}