You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:38:29 UTC
[45/51] [partial] incubator-taverna-engine git commit:
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java b/taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java
new file mode 100644
index 0000000..796ae9f
--- /dev/null
+++ b/taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java
@@ -0,0 +1,526 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.taverna.platform.execution.impl.local;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.taverna.reference.ExternalReferenceSPI;
+import org.apache.taverna.workflowmodel.Dataflow;
+import org.apache.taverna.workflowmodel.DataflowInputPort;
+import org.apache.taverna.workflowmodel.DataflowOutputPort;
+import org.apache.taverna.workflowmodel.Datalink;
+import org.apache.taverna.workflowmodel.EditException;
+import org.apache.taverna.workflowmodel.Edits;
+import org.apache.taverna.workflowmodel.EventForwardingOutputPort;
+import org.apache.taverna.workflowmodel.EventHandlingInputPort;
+import org.apache.taverna.workflowmodel.Merge;
+import org.apache.taverna.workflowmodel.MergeInputPort;
+import org.apache.taverna.workflowmodel.ProcessorInputPort;
+import org.apache.taverna.workflowmodel.ProcessorOutputPort;
+import org.apache.taverna.workflowmodel.processor.activity.ActivityInputPort;
+import org.apache.taverna.workflowmodel.processor.activity.ActivityOutputPort;
+import org.apache.taverna.workflowmodel.processor.activity.NestedDataflow;
+import org.apache.taverna.workflowmodel.processor.dispatch.DispatchLayer;
+import org.apache.taverna.workflowmodel.processor.dispatch.DispatchStack;
+import org.apache.taverna.workflowmodel.processor.iteration.IterationStrategy;
+import org.apache.taverna.workflowmodel.processor.iteration.NamedInputPortNode;
+import org.apache.taverna.platform.capability.api.ActivityConfigurationException;
+import org.apache.taverna.platform.capability.api.ActivityNotFoundException;
+import org.apache.taverna.platform.capability.api.ActivityService;
+import org.apache.taverna.platform.capability.api.DispatchLayerConfigurationException;
+import org.apache.taverna.platform.capability.api.DispatchLayerNotFoundException;
+import org.apache.taverna.platform.capability.api.DispatchLayerService;
+import org.apache.taverna.platform.execution.api.InvalidWorkflowException;
+import org.apache.taverna.scufl2.api.activity.Activity;
+import org.apache.taverna.scufl2.api.common.Scufl2Tools;
+import org.apache.taverna.scufl2.api.configurations.Configuration;
+import org.apache.taverna.scufl2.api.container.WorkflowBundle;
+import org.apache.taverna.scufl2.api.core.BlockingControlLink;
+import org.apache.taverna.scufl2.api.core.ControlLink;
+import org.apache.taverna.scufl2.api.core.DataLink;
+import org.apache.taverna.scufl2.api.core.Processor;
+import org.apache.taverna.scufl2.api.core.Workflow;
+import org.apache.taverna.scufl2.api.iterationstrategy.CrossProduct;
+import org.apache.taverna.scufl2.api.iterationstrategy.DotProduct;
+import org.apache.taverna.scufl2.api.iterationstrategy.IterationStrategyNode;
+import org.apache.taverna.scufl2.api.iterationstrategy.IterationStrategyStack;
+import org.apache.taverna.scufl2.api.iterationstrategy.IterationStrategyTopNode;
+import org.apache.taverna.scufl2.api.iterationstrategy.PortNode;
+import org.apache.taverna.scufl2.api.port.InputActivityPort;
+import org.apache.taverna.scufl2.api.port.InputProcessorPort;
+import org.apache.taverna.scufl2.api.port.InputWorkflowPort;
+import org.apache.taverna.scufl2.api.port.OutputActivityPort;
+import org.apache.taverna.scufl2.api.port.OutputProcessorPort;
+import org.apache.taverna.scufl2.api.port.OutputWorkflowPort;
+import org.apache.taverna.scufl2.api.port.Port;
+import org.apache.taverna.scufl2.api.port.ReceiverPort;
+import org.apache.taverna.scufl2.api.port.SenderPort;
+import org.apache.taverna.scufl2.api.profiles.ProcessorBinding;
+import org.apache.taverna.scufl2.api.profiles.ProcessorInputPortBinding;
+import org.apache.taverna.scufl2.api.profiles.ProcessorOutputPortBinding;
+import org.apache.taverna.scufl2.api.profiles.Profile;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Translates a scufl2 {@link Workflow} into a {@link Dataflow}.
+ *
+ * @author David Withers
+ */
+public class WorkflowToDataflowMapper {
+ /** Activity type URI that marks a scufl2 activity as a nested workflow. */
+ private static final URI NESTED_WORKFLOW_URI = URI
+         .create("http://ns.taverna.org.uk/2010/activity/nested-workflow");
+
+ // Only ever assigned in the constructor, so it is final like every other
+ // collaborator of this class.
+ private final Edits edits;
+ private final Scufl2Tools scufl2Tools = new Scufl2Tools();
+ // scufl2 port -> dataflow port that consumes the data sent to it
+ private final Map<Port, EventHandlingInputPort> inputPorts;
+ // scufl2 port -> dataflow port that emits the data it sends
+ private final Map<Port, EventForwardingOutputPort> outputPorts;
+ // receiving scufl2 port -> merge created for its merged data links
+ private final Map<Port, Merge> merges;
+ // Bidirectional lookups between the scufl2 model and the dataflow model.
+ // The scufl2 -> dataflow direction is keyed by object identity, the
+ // reverse direction by equals()/hashCode().
+ private final Map<Workflow, Dataflow> workflowToDataflow;
+ private final Map<Dataflow, Workflow> dataflowToWorkflow;
+ private final Map<Processor, org.apache.taverna.workflowmodel.Processor> workflowToDataflowProcessors;
+ private final Map<org.apache.taverna.workflowmodel.Processor, Processor> dataflowToWorkflowProcessors;
+ private final Map<Activity, org.apache.taverna.workflowmodel.processor.activity.Activity<?>> workflowToDataflowActivities;
+ private final Map<org.apache.taverna.workflowmodel.processor.activity.Activity<?>, Activity> dataflowToWorkflowActivities;
+ @SuppressWarnings("unused")
+ private final WorkflowBundle workflowBundle;
+ private final Profile profile;
+ private final ActivityService activityService;
+ private final DispatchLayerService dispatchLayerService;
+
+ /**
+  * Creates a mapper with empty mapping tables.
+  *
+  * @param workflowBundle
+  *            the bundle the workflows belong to (stored but not read by
+  *            this class)
+  * @param profile
+  *            the profile used to look up processor bindings, processor
+  *            configuration and nested workflows
+  * @param edits
+  *            used to create and modify all dataflow model objects
+  * @param activityService
+  *            used to check for and create dataflow activities
+  * @param dispatchLayerService
+  *            used to create dispatch layers
+  */
+ public WorkflowToDataflowMapper(WorkflowBundle workflowBundle,
+         Profile profile, Edits edits, ActivityService activityService,
+         DispatchLayerService dispatchLayerService) {
+     this.workflowBundle = workflowBundle;
+     this.profile = profile;
+     this.edits = edits;
+     this.activityService = activityService;
+     this.dispatchLayerService = dispatchLayerService;
+     inputPorts = new IdentityHashMap<>();
+     outputPorts = new IdentityHashMap<>();
+     merges = new IdentityHashMap<>();
+     workflowToDataflow = new IdentityHashMap<>();
+     dataflowToWorkflow = new HashMap<>();
+     workflowToDataflowProcessors = new IdentityHashMap<>();
+     dataflowToWorkflowProcessors = new HashMap<>();
+     workflowToDataflowActivities = new IdentityHashMap<>();
+     dataflowToWorkflowActivities = new HashMap<>();
+ }
+
+ /**
+  * Looks up the scufl2 workflow a dataflow was created from.
+  *
+  * @param dataflow
+  *            a dataflow previously returned by this mapper
+  * @return the corresponding workflow, or {@code null} if the dataflow has
+  *         not been registered by this mapper
+  */
+ public Workflow getWorkflow(Dataflow dataflow) {
+     final Workflow mapped = dataflowToWorkflow.get(dataflow);
+     return mapped;
+ }
+
+ /**
+  * Returns the dataflow for a scufl2 workflow, translating the workflow the
+  * first time it is requested and reusing that result afterwards.
+  *
+  * @param workflow
+  *            the workflow to translate
+  * @return the dataflow mapped to {@code workflow}
+  * @throws InvalidWorkflowException
+  *             if the translation fails; edit, activity and dispatch layer
+  *             failures are wrapped as the cause
+  */
+ public Dataflow getDataflow(Workflow workflow)
+         throws InvalidWorkflowException {
+     // Already translated: hand back the cached dataflow.
+     if (workflowToDataflow.containsKey(workflow))
+         return workflowToDataflow.get(workflow);
+
+     final Dataflow created;
+     try {
+         created = createDataflow(workflow);
+     } catch (EditException | ActivityConfigurationException
+             | DispatchLayerConfigurationException
+             | ActivityNotFoundException
+             | DispatchLayerNotFoundException e) {
+         throw new InvalidWorkflowException(e);
+     }
+     // Register both directions only once the dataflow is complete.
+     workflowToDataflow.put(workflow, created);
+     dataflowToWorkflow.put(created, workflow);
+     return created;
+ }
+
+ /**
+  * Looks up the scufl2 processor a dataflow processor was created from.
+  *
+  * @param dataflowProcessor
+  *            a dataflow processor created by this mapper
+  * @return the corresponding scufl2 processor, or {@code null} if none has
+  *         been registered
+  */
+ public Processor getWorkflowProcessor(
+         org.apache.taverna.workflowmodel.Processor dataflowProcessor) {
+     final Processor mapped = dataflowToWorkflowProcessors
+             .get(dataflowProcessor);
+     return mapped;
+ }
+
+ /**
+  * Looks up the dataflow processor created for a scufl2 processor.
+  *
+  * @param workflowProcessor
+  *            a scufl2 processor
+  * @return the corresponding dataflow processor, or {@code null} if none
+  *         has been registered
+  */
+ public org.apache.taverna.workflowmodel.Processor getDataflowProcessor(
+         Processor workflowProcessor) {
+     final org.apache.taverna.workflowmodel.Processor mapped = workflowToDataflowProcessors
+             .get(workflowProcessor);
+     return mapped;
+ }
+
+ /**
+  * Looks up the scufl2 activity a dataflow activity was created from.
+  *
+  * @param dataflowActiviy
+  *            a dataflow activity created by this mapper
+  * @return the corresponding scufl2 activity, or {@code null} if none has
+  *         been registered
+  */
+ public Activity getWorkflowActivity(
+         org.apache.taverna.workflowmodel.processor.activity.Activity<?> dataflowActiviy) {
+     final Activity mapped = dataflowToWorkflowActivities
+             .get(dataflowActiviy);
+     return mapped;
+ }
+
+ /**
+  * Looks up the dataflow activity created for a scufl2 activity.
+  *
+  * @param workflowActivity
+  *            a scufl2 activity
+  * @return the corresponding dataflow activity, or {@code null} if none
+  *         has been registered
+  */
+ public org.apache.taverna.workflowmodel.processor.activity.Activity<?> getDataflowActivity(
+         Activity workflowActivity) {
+     final org.apache.taverna.workflowmodel.processor.activity.Activity<?> mapped = workflowToDataflowActivities
+             .get(workflowActivity);
+     return mapped;
+ }
+
+ /**
+  * Builds a new dataflow from a scufl2 workflow: names it after the
+  * workflow, then adds input ports, output ports, processors, data links
+  * and control links, in that order.
+  *
+  * @param workflow
+  *            the workflow to translate
+  * @return the newly built dataflow
+  */
+ protected Dataflow createDataflow(Workflow workflow) throws EditException,
+         ActivityNotFoundException, ActivityConfigurationException,
+         InvalidWorkflowException, DispatchLayerNotFoundException,
+         DispatchLayerConfigurationException {
+     final Dataflow result = edits.createDataflow();
+     final String workflowName = workflow.getName();
+     edits.getUpdateDataflowNameEdit(result, workflowName).doEdit();
+
+     // Ports and processors must exist before links can refer to them.
+     addInputPorts(workflow, result);
+     addOutputPorts(workflow, result);
+     addProcessors(workflow, result);
+     addDataLinks(workflow, result);
+     addControlLinks(workflow);
+
+     return result;
+ }
+
+ /**
+  * Creates a dataflow processor for every processor of the workflow, adds
+  * it to the dataflow and records the mapping in both directions. Each
+  * processor then receives its ports, dispatch stack, iteration strategy
+  * and bound activities.
+  *
+  * @param workflow
+  *            the workflow whose processors are translated
+  * @param dataflow
+  *            the dataflow the new processors are added to
+  */
+ private void addProcessors(Workflow workflow, Dataflow dataflow)
+         throws EditException, ActivityNotFoundException,
+         ActivityConfigurationException, InvalidWorkflowException,
+         DispatchLayerNotFoundException, DispatchLayerConfigurationException {
+     for (Processor source : workflow.getProcessors()) {
+         org.apache.taverna.workflowmodel.Processor target = edits
+                 .createProcessor(source.getName());
+         edits.getAddProcessorEdit(dataflow, target).doEdit();
+         workflowToDataflowProcessors.put(source, target);
+         dataflowToWorkflowProcessors.put(target, source);
+
+         // Input ports: only those that have at least one incoming data link.
+         for (InputProcessorPort in : source.getInputPorts()) {
+             if (!in.getDatalinksTo().isEmpty()) {
+                 ProcessorInputPort targetIn = edits
+                         .createProcessorInputPort(target, in.getName(),
+                                 in.getDepth());
+                 edits.getAddProcessorInputPortEdit(target, targetIn)
+                         .doEdit();
+                 inputPorts.put(in, targetIn);
+             }
+         }
+
+         // Output ports: all of them.
+         for (OutputProcessorPort out : source.getOutputPorts()) {
+             ProcessorOutputPort targetOut = edits
+                     .createProcessorOutputPort(target, out.getName(),
+                             out.getDepth(), out.getGranularDepth());
+             edits.getAddProcessorOutputPortEdit(target, targetOut)
+                     .doEdit();
+             outputPorts.put(out, targetOut);
+         }
+
+         addDispatchStack(source, target);
+         addIterationStrategy(source, target);
+
+         // Activities bound to this processor in the current profile.
+         for (ProcessorBinding binding : scufl2Tools
+                 .processorBindingsForProcessor(source, profile)) {
+             addActivity(binding);
+         }
+     }
+ }
+
+ /**
+  * Populates the dispatch stack of a dataflow processor with six layers, in
+  * this order: Parallelize, ErrorBounce, Failover, Retry, Stop, Invoke.
+  * Parallelize and Retry are configured from the "parallelize" and "retry"
+  * entries of the processor's JSON configuration when the processor has a
+  * configuration; every other layer is created with a null configuration.
+  *
+  * @param processor
+  *            the scufl2 processor supplying the configuration
+  * @param dataflowProcessor
+  *            the dataflow processor whose dispatch stack is filled
+  */
+ private void addDispatchStack(Processor processor,
+         org.apache.taverna.workflowmodel.Processor dataflowProcessor)
+         throws DispatchLayerNotFoundException,
+         DispatchLayerConfigurationException, EditException {
+     final DispatchStack stack = dataflowProcessor.getDispatchStack();
+
+     JsonNode processorConfig = null;
+     try {
+         processorConfig = processor.getConfiguration(profile).getJson();
+     } catch (IndexOutOfBoundsException e) {
+         // no configuration for processor
+     }
+
+     // { layer type URI, key of the layer's configuration or null }
+     final String[][] layers = {
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize", "parallelize" },
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce", null },
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover", null },
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry", "retry" },
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop", null },
+             { "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke", null } };
+
+     for (int position = 0; position < layers.length; position++) {
+         final String configKey = layers[position][1];
+         JsonNode layerConfig = null;
+         if (processorConfig != null && configKey != null)
+             layerConfig = processorConfig.get(configKey);
+         addDispatchLayer(stack, URI.create(layers[position][0]), position,
+                 layerConfig);
+     }
+ }
+
+ /**
+  * Creates one dispatch layer and inserts it into a dispatch stack.
+  *
+  * @param dispatchStack
+  *            the stack to add the layer to
+  * @param dispatchLayerType
+  *            URI identifying the kind of layer to create
+  * @param layer
+  *            position of the new layer within the stack
+  * @param json
+  *            configuration handed to the dispatch layer service; may be
+  *            {@code null}
+  */
+ private void addDispatchLayer(DispatchStack dispatchStack,
+         URI dispatchLayerType, int layer, JsonNode json)
+         throws DispatchLayerConfigurationException,
+         DispatchLayerNotFoundException, EditException {
+     edits.getAddDispatchLayerEdit(
+             dispatchStack,
+             dispatchLayerService.createDispatchLayer(dispatchLayerType,
+                     json), layer).doEdit();
+ }
+
+ /**
+  * Replaces the iteration strategy stack of a dataflow processor with a
+  * translation of the scufl2 processor's stack: the existing stack is
+  * cleared, then one dataflow iteration strategy is added per scufl2 top
+  * node and filled from that node.
+  *
+  * @param processor
+  *            the scufl2 processor providing the iteration strategy stack
+  * @param dataflowProcessor
+  *            the dataflow processor whose stack is rebuilt
+  */
+ private void addIterationStrategy(Processor processor,
+         org.apache.taverna.workflowmodel.Processor dataflowProcessor)
+         throws EditException, InvalidWorkflowException {
+     final org.apache.taverna.workflowmodel.processor.iteration.IterationStrategyStack targetStack = dataflowProcessor
+             .getIterationStrategy();
+     edits.getClearIterationStrategyStackEdit(targetStack).doEdit();
+
+     final IterationStrategyStack sourceStack = processor
+             .getIterationStrategyStack();
+     for (IterationStrategyTopNode topNode : sourceStack) {
+         final IterationStrategy strategy = edits.createIterationStrategy();
+         edits.getAddIterationStrategyEdit(targetStack, strategy).doEdit();
+         // The translated tree hangs below the strategy's terminal node.
+         addIterationStrategyNode(strategy, strategy.getTerminalNode(),
+                 topNode);
+     }
+ }
+
+ /**
+  * Translates one scufl2 iteration strategy node, including its children,
+  * and attaches the result to a parent dataflow node. Cross and dot
+  * products are translated recursively; a port node becomes a
+  * {@link NamedInputPortNode} registered with the iteration strategy.
+  *
+  * @param dataflowIterationStrategy
+  *            the strategy that owns the translated nodes
+  * @param dataflowIterationStrategyNode
+  *            the dataflow node that becomes the parent of the translation
+  * @param iterationStrategyNode
+  *            the scufl2 node to translate
+  * @throws InvalidWorkflowException
+  *             if the scufl2 node is not a cross product, dot product or
+  *             port node
+  */
+ private void addIterationStrategyNode(
+         IterationStrategy dataflowIterationStrategy,
+         org.apache.taverna.workflowmodel.processor.iteration.IterationStrategyNode dataflowIterationStrategyNode,
+         IterationStrategyNode iterationStrategyNode) throws EditException,
+         InvalidWorkflowException {
+     final org.apache.taverna.workflowmodel.processor.iteration.IterationStrategyNode translated;
+     // Non-null only for product nodes, whose children are translated too.
+     Iterable<? extends IterationStrategyNode> children = null;
+
+     if (iterationStrategyNode instanceof CrossProduct) {
+         translated = new org.apache.taverna.workflowmodel.processor.iteration.CrossProduct();
+         children = (CrossProduct) iterationStrategyNode;
+     } else if (iterationStrategyNode instanceof DotProduct) {
+         translated = new org.apache.taverna.workflowmodel.processor.iteration.DotProduct();
+         children = (DotProduct) iterationStrategyNode;
+     } else if (iterationStrategyNode instanceof PortNode) {
+         final PortNode portNode = (PortNode) iterationStrategyNode;
+         Integer depth = portNode.getDesiredDepth();
+         if (depth == null) {
+             // No explicit desired depth: use the depth of the port itself.
+             depth = portNode.getInputProcessorPort().getDepth();
+         }
+         final NamedInputPortNode inputNode = new NamedInputPortNode(
+                 portNode.getInputProcessorPort().getName(), depth);
+         edits.getAddIterationStrategyInputNodeEdit(
+                 dataflowIterationStrategy, inputNode).doEdit();
+         translated = inputNode;
+     } else {
+         throw new InvalidWorkflowException(
+                 "Unknown IterationStrategyNode type : "
+                         + iterationStrategyNode.getClass().getName());
+     }
+
+     // Children are attached before the node itself gets its parent.
+     if (children != null) {
+         for (IterationStrategyNode child : children) {
+             addIterationStrategyNode(dataflowIterationStrategy, translated,
+                     child);
+         }
+     }
+     translated.setParent(dataflowIterationStrategyNode);
+ }
+
+ /**
+  * Creates the dataflow activity for a processor binding and attaches it to
+  * the dataflow processor mapped to the bound scufl2 processor. The
+  * activity receives the ports of the scufl2 activity and the port mappings
+  * of the binding, and is then recorded in the activity maps. For a nested
+  * workflow activity the nested workflow is translated as well and set on
+  * the activity.
+  *
+  * @param processorBinding
+  *            the binding of a scufl2 activity to a scufl2 processor
+  * @throws ActivityNotFoundException
+  *             if the activity service does not know the activity type
+  * @throws ActivityConfigurationException
+  *             if a nested workflow activity is not a
+  *             {@link NestedDataflow}
+  */
+ private void addActivity(ProcessorBinding processorBinding)
+         throws EditException, ActivityNotFoundException,
+         ActivityConfigurationException, InvalidWorkflowException {
+     final org.apache.taverna.workflowmodel.Processor dataflowProcessor = workflowToDataflowProcessors
+             .get(processorBinding.getBoundProcessor());
+     final Activity scufl2Activity = processorBinding.getBoundActivity();
+     final URI type = scufl2Activity.getType();
+     if (!activityService.activityExists(type))
+         throw new ActivityNotFoundException("No activity exists for "
+                 + type);
+
+     final org.apache.taverna.workflowmodel.processor.activity.Activity<?> dataflowActivity = activityService
+             .createActivity(type, scufl2Activity.getConfiguration()
+                     .getJson());
+
+     if (type.equals(NESTED_WORKFLOW_URI)) {
+         if (!(dataflowActivity instanceof NestedDataflow))
+             throw new ActivityConfigurationException(
+                     "Activity is not an instance of NestedDataflow");
+         // Translate (or reuse) the nested workflow and hand it over.
+         final Workflow nested = scufl2Tools.nestedWorkflowForProcessor(
+                 processorBinding.getBoundProcessor(), profile);
+         ((NestedDataflow) dataflowActivity)
+                 .setNestedDataflow(getDataflow(nested));
+     }
+
+     edits.getAddActivityEdit(dataflowProcessor, dataflowActivity).doEdit();
+
+     // Ports declared by the scufl2 activity.
+     for (InputActivityPort in : scufl2Activity.getInputPorts()) {
+         final ActivityInputPort newInput = edits.createActivityInputPort(
+                 in.getName(), in.getDepth(), false,
+                 new ArrayList<Class<? extends ExternalReferenceSPI>>(),
+                 String.class);
+         edits.getAddActivityInputPortEdit(dataflowActivity, newInput)
+                 .doEdit();
+     }
+     for (OutputActivityPort out : scufl2Activity.getOutputPorts()) {
+         final ActivityOutputPort newOutput = edits
+                 .createActivityOutputPort(out.getName(), out.getDepth(),
+                         out.getGranularDepth());
+         edits.getAddActivityOutputPortEdit(dataflowActivity, newOutput)
+                 .doEdit();
+     }
+
+     // Processor port name -> activity port name, as given by the binding.
+     for (ProcessorInputPortBinding inBinding : processorBinding
+             .getInputPortBindings()) {
+         final String processorPortName = inBinding.getBoundProcessorPort()
+                 .getName();
+         final String activityPortName = inBinding.getBoundActivityPort()
+                 .getName();
+         edits.getAddActivityInputPortMappingEdit(dataflowActivity,
+                 processorPortName, activityPortName).doEdit();
+     }
+     for (ProcessorOutputPortBinding outBinding : processorBinding
+             .getOutputPortBindings()) {
+         final String processorPortName = outBinding
+                 .getBoundProcessorPort().getName();
+         final String activityPortName = outBinding.getBoundActivityPort()
+                 .getName();
+         edits.getAddActivityOutputPortMappingEdit(dataflowActivity,
+                 processorPortName, activityPortName).doEdit();
+     }
+
+     workflowToDataflowActivities.put(scufl2Activity, dataflowActivity);
+     dataflowToWorkflowActivities.put(dataflowActivity, scufl2Activity);
+ }
+
+ /**
+  * Recreates every data link of the workflow between the dataflow ports
+  * that were mapped for its sender and receiver ports.
+  * <p>
+  * A link without a merge position is connected directly. A link with a
+  * merge position is routed through a {@link Merge}: one merge is created
+  * per receiving port, the link feeds a new merge input port placed at the
+  * merge position, and the merge output is connected to the sink the first
+  * time the merge is used.
+  *
+  * @param workflow
+  *            the workflow whose data links are translated
+  * @param dataflow
+  *            the dataflow that receives the merges
+  * @throws EditException
+  *             if a link refers to a port that has no mapped dataflow port,
+  *             if a merge is already connected to a different sink, if a
+  *             merge has more than one outgoing link, or if an edit fails
+  */
+ private void addDataLinks(Workflow workflow, Dataflow dataflow)
+         throws EditException {
+     for (DataLink dataLink : workflow.getDataLinks()) {
+         ReceiverPort receiverPort = dataLink.getSendsTo();
+         SenderPort senderPort = dataLink.getReceivesFrom();
+         EventForwardingOutputPort source = outputPorts.get(senderPort);
+         EventHandlingInputPort sink = inputPorts.get(receiverPort);
+         // Fail with the declared exception instead of a bare
+         // NullPointerException further down when a port was never mapped.
+         if (source == null)
+             throw new EditException(
+                     "No dataflow output port has been mapped for the sender of data link "
+                             + dataLink);
+         if (sink == null)
+             throw new EditException(
+                     "No dataflow input port has been mapped for the receiver of data link "
+                             + dataLink);
+         Integer mergePosition = dataLink.getMergePosition();
+         if (mergePosition != null) {
+             // One merge per receiving port, created on first use.
+             if (!merges.containsKey(receiverPort)) {
+                 Merge merge = edits.createMerge(dataflow);
+                 edits.getAddMergeEdit(dataflow, merge).doEdit();
+                 merges.put(receiverPort, merge);
+             }
+             Merge merge = merges.get(receiverPort);
+             // create merge input port
+             MergeInputPort mergeInputPort = edits.createMergeInputPort(
+                     merge, "input" + mergePosition, sink.getDepth());
+             // add it to the correct position in the merge; positions past
+             // the end of the list are appended
+             @SuppressWarnings("unchecked")
+             List<MergeInputPort> mergeInputPorts = (List<MergeInputPort>) merge
+                     .getInputPorts();
+             if (mergePosition > mergeInputPorts.size())
+                 mergeInputPorts.add(mergeInputPort);
+             else
+                 mergeInputPorts.add(mergePosition, mergeInputPort);
+             // connect a datalink into the merge
+             Datalink datalinkIn = edits.createDatalink(source,
+                     mergeInputPort);
+             edits.getConnectDatalinkEdit(datalinkIn).doEdit();
+             // check if the merge output has been connected
+             EventForwardingOutputPort mergeOutputPort = merge
+                     .getOutputPort();
+             if (mergeOutputPort.getOutgoingLinks().isEmpty()) {
+                 Datalink datalinkOut = edits.createDatalink(
+                         merge.getOutputPort(), sink);
+                 edits.getConnectDatalinkEdit(datalinkOut).doEdit();
+             } else if (mergeOutputPort.getOutgoingLinks().size() == 1) {
+                 if (mergeOutputPort.getOutgoingLinks().iterator().next()
+                         .getSink() != sink)
+                     throw new EditException(
+                             "Cannot add a different sinkPort to a Merge that already has one defined");
+             } else
+                 throw new EditException(
+                         "The merge instance cannot have more than 1 outgoing Datalink");
+         } else {
+             Datalink datalink = edits.createDatalink(source, sink);
+             edits.getConnectDatalinkEdit(datalink).doEdit();
+         }
+     }
+ }
+
+ /**
+  * Creates a condition in the dataflow model for every blocking control
+  * link of the workflow, between the dataflow processors mapped to the
+  * link's "until finished" and "block" processors. Control links of any
+  * other kind are ignored.
+  *
+  * @param workflow
+  *            the workflow whose control links are translated
+  */
+ private void addControlLinks(Workflow workflow) throws EditException {
+     for (ControlLink link : workflow.getControlLinks()) {
+         if (!(link instanceof BlockingControlLink))
+             continue;
+         final BlockingControlLink blocking = (BlockingControlLink) link;
+         final org.apache.taverna.workflowmodel.Processor untilFinishedProcessor = workflowToDataflowProcessors
+                 .get(blocking.getUntilFinished());
+         final org.apache.taverna.workflowmodel.Processor blockProcessor = workflowToDataflowProcessors
+                 .get(blocking.getBlock());
+         edits.getCreateConditionEdit(untilFinishedProcessor, blockProcessor)
+                 .doEdit();
+     }
+ }
+
+ /**
+  * Adds a dataflow output port for every output port of the workflow. The
+  * internal input port of each new port is registered in
+  * {@code inputPorts}, so data links inside the workflow can use it as a
+  * sink.
+  *
+  * @param workflow
+  *            the workflow whose output ports are translated
+  * @param dataflow
+  *            the dataflow the new ports are added to
+  */
+ private void addOutputPorts(Workflow workflow, Dataflow dataflow)
+         throws EditException {
+     for (OutputWorkflowPort port : workflow.getOutputPorts()) {
+         final String portName = port.getName();
+         final DataflowOutputPort created = edits.createDataflowOutputPort(
+                 portName, dataflow);
+         edits.getAddDataflowOutputPortEdit(dataflow, created).doEdit();
+         final EventHandlingInputPort internalSink = created
+                 .getInternalInputPort();
+         inputPorts.put(port, internalSink);
+     }
+ }
+
+ /**
+  * Adds a dataflow input port for every input port of the workflow, using
+  * the workflow port's depth for both depth arguments. The internal output
+  * port of each new port is registered in {@code outputPorts}, so data
+  * links inside the workflow can use it as a source.
+  *
+  * @param workflow
+  *            the workflow whose input ports are translated
+  * @param dataflow
+  *            the dataflow the new ports are added to
+  */
+ private void addInputPorts(Workflow workflow, Dataflow dataflow)
+         throws EditException {
+     for (InputWorkflowPort port : workflow.getInputPorts()) {
+         final DataflowInputPort created = edits.createDataflowInputPort(
+                 port.getName(), port.getDepth(), port.getDepth(), dataflow);
+         edits.getAddDataflowInputPortEdit(dataflow, created).doEdit();
+         final EventForwardingOutputPort internalSource = created
+                 .getInternalOutputPort();
+         outputPorts.put(port, internalSource);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java
deleted file mode 100644
index df7e47f..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecution.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2010 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.local;
-
-import static java.util.logging.Level.SEVERE;
-import static uk.org.taverna.platform.execution.impl.local.T2ReferenceConverter.convertPathToObject;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.logging.Logger;
-
-import net.sf.taverna.t2.facade.ResultListener;
-import net.sf.taverna.t2.facade.WorkflowInstanceFacade;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.TokenOrderException;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.Edits;
-import net.sf.taverna.t2.workflowmodel.InvalidDataflowException;
-
-import org.apache.taverna.robundle.Bundle;
-
-import org.apache.taverna.databundle.DataBundles;
-import org.apache.taverna.platform.capability.api.ActivityService;
-import org.apache.taverna.platform.capability.api.DispatchLayerService;
-import uk.org.taverna.platform.execution.api.AbstractExecution;
-import uk.org.taverna.platform.execution.api.InvalidWorkflowException;
-import uk.org.taverna.platform.report.ActivityReport;
-import uk.org.taverna.platform.report.ProcessorReport;
-import uk.org.taverna.platform.report.WorkflowReport;
-import org.apache.taverna.scufl2.api.container.WorkflowBundle;
-import org.apache.taverna.scufl2.api.core.Workflow;
-import org.apache.taverna.scufl2.api.profiles.Profile;
-
-/**
- * An {@link uk.org.taverna.platform.execution.api.Execution Execution} for
- * executing Taverna workflows on a local Taverna Dataflow Engine.
- *
- * @author David Withers
- */
-public class LocalExecution extends AbstractExecution implements ResultListener {
-
- private static Logger logger = Logger.getLogger(LocalExecution.class
- .getName());
-
- private final WorkflowToDataflowMapper mapping;
-
- private final WorkflowInstanceFacade facade;
-
- private final LocalExecutionMonitor executionMonitor;
-
- private final ReferenceService referenceService;
-
- private final Map<String, DataflowInputPort> inputPorts = new HashMap<String, DataflowInputPort>();
-
- /**
- * Constructs an Execution for executing Taverna workflows on a local
- * Taverna Dataflow Engine.
- *
- * @param workflowBundle
- * the <code>WorkflowBundle</code> containing the
- * <code>Workflow</code>s required for execution
- * @param workflow
- * the <code>Workflow</code> to execute
- * @param profile
- * the <code>Profile</code> to use when executing the
- * <code>Workflow</code>
- * @param dataBundle
- * the <code>Bundle</code> containing the data values for the
- * <code>Workflow</code>
- * @param referenceService
- * the <code>ReferenceService</code> used to register inputs,
- * outputs and intermediate values
- * @throws InvalidWorkflowException
- * if the specified workflow is invalid
- */
- public LocalExecution(WorkflowBundle workflowBundle, Workflow workflow,
- Profile profile, Bundle dataBundle,
- ReferenceService referenceService, Edits edits,
- ActivityService activityService,
- DispatchLayerService dispatchLayerService)
- throws InvalidWorkflowException {
- super(workflowBundle, workflow, profile, dataBundle);
- this.referenceService = referenceService;
- try {
- mapping = new WorkflowToDataflowMapper(workflowBundle, profile,
- edits, activityService, dispatchLayerService);
- Dataflow dataflow = mapping.getDataflow(workflow);
- for (DataflowInputPort dataflowInputPort : dataflow.getInputPorts())
- inputPorts.put(dataflowInputPort.getName(), dataflowInputPort);
- facade = edits.createWorkflowInstanceFacade(dataflow,
- createContext(), "");
- executionMonitor = new LocalExecutionMonitor(getWorkflowReport(),
- getDataBundle(), mapping, facade.getIdentifier());
- } catch (InvalidDataflowException e) {
- throw new InvalidWorkflowException(e);
- }
- }
-
- @Override
- public void delete() {
- cancel();
- }
-
- @Override
- public void start() {
- MonitorManager.getInstance().addObserver(executionMonitor);
- /*
- * have to add a result listener otherwise facade doesn't record when
- * workflow is finished
- */
- facade.addResultListener(this);
- facade.fire();
- try {
- if (DataBundles.hasInputs(getDataBundle())) {
- Path inputs = DataBundles.getInputs(getDataBundle());
- for (Entry<String, DataflowInputPort> inputPort : inputPorts
- .entrySet()) {
- String portName = inputPort.getKey();
- Path path = DataBundles.getPort(inputs, portName);
- if (!DataBundles.isMissing(path)) {
- T2Reference identifier = referenceService.register(
- convertPathToObject(path), inputPort.getValue()
- .getDepth(), true, null);
- int[] index = new int[] {};
- WorkflowDataToken token = new WorkflowDataToken("",
- index, identifier, facade.getContext());
- try {
- facade.pushData(token, portName);
- } catch (TokenOrderException e) {
- logger.log(SEVERE, "Unable to push data for input "
- + portName, e);
- }
- }
- }
- }
- } catch (IOException e) {
- logger.log(SEVERE, "Error getting input data", e);
- }
- }
-
- @Override
- public void pause() {
- facade.pauseWorkflowRun();
- }
-
- @Override
- public void resume() {
- facade.resumeWorkflowRun();
- }
-
- @Override
- public void cancel() {
- facade.cancelWorkflowRun();
- facade.removeResultListener(this);
- MonitorManager.getInstance().removeObserver(executionMonitor);
- }
-
- @Override
- protected WorkflowReport createWorkflowReport(Workflow workflow) {
- return new WorkflowReport(workflow);
- }
-
- @Override
- public ProcessorReport createProcessorReport(
- org.apache.taverna.scufl2.api.core.Processor processor) {
- return new LocalProcessorReport(processor);
- }
-
- @Override
- public ActivityReport createActivityReport(
- org.apache.taverna.scufl2.api.activity.Activity activity) {
- return new ActivityReport(activity);
- }
-
- private InvocationContext createContext() {
- InvocationContext context = new InvocationContext() {
- private List<Object> entities = Collections
- .synchronizedList(new ArrayList<Object>());
-
- @Override
- public <T> List<T> getEntities(Class<T> entityType) {
- List<T> entitiesOfType = new ArrayList<>();
- synchronized (entities) {
- for (Object entity : entities)
- if (entityType.isInstance(entity))
- entitiesOfType.add(entityType.cast(entity));
- }
- return entitiesOfType;
- }
-
- @Override
- public void addEntity(Object entity) {
- entities.add(entity);
- }
-
- @Override
- public ReferenceService getReferenceService() {
- return referenceService;
- }
-
- @Override
- public ProvenanceReporter getProvenanceReporter() {
- return null;
- }
-
- };
- return context;
- }
-
- @Override
- public void resultTokenProduced(WorkflowDataToken token, String portName) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java
deleted file mode 100644
index 35f7a99..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionEnvironment.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2011 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.local;
-
-import java.net.URI;
-import java.util.Set;
-
-import org.apache.taverna.platform.capability.api.ActivityConfigurationException;
-import org.apache.taverna.platform.capability.api.ActivityNotFoundException;
-import org.apache.taverna.platform.capability.api.ActivityService;
-import org.apache.taverna.platform.capability.api.DispatchLayerConfigurationException;
-import org.apache.taverna.platform.capability.api.DispatchLayerNotFoundException;
-import org.apache.taverna.platform.capability.api.DispatchLayerService;
-import uk.org.taverna.platform.execution.api.AbstractExecutionEnvironment;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * Execution Environment for a local Taverna Dataflow Engine
- *
- * @author David Withers
- */
-public class LocalExecutionEnvironment extends AbstractExecutionEnvironment {
-
- private final ActivityService activityService;
- private final DispatchLayerService dispatchLayerService;
-
- public LocalExecutionEnvironment(LocalExecutionService localExecutionService,
- ActivityService activityService, DispatchLayerService dispatchLayerService) {
- super(LocalExecutionEnvironment.class.getName(), "Taverna Local Execution Environment",
- "Execution Environment for a local Taverna Dataflow Engine", localExecutionService);
- this.activityService = activityService;
- this.dispatchLayerService = dispatchLayerService;
- }
-
- @Override
- public Set<URI> getActivityTypes() {
- return activityService.getActivityTypes();
- }
-
- @Override
- public boolean activityExists(URI uri) {
- return activityService.activityExists(uri);
- }
-
- @Override
- public JsonNode getActivityConfigurationSchema(URI uri)
- throws ActivityNotFoundException, ActivityConfigurationException {
- return activityService.getActivityConfigurationSchema(uri);
- }
-
- @Override
- public Set<URI> getDispatchLayerTypes() {
- return dispatchLayerService.getDispatchLayerTypes();
- }
-
- @Override
- public boolean dispatchLayerExists(URI uri) {
- return dispatchLayerService.dispatchLayerExists(uri);
- }
-
- @Override
- public JsonNode getDispatchLayerConfigurationSchema(URI uri)
- throws DispatchLayerNotFoundException, DispatchLayerConfigurationException {
- return dispatchLayerService.getDispatchLayerConfigurationSchema(uri);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionMonitor.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionMonitor.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionMonitor.java
deleted file mode 100755
index 4c57403..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionMonitor.java
+++ /dev/null
@@ -1,548 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2010 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.local;
-
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.logging.Logger;
-
-import net.sf.taverna.t2.facade.ResultListener;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.lang.observer.Observable;
-import net.sf.taverna.t2.lang.observer.Observer;
-import net.sf.taverna.t2.monitor.MonitorManager.AddPropertiesMessage;
-import net.sf.taverna.t2.monitor.MonitorManager.DeregisterNodeMessage;
-import net.sf.taverna.t2.monitor.MonitorManager.MonitorMessage;
-import net.sf.taverna.t2.monitor.MonitorManager.RegisterNodeMessage;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.reference.ErrorDocument;
-import net.sf.taverna.t2.reference.ExternalReferenceSPI;
-import net.sf.taverna.t2.reference.IdentifiedList;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.ReferenceServiceException;
-import net.sf.taverna.t2.reference.ReferenceSet;
-import net.sf.taverna.t2.reference.StackTraceElementBean;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.reference.T2ReferenceType;
-import net.sf.taverna.t2.reference.impl.external.file.FileReference;
-import net.sf.taverna.t2.reference.impl.external.http.HttpReference;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
-
-import org.apache.taverna.robundle.Bundle;
-
-import org.apache.taverna.databundle.DataBundles;
-import uk.org.taverna.platform.execution.api.InvalidWorkflowException;
-import uk.org.taverna.platform.report.ActivityReport;
-import uk.org.taverna.platform.report.Invocation;
-import uk.org.taverna.platform.report.ProcessorReport;
-import uk.org.taverna.platform.report.StatusReport;
-import uk.org.taverna.platform.report.WorkflowReport;
-
-/**
- * A workflow monitor for local executions.
- *
- * @author David Withers
- */
-public class LocalExecutionMonitor implements Observer<MonitorMessage> {
- private static final Logger logger = Logger
- .getLogger(LocalExecutionMonitor.class.getName());
- private static final String ID_SEPARATOR = "/";
-
- private Map<String, StatusReport<?, ?>> reports;
- private Map<String, Invocation> invocations;
- private Map<String, String> invocationToActivity;
- private Map<T2Reference, Path> referenceToPath;
- private final String facadeId;
- private final Bundle dataBundle;
-
- public LocalExecutionMonitor(WorkflowReport workflowReport,
- Bundle dataBundle, WorkflowToDataflowMapper mapping, String facadeId)
- throws InvalidWorkflowException {
- this.dataBundle = dataBundle;
- this.facadeId = facadeId;
- reports = new HashMap<>();
- invocations = new HashMap<>();
- invocationToActivity = new HashMap<>();
- referenceToPath = new HashMap<>();
- mapReports("", workflowReport, mapping);
- }
-
- private void mapReports(String id, WorkflowReport workflowReport,
- WorkflowToDataflowMapper mapping) throws InvalidWorkflowException {
- Dataflow dataflow = mapping.getDataflow(workflowReport.getSubject());
- String dataflowId = null;
- if (id.isEmpty()) {
- dataflowId = dataflow.getLocalName();
- } else {
- dataflowId = id + ID_SEPARATOR + dataflow.getLocalName();
- }
- reports.put(dataflowId, workflowReport);
- for (ProcessorReport processorReport : workflowReport
- .getProcessorReports()) {
- Processor processor = mapping.getDataflowProcessor(processorReport
- .getSubject());
- String processorId = dataflowId + ID_SEPARATOR
- + processor.getLocalName();
- reports.put(processorId, (LocalProcessorReport) processorReport);
- for (ActivityReport activityReport : processorReport
- .getActivityReports()) {
- Activity<?> activity = mapping
- .getDataflowActivity(activityReport.getSubject());
- String activityId = processorId + ID_SEPARATOR
- + activity.hashCode();
- reports.put(activityId, activityReport);
- WorkflowReport nestedWorkflowReport = activityReport
- .getNestedWorkflowReport();
- if (nestedWorkflowReport != null)
- mapReports(activityId, nestedWorkflowReport, mapping);
- }
- }
- }
-
- @Override
- public void notify(Observable<MonitorMessage> sender, MonitorMessage message)
- throws Exception {
- String[] owningProcess = message.getOwningProcess();
- if (owningProcess.length > 0 && owningProcess[0].equals(facadeId)) {
- if (message instanceof RegisterNodeMessage) {
- RegisterNodeMessage regMessage = (RegisterNodeMessage) message;
- registerNode(regMessage.getWorkflowObject(), owningProcess,
- regMessage.getProperties());
- } else if (message instanceof DeregisterNodeMessage) {
- deregisterNode(owningProcess);
- } else if (message instanceof AddPropertiesMessage) {
- AddPropertiesMessage addMessage = (AddPropertiesMessage) message;
- addPropertiesToNode(owningProcess,
- addMessage.getNewProperties());
- } else {
- logger.warning("Unknown message " + message + " from " + sender);
- }
- }
- }
-
- public void registerNode(Object dataflowObject, String[] owningProcess,
- Set<MonitorableProperty<?>> properties) {
- if (dataflowObject instanceof Dataflow) {
- Dataflow dataflow = (Dataflow) dataflowObject;
- Invocation parentInvocation = invocations
- .get(getParentInvocationId(owningProcess));
- WorkflowReport report = (WorkflowReport) reports
- .get(getReportId(owningProcess));
- report.setStartedDate(new Date());
- Invocation invocation = new Invocation(
- getInvocationName(owningProcess), parentInvocation, report);
- if (parentInvocation == null) {
- if (DataBundles.hasInputs(dataBundle)) {
- try {
- invocation.setInputs(DataBundles.getPorts(DataBundles
- .getInputs(dataBundle)));
- } catch (IOException e) {
- logger.log(WARNING, "Error setting input ports", e);
- }
- }
- try {
- Path outputs = DataBundles.getOutputs(dataBundle);
- DataflowResultListener dataflowResultListener = new DataflowResultListener(
- outputs);
- for (DataflowOutputPort dataflowOutputPort : dataflow
- .getOutputPorts()) {
- String portName = dataflowOutputPort.getName();
- Path portPath = DataBundles.getPort(outputs, portName);
- invocation.setOutput(portName, portPath);
- dataflowOutputPort
- .addResultListener(dataflowResultListener);
- }
- } catch (IOException e) {
- logger.log(WARNING, "Error setting output ports", e);
- }
- invocations.put(getInvocationId(owningProcess), invocation);
- } else {
- invocation.setInputs(parentInvocation.getInputs());
- NestedDataflowResultListener resultListener = new NestedDataflowResultListener(
- invocation);
- for (DataflowOutputPort dataflowOutputPort : dataflow
- .getOutputPorts()) {
- dataflowOutputPort.addResultListener(resultListener);
- }
- invocations.put(getInvocationId(owningProcess), invocation);
- }
- } else if (dataflowObject instanceof Processor) {
- StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
- report.setStartedDate(new Date());
- if (report instanceof LocalProcessorReport)
- ((LocalProcessorReport) report).addProperties(properties);
- } else if (dataflowObject instanceof Activity) {
- Activity<?> activity = (Activity<?>) dataflowObject;
- invocationToActivity.put(owningProcess[owningProcess.length - 1],
- String.valueOf(activity.hashCode()));
- } else if (dataflowObject instanceof DispatchJobEvent) {
- DispatchJobEvent jobEvent = (DispatchJobEvent) dataflowObject;
- StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
- // create a new invocation
- Invocation parentInvocation;
- Invocation invocation;
-
- if (report instanceof ActivityReport) {
- parentInvocation = invocations
- .get(getParentInvocationId(owningProcess)
- + indexToString(jobEvent.getIndex()));
- invocation = new Invocation(getInvocationName(owningProcess),
- jobEvent.getIndex(), parentInvocation, report);
- invocations.put(getInvocationId(owningProcess), invocation);
- } else {
- parentInvocation = invocations
- .get(getParentInvocationId(owningProcess));
- invocation = new Invocation(getInvocationName(owningProcess)
- + indexToString(jobEvent.getIndex()),
- jobEvent.getIndex(), parentInvocation, report);
- invocations.put(getInvocationId(owningProcess)
- + indexToString(jobEvent.getIndex()), invocation);
- }
- // set the invocation inputs
- try {
- for (Entry<String, T2Reference> inputInfo : jobEvent.getData()
- .entrySet()) {
- invocation.setInput(
- inputInfo.getKey(),
- getIntermediate(inputInfo.getValue(),
- jobEvent.getContext()));
- }
- } catch (IOException | URISyntaxException e) {
- logger.log(WARNING, "Error saving intermediate inputs for "
- + jobEvent.getOwningProcess(), e);
- }
-
- } else if (dataflowObject instanceof DispatchResultEvent) {
- DispatchResultEvent resultEvent = (DispatchResultEvent) dataflowObject;
- StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
- // find the invocation
- Invocation invocation;
- if (report instanceof ActivityReport)
- invocation = invocations.remove(getInvocationId(owningProcess));
- else
- invocation = invocations.remove(getInvocationId(owningProcess)
- + indexToString(resultEvent.getIndex()));
-
- if (invocation == null) {
- logger.log(SEVERE, "Can't find invocation for owning process "
- + owningProcess);
- return;
- }
-
- // set the invocation outputs
- try {
- for (Entry<String, T2Reference> outputInfo : resultEvent.getData()
- .entrySet()) {
- invocation.setOutput(
- outputInfo.getKey(),
- getIntermediate(outputInfo.getValue(),
- resultEvent.getContext()));
- }
- } catch (IOException | URISyntaxException e) {
- logger.log(WARNING, "Error saving intermediate outputs for "
- + resultEvent.getOwningProcess(), e);
- }
- invocation.setCompletedDate(new Date());
- }
- }
-
- public void deregisterNode(String[] owningProcess) {
- StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
- if (report == null) {
- return;
- } else if (report instanceof WorkflowReport) {
- Invocation invocation = invocations
- .remove(getInvocationId(owningProcess));
- invocation.setCompletedDate(new Date());
- report.setCompletedDate(new Date());
- } else if (report instanceof LocalProcessorReport) {
- ((LocalProcessorReport) report).saveProperties();
- report.setCompletedDate(new Date());
- } else if (report instanceof ActivityReport) {
- // Invocation may still exist if the activity failed
- Invocation invocation = invocations
- .remove(getInvocationId(owningProcess));
- if (invocation != null) {
- invocation.setCompletedDate(new Date());
- report.setFailedDate(new Date());
- } else
- report.setCompletedDate(new Date());
- invocationToActivity
- .remove(owningProcess[owningProcess.length - 1]);
- }
- }
-
- public void addPropertiesToNode(String[] owningProcess,
- Set<MonitorableProperty<?>> newProperties) {
- StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
- if (report instanceof LocalProcessorReport) {
- LocalProcessorReport processorReport = (LocalProcessorReport) report;
- processorReport.addProperties(newProperties);
- }
- }
-
- private String getParentInvocationId(String[] owningProcess) {
- List<String> id = new ArrayList<>();
- for (int i = 1; i < owningProcess.length - 1; i++)
- if (i % 4 != 0)
- id.add(owningProcess[i]);
- return toPath(id);
- }
-
- private String getInvocationId(String[] owningProcess) {
- List<String> id = new ArrayList<>();
- for (int i = 1; i < owningProcess.length; i++)
- if (i % 4 != 0)
- id.add(owningProcess[i]);
- return toPath(id);
- }
-
- private String getInvocationName(String[] owningProcess) {
- return owningProcess[owningProcess.length - 1];
- }
-
- private String toPath(List<String> id) {
- StringBuilder sb = new StringBuilder();
- String sep = "";
- for (String string : id) {
- sb.append(sep).append(string);
- sep = ID_SEPARATOR;
- }
- return sb.toString();
- }
-
- private String getReportId(String[] owningProcess) {
- List<String> id = new ArrayList<>();
- for (int i = 1, position = 0; i < owningProcess.length; i++) {
- if (i % 4 == 0)
- continue;
- if (position == 2) {
- id.add(invocationToActivity.get(owningProcess[i]));
- position = 0;
- } else {
- id.add(owningProcess[i]);
- position++;
- }
- }
- return toPath(id);
- }
-
- public String getProcessorId(String[] owningProcess) {
- StringBuffer sb = new StringBuffer();
- for (int i = 1, skip = 0; i < owningProcess.length; i++, skip--)
- if (i <= 2 || skip < 0) {
- sb.append(owningProcess[i]);
- skip = 3;
- }
- return sb.toString();
- }
-
- private String indexToString(int[] index) {
- StringBuilder indexString = new StringBuilder();
- for (int i = 0; i < index.length; i++) {
- if (i != 0)
- indexString.append(":");
- indexString.append(index[i] + 1);
- }
- return indexString.toString();
- }
-
- private Path getIntermediate(T2Reference t2Reference,
- InvocationContext context) throws IOException, URISyntaxException {
- if (referenceToPath.containsKey(t2Reference))
- return referenceToPath.get(t2Reference);
-
- Path path = referencePath(t2Reference);
- convertReferenceToPath(path, t2Reference, context);
- referenceToPath.put(t2Reference, path);
- return path;
- }
-
- private Path referencePath(T2Reference t2Reference) throws IOException {
- String local = t2Reference.getLocalPart();
- try {
- return DataBundles.getIntermediate(dataBundle,
- UUID.fromString(local));
- } catch (IllegalArgumentException ex) {
- return DataBundles.getIntermediates(dataBundle)
- .resolve(t2Reference.getNamespacePart())
- .resolve(t2Reference.getLocalPart());
- }
- }
-
- public static String getStackTraceElementString(
- StackTraceElementBean stackTraceElement) {
- StringBuilder sb = new StringBuilder();
- sb.append(stackTraceElement.getClassName()).append('.')
- .append(stackTraceElement.getMethodName());
- if (stackTraceElement.getFileName() == null) {
- sb.append("(unknown file)");
- } else {
- sb.append('(').append(stackTraceElement.getFileName()).append(':')
- .append(stackTraceElement.getLineNumber()).append(')');
- }
- return sb.toString();
- }
-
- public void convertReferenceToPath(Path path, T2Reference reference,
- InvocationContext context) throws IOException, URISyntaxException {
- ReferenceService referenceService = context.getReferenceService();
- if (reference.getReferenceType() == T2ReferenceType.ReferenceSet) {
- if (DataBundles.isMissing(path)) {
- ReferenceSet rs = referenceService.getReferenceSetService()
- .getReferenceSet(reference);
- if (rs == null)
- throw new ReferenceServiceException(
- "Could not find ReferenceSet " + reference);
- // Check that there are references in the set
- if (rs.getExternalReferences().isEmpty())
- throw new ReferenceServiceException("ReferenceSet "
- + reference + " is empty");
-
- for (ExternalReferenceSPI ers : rs.getExternalReferences()) {
- if (ers instanceof FileReference) {
- URI uri = ((FileReference) ers).getFile().toURI();
- DataBundles.setReference(path, uri);
- } else if (ers instanceof HttpReference) {
- URI uri = ((HttpReference) ers).getHttpUrl().toURI();
- DataBundles.setReference(path, uri);
- } else {
- try (InputStream in = ers.openStream(context)) {
- Files.copy(in, path);
- }
- }
- }
- }
- } else if (reference.getReferenceType() == T2ReferenceType.ErrorDocument) {
- if (DataBundles.isMissing(path)) {
- ErrorDocument errorDocument = referenceService
- .getErrorDocumentService().getError(reference);
- String message = errorDocument.getMessage();
- StringBuilder trace = new StringBuilder();
- if (errorDocument.getExceptionMessage() != null
- && !errorDocument.getExceptionMessage().isEmpty()) {
- trace.append(errorDocument.getExceptionMessage());
- trace.append("\n");
- }
- List<StackTraceElementBean> stackTraceStrings = errorDocument
- .getStackTraceStrings();
- for (StackTraceElementBean stackTraceElement : stackTraceStrings) {
- trace.append(getStackTraceElementString(stackTraceElement));
- trace.append("\n");
- }
- List<Path> causes = new ArrayList<>();
- for (T2Reference errorReference : errorDocument
- .getErrorReferences())
- causes.add(getIntermediate(errorReference, context));
- DataBundles.setError(path, message, trace.toString(),
- causes.toArray(new Path[causes.size()]));
- }
- } else { // it is an IdentifiedList<T2Reference>
- IdentifiedList<T2Reference> identifiedList = referenceService
- .getListService().getList(reference);
- if (!DataBundles.isList(path))
- DataBundles.createList(path);
- for (T2Reference ref : identifiedList)
- convertReferenceToPath(DataBundles.newListItem(path), ref,
- context);
- }
- }
-
- private class NestedDataflowResultListener implements ResultListener {
- private final Invocation invocation;
-
- public NestedDataflowResultListener(Invocation invocation) {
- this.invocation = invocation;
- }
-
- @Override
- public void resultTokenProduced(WorkflowDataToken token, String portName) {
- try {
- if (token.isFinal())
- invocation
- .setOutput(
- portName,
- getIntermediate(token.getData(),
- token.getContext()));
- } catch (IOException | URISyntaxException e) {
- logger.log(SEVERE, "Unable to convert T2Reference", e);
- }
- }
-
- }
-
- private class DataflowResultListener implements ResultListener {
- private Path outputs;
- private Map<String, Integer> depthSeen = new HashMap<>();
-
- public DataflowResultListener(Path outputs) {
- this.outputs = outputs;
- }
-
- @Override
- public void resultTokenProduced(WorkflowDataToken token, String portName) {
- Integer depth = depthSeen.get(portName);
- if (depth == null || depth.equals(token.getIndex().length)) {
- if (depth == null)
- depthSeen.put(portName, token.getIndex().length);
- try {
- Path port = DataBundles.getPort(outputs, portName);
- Path path = getPath(port, 0, token.getIndex());
- convertReferenceToPath(path, token.getData(),
- token.getContext());
- } catch (IOException | URISyntaxException e) {
- logger.log(SEVERE, "Unable to convert T2Reference", e);
- }
- }
- }
-
- private Path getPath(Path path, int depth, int[] index)
- throws IOException {
- if (depth == index.length)
- return path;
- if (!DataBundles.isList(path))
- DataBundles.createList(path);
- return getPath(DataBundles.getListItem(path, index[depth]),
- depth + 1, index);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionService.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionService.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionService.java
deleted file mode 100644
index ba1ee7f..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalExecutionService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2010 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.local;
-
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.WeakHashMap;
-
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.Edits;
-
-import org.apache.taverna.robundle.Bundle;
-
-import org.apache.taverna.platform.capability.api.ActivityService;
-import org.apache.taverna.platform.capability.api.DispatchLayerService;
-import uk.org.taverna.platform.execution.api.AbstractExecutionService;
-import uk.org.taverna.platform.execution.api.Execution;
-import uk.org.taverna.platform.execution.api.ExecutionEnvironment;
-import uk.org.taverna.platform.execution.api.InvalidWorkflowException;
-import uk.org.taverna.platform.execution.api.WorkflowCompiler;
-import org.apache.taverna.scufl2.api.container.WorkflowBundle;
-import org.apache.taverna.scufl2.api.core.Workflow;
-import org.apache.taverna.scufl2.api.profiles.Profile;
-
-/**
- * Service for executing Taverna workflows on a local Taverna Dataflow Engine.
- *
- * @author David Withers
- */
-public class LocalExecutionService extends AbstractExecutionService implements
- WorkflowCompiler {
- private Edits edits;
- private ActivityService activityService;
- private DispatchLayerService dispatchLayerService;
- private ReferenceService referenceService;
-
- /**
- * Constructs an execution service that executes workflows using the T2
- * dataflow engine.
- */
- public LocalExecutionService() {
- super(
- LocalExecutionService.class.getName(),
- "Taverna Local Execution Service",
- "Execution Service for executing Taverna workflows using a local Taverna Dataflow Engine");
- }
-
- @Override
- public Set<ExecutionEnvironment> getExecutionEnvironments() {
- Set<ExecutionEnvironment> executionEnvironments = new HashSet<>();
- executionEnvironments.add(new LocalExecutionEnvironment(this,
- activityService, dispatchLayerService));
- return executionEnvironments;
- }
-
- @Override
- protected Execution createExecutionImpl(WorkflowBundle workflowBundle,
- Workflow workflow, Profile profile, Bundle dataBundle)
- throws InvalidWorkflowException {
- return new LocalExecution(workflowBundle, workflow, profile,
- dataBundle, referenceService, edits, activityService,
- dispatchLayerService);
- }
-
- /**
- * Sets the Edits Service for creating Taverna Dataflows.
- *
- * @param edits
- * the Edits Service for creating Taverna Dataflows
- */
- public void setEdits(Edits edits) {
- this.edits = edits;
- }
-
- /**
- * Sets the service for creating activities.
- *
- * @param activityService
- * the service for creating activities
- */
- public void setActivityService(ActivityService activityService) {
- this.activityService = activityService;
- }
-
- /**
- * Sets the service for creating dispatch layers.
- *
- * @param dispatchLayerService
- * the service for creating dispatch layers
- */
- public void setDispatchLayerService(DispatchLayerService dispatchLayerService) {
- this.dispatchLayerService = dispatchLayerService;
- }
-
- /**
- * Sets the reference service.
- *
- * @param referenceService
- * the reference service
- */
- public void setReferenceService(ReferenceService referenceService) {
- this.referenceService = referenceService;
- }
-
- private WeakHashMap<URI, WorkflowToDataflowMapper> cache = new WeakHashMap<>();
-
- private synchronized WorkflowToDataflowMapper getMapper(
- WorkflowBundle bundle) {
- WorkflowToDataflowMapper m = cache.get(bundle.getIdentifier());
- if (m == null) {
- m = new WorkflowToDataflowMapper(bundle, bundle.getMainProfile(),
- edits, activityService, dispatchLayerService);
- cache.put(bundle.getIdentifier(), m);
- }
- return m;
- }
-
- @Override
- public Dataflow getDataflow(Workflow workflow)
- throws InvalidWorkflowException {
- return getMapper(workflow.getParent()).getDataflow(workflow);
- }
-
- @Override
- public synchronized Dataflow getDataflow(WorkflowBundle bundle)
- throws InvalidWorkflowException {
- return getMapper(bundle).getDataflow(bundle.getMainWorkflow());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalProcessorReport.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalProcessorReport.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalProcessorReport.java
deleted file mode 100644
index 2db354c..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/LocalProcessorReport.java
+++ /dev/null
@@ -1,141 +0,0 @@
-package uk.org.taverna.platform.execution.impl.local;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-import net.sf.taverna.t2.monitor.SteerableProperty;
-import uk.org.taverna.platform.report.ProcessorReport;
-import org.apache.taverna.scufl2.api.core.Processor;
-
-/**
- * ProcessorReport implementation based on MonitorableProperty objects.
- *
- * @author David Withers
- */
-public class LocalProcessorReport extends ProcessorReport {
- private static final String DISPATCH_ERRORBOUNCE_TOTAL_TRANSLATED = "dispatch:errorbounce:totalTranslated";
- private static final String DISPATCH_PARALLELIZE_COMPLETEDJOBS = "dispatch:parallelize:completedjobs";
- private static final String DISPATCH_PARALLELIZE_SENTJOBS = "dispatch:parallelize:sentjobs";
- private static final String DISPATCH_PARALLELIZE_QUEUESIZE = "dispatch:parallelize:queuesize";
-
- private Map<String, MonitorableProperty<?>> propertyMap;
-
- public LocalProcessorReport(Processor processor) {
- super(processor);
- propertyMap = new HashMap<String, MonitorableProperty<?>>();
- }
-
- public void addProperties(Set<MonitorableProperty<?>> properties) {
- for (MonitorableProperty<?> property : properties) {
- propertyMap.put(getPropertyName(property), property);
- }
- }
-
- public void saveProperties() {
- for (Entry<String, MonitorableProperty<?>> entry : propertyMap
- .entrySet())
- entry.setValue(new StaticProperty(entry.getValue()));
- }
-
- @Override
- public int getJobsQueued() {
- int result = -1;
- MonitorableProperty<?> property = propertyMap
- .get(DISPATCH_PARALLELIZE_QUEUESIZE);
- try {
- if (property != null)
- result = (Integer) property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- return result;
- }
-
- @Override
- public int getJobsStarted() {
- int result = -1;
- MonitorableProperty<?> property = propertyMap
- .get(DISPATCH_PARALLELIZE_SENTJOBS);
- if (property != null) {
- try {
- result = (Integer) property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- }
- return result;
- }
-
- @Override
- public int getJobsCompleted() {
- int result = -1;
- MonitorableProperty<?> property = propertyMap
- .get(DISPATCH_PARALLELIZE_COMPLETEDJOBS);
- try {
- if (property != null)
- result = (Integer) property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- return result;
- }
-
- @Override
- public int getJobsCompletedWithErrors() {
- int result = -1;
- MonitorableProperty<?> property = propertyMap
- .get(DISPATCH_ERRORBOUNCE_TOTAL_TRANSLATED);
- try {
- if (property != null)
- result = (Integer) property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- return result;
- }
-
- @Override
- public Set<String> getPropertyKeys() {
- if (!propertyMap.isEmpty())
- return new HashSet<>(propertyMap.keySet());
- return super.getPropertyKeys();
- }
-
- @Override
- public Object getProperty(String key) {
- Object result = null;
- MonitorableProperty<?> property = propertyMap.get(key);
- try {
- if (property != null)
- result = property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- return result;
- }
-
- @Override
- public void setProperty(String key, Object value) {
- MonitorableProperty<?> monitorableProperty = propertyMap.get(key);
- if (monitorableProperty instanceof SteerableProperty<?>) {
- @SuppressWarnings("unchecked")
- SteerableProperty<Object> steerableProperty = (SteerableProperty<Object>) monitorableProperty;
- try {
- steerableProperty.setProperty(value);
- } catch (NoSuchPropertyException e) {
- }
- }
- }
-
- private String getPropertyName(MonitorableProperty<?> property) {
- StringBuilder sb = new StringBuilder();
- String[] name = property.getName();
- for (int i = 0; i < name.length; i++) {
- if (i > 0)
- sb.append(':');
- sb.append(name[i]);
- }
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/StaticProperty.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/StaticProperty.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/StaticProperty.java
deleted file mode 100755
index e8c2c07..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/StaticProperty.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2010 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package uk.org.taverna.platform.execution.impl.local;
-
-import java.util.Date;
-
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.monitor.NoSuchPropertyException;
-
-/**
- * A MonitorableProperty with fixed values.
- *
- * @author David Withers
- */
-public class StaticProperty implements MonitorableProperty<Object> {
- private Object value;
- private String[] name;
- private Date lastModified;
-
- /**
- * Records the state of the MonitorableProperty.
- *
- * @param property
- */
- public StaticProperty(MonitorableProperty<?> property) {
- try {
- value = property.getValue();
- } catch (NoSuchPropertyException e) {
- }
- name = property.getName();
- lastModified = property.getLastModified();
- }
-
- @Override
- public Object getValue() throws NoSuchPropertyException {
- return value;
- }
-
- @Override
- public String[] getName() {
- return name;
- }
-
- @Override
- public Date getLastModified() {
- return lastModified;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/T2ReferenceConverter.java
----------------------------------------------------------------------
diff --git a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/T2ReferenceConverter.java b/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/T2ReferenceConverter.java
deleted file mode 100644
index 7de9f48..0000000
--- a/taverna-execution-local/src/main/java/uk/org/taverna/platform/execution/impl/local/T2ReferenceConverter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *
- */
-package uk.org.taverna.platform.execution.impl.local;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.taverna.databundle.DataBundles;
-
-/**
- * @author David Withers
- */
-public class T2ReferenceConverter {
- public static Object convertPathToObject(Path path) throws IOException {
- Object object = null;
- if (DataBundles.isValue(path)) {
- object = DataBundles.getStringValue(path);
- } else if (DataBundles.isReference(path)) {
- URI reference = DataBundles.getReference(path);
- String scheme = reference.getScheme();
- if ("file".equals(scheme)) {
- object = new File(reference);
- } else {
- object = reference.toURL();
- }
- } else if (DataBundles.isList(path)) {
- List<Path> list = DataBundles.getList(path);
- List<Object> objectList = new ArrayList<Object>(list.size());
- for (Path pathElement : list) {
- objectList.add(convertPathToObject(pathElement));
- }
- object = objectList;
- }
- return object;
- }
-}