You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:37:48 UTC
[04/51] [partial] incubator-taverna-engine git commit:
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditsImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditsImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditsImpl.java
deleted file mode 100644
index 8873951..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditsImpl.java
+++ /dev/null
@@ -1,1251 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007-2014 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.lang.Integer.parseInt;
-import static net.sf.taverna.t2.workflowmodel.utils.Tools.getUniqueMergeInputPortName;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.annotation.Annotated;
-import net.sf.taverna.t2.annotation.AnnotationAssertion;
-import net.sf.taverna.t2.annotation.AnnotationBeanSPI;
-import net.sf.taverna.t2.annotation.AnnotationChain;
-import net.sf.taverna.t2.annotation.AnnotationRole;
-import net.sf.taverna.t2.annotation.AnnotationSourceSPI;
-import net.sf.taverna.t2.annotation.CurationEvent;
-import net.sf.taverna.t2.annotation.Person;
-import net.sf.taverna.t2.annotation.impl.AnnotationAssertionImpl;
-import net.sf.taverna.t2.annotation.impl.AnnotationChainImpl;
-import net.sf.taverna.t2.facade.WorkflowInstanceFacade;
-import net.sf.taverna.t2.facade.impl.WorkflowInstanceFacadeImpl;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.ExternalReferenceSPI;
-import net.sf.taverna.t2.workflowmodel.CompoundEdit;
-import net.sf.taverna.t2.workflowmodel.Condition;
-import net.sf.taverna.t2.workflowmodel.Configurable;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.Edit;
-import net.sf.taverna.t2.workflowmodel.EditException;
-import net.sf.taverna.t2.workflowmodel.Edits;
-import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
-import net.sf.taverna.t2.workflowmodel.EventHandlingInputPort;
-import net.sf.taverna.t2.workflowmodel.InvalidDataflowException;
-import net.sf.taverna.t2.workflowmodel.Merge;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-import net.sf.taverna.t2.workflowmodel.OrderedPair;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractActivity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.impl.ActivityInputPortImpl;
-import net.sf.taverna.t2.workflowmodel.processor.activity.impl.ActivityOutputPortImpl;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchStack;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.impl.AbstractDispatchLayerEdit;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.impl.DispatchStackImpl;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.ErrorBounce;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.Failover;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.Invoke;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.Parallelize;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.Retry;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.Stop;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationStrategy;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationStrategyStack;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.NamedInputPortNode;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.impl.IterationStrategyImpl;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.impl.IterationStrategyStackImpl;
-
-/**
- * Implementation of {@link Edits}
- * @author Donal Fellows (et al)
- */
-public class EditsImpl implements Edits {
- // ----------------------------------------------------------------
- // Basic factory methods
-
- @Override
- public Dataflow createDataflow() {
- return new DataflowImpl();
- }
-
- @Override
- public Datalink createDatalink(EventForwardingOutputPort source,
- EventHandlingInputPort sink) {
- return new DatalinkImpl(source, sink);
- }
-
- /**
-  * Creates a {@link DataflowInputPortImpl} from the given name, depth,
-  * granular depth and dataflow.
-  */
- @Override
- public DataflowInputPort createDataflowInputPort(String name, int depth,
-         int granularDepth, Dataflow dataflow) {
-     DataflowInputPort port = new DataflowInputPortImpl(name, depth,
-             granularDepth, dataflow);
-     return port;
- }
-
- @Override
- public DataflowOutputPort createDataflowOutputPort(String name,
- Dataflow dataflow) {
- return new DataflowOutputPortImpl(name, dataflow);
- }
-
- /**
-  * Creates a {@link MergeInputPortImpl} for the given merge.
-  *
-  * @return the new port, or <code>null</code> if the merge is not a
-  *         {@link MergeImpl}
-  */
- @Override
- public MergeInputPort createMergeInputPort(Merge merge, String name,
-         int depth) {
-     if (!(merge instanceof MergeImpl)) {
-         return null;
-     }
-     MergeImpl mergeImpl = (MergeImpl) merge;
-     return new MergeInputPortImpl(mergeImpl, name, depth);
- }
-
- /**
-  * Creates a {@link ProcessorOutputPortImpl}; the processor is cast to
-  * {@link ProcessorImpl}.
-  */
- @Override
- public ProcessorOutputPort createProcessorOutputPort(Processor processor,
-         String name, int depth, int granularDepth) {
-     ProcessorImpl owner = (ProcessorImpl) processor;
-     return new ProcessorOutputPortImpl(owner, name, depth, granularDepth);
- }
-
- /**
-  * Creates a {@link ProcessorInputPortImpl}; the processor is cast to
-  * {@link ProcessorImpl}.
-  */
- @Override
- public ProcessorInputPort createProcessorInputPort(Processor processor,
-         String name, int depth) {
-     ProcessorImpl owner = (ProcessorImpl) processor;
-     return new ProcessorInputPortImpl(owner, name, depth);
- }
-
- @Override
- public AnnotationChain createAnnotationChain() {
- return new AnnotationChainImpl();
- }
-
- /**
- * Creates a MergeImpl instance. Merge names are generated as 'Merge0',
- * 'Merge1', 'Merge2', etc. The next merge to be added always gets the name
- * as the previous merge in the list with its index incremented by one. If a
- * merge is deleted, that is not taken into account when generating merges'
- * names.
- */
- @Override
- public Merge createMerge(Dataflow dataflow) {
- String mergeName;
-
- // Work out what the name of the merge should be.
- List<? extends Merge> merges = dataflow.getMerges();
- if (merges.isEmpty())
- mergeName = "Merge0"; // the first merge to be added to the list
- else
- mergeName = "Merge"
- + String.valueOf(parseInt(merges.get(merges.size() - 1)
- .getLocalName().substring(5)) + 1);
-
- return new MergeImpl(mergeName);
- }
-
- /** Creates a new {@link ProcessorImpl} and sets its name. */
- @Override
- public Processor createProcessor(String name) {
-     ProcessorImpl created = new ProcessorImpl();
-     created.setName(name);
-     return created;
- }
-
- @Override
- public IterationStrategy createIterationStrategy() {
- return new IterationStrategyImpl();
- }
-
- /**
-  * Builds an instance of {@link ActivityInputPortImpl}, passing all
-  * arguments through to its constructor unchanged.
-  */
- @Override
- public ActivityInputPort createActivityInputPort(
-         String portName,
-         int portDepth,
-         boolean allowsLiteralValues,
-         List<Class<? extends ExternalReferenceSPI>> handledReferenceSchemes,
-         Class<?> translatedElementClass) {
-     ActivityInputPort port = new ActivityInputPortImpl(portName, portDepth,
-             allowsLiteralValues, handledReferenceSchemes,
-             translatedElementClass);
-     return port;
- }
-
- /**
-  * Builds an instance of {@link ActivityOutputPortImpl} from the given
-  * name, depth and granular depth.
-  */
- @Override
- public ActivityOutputPort createActivityOutputPort(String portName, int portDepth,
-         int portGranularDepth) {
-     ActivityOutputPort port = new ActivityOutputPortImpl(portName,
-             portDepth, portGranularDepth);
-     return port;
- }
-
- @Override
- public WorkflowInstanceFacade createWorkflowInstanceFacade(
- Dataflow dataflow, InvocationContext context, String parentProcess)
- throws InvalidDataflowException {
- return new WorkflowInstanceFacadeImpl(dataflow, context, parentProcess);
- }
-
- // ----------------------------------------------------------------
- // Edits (structured transformation) factory methods
-
- /**
-  * Returns an edit that adds the processor to the dataflow.
-  *
-  * @throws RuntimeException
-  *             if the processor is not a {@link ProcessorImpl}
-  */
- @Override
- public Edit<Dataflow> getAddProcessorEdit(Dataflow dataflow,
-         Processor processor) {
-     if (processor instanceof ProcessorImpl) {
-         final ProcessorImpl toAdd = (ProcessorImpl) processor;
-         return new AbstractDataflowEdit(dataflow) {
-             @Override
-             protected void doEditAction(DataflowImpl target)
-                     throws EditException {
-                 target.addProcessor(toAdd);
-             }
-         };
-     }
-     throw new RuntimeException(
-             "The Processor is of the wrong implmentation,"
-                     + " it should be of type ProcessorImpl");
- }
-
- /**
-  * Returns an edit that adds the merge to the dataflow.
-  *
-  * @throws RuntimeException
-  *             if the merge is not a {@link MergeImpl}
-  */
- @Override
- public Edit<Dataflow> getAddMergeEdit(Dataflow dataflow, Merge merge) {
-     if (merge instanceof MergeImpl) {
-         final MergeImpl toAdd = (MergeImpl) merge;
-         return new AbstractDataflowEdit(dataflow) {
-             @Override
-             protected void doEditAction(DataflowImpl target)
-                     throws EditException {
-                 target.addMerge(toAdd);
-             }
-         };
-     }
-     throw new RuntimeException(
-             "The Merge is of the wrong implmentation, "
-                     + "it should be of type MergeImpl");
- }
-
- /**
-  * Returns an edit that adds the layer to the dispatch stack at the given
-  * position; undoing the edit removes the layer again.
-  */
- @Override
- public Edit<DispatchStack> getAddDispatchLayerEdit(DispatchStack stack,
-         final DispatchLayer<?> layer, final int position) {
-     return new AbstractDispatchLayerEdit(stack) {
-         @Override
-         protected void undoEditAction(DispatchStackImpl target) {
-             target.removeLayer(layer);
-         }
-
-         @Override
-         protected void doEditAction(DispatchStackImpl target) throws EditException {
-             target.addLayer(layer, position);
-         }
-     };
- }
-
- /**
-  * Returns an edit that appends the activity to the processor's activity
-  * list. Applying the edit fails with an {@link EditException} if the list
-  * already contains the activity.
-  */
- @Override
- public Edit<Processor> getAddActivityEdit(Processor processor,
-         final Activity<?> activity) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl target)
-                 throws EditException {
-             if (target.activityList.contains(activity)) {
-                 throw new EditException(
-                         "Cannot add a duplicate activity to processor");
-             }
-             target.activityList.add(activity);
-         }
-     };
- }
-
- /**
-  * Returns an edit that adds an input port to the processor and registers
-  * a matching {@link NamedInputPortNode} in each of the processor's
-  * iteration strategies. Applying the edit fails with an
-  * {@link EditException} if the processor already has an input port with
-  * the same name.
-  */
- @Override
- public Edit<Processor> getAddProcessorInputPortEdit(Processor processor,
-         final ProcessorInputPort port) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl target)
-                 throws EditException {
-             boolean duplicate = target.getInputPortWithName(port.getName()) != null;
-             if (duplicate) {
-                 throw new EditException(
-                         "Attempt to create duplicate input port with name '"
-                                 + port.getName() + "'");
-             }
-             target.inputPorts.add((ProcessorInputPortImpl) port);
-             /*
-              * Each strategy gets its own node. The node's depth is set
-              * to the port's depth, so this won't by itself trigger
-              * iteration staging unless the depth is later altered on
-              * the iteration strategy.
-              */
-             for (IterationStrategyImpl strategy : target.iterationStack
-                     .getStrategies()) {
-                 NamedInputPortNode node = new NamedInputPortNode(
-                         port.getName(), port.getDepth());
-                 strategy.addInput(node);
-                 strategy.connectDefault(node);
-             }
-         }
-     };
- }
-
- /**
-  * Returns an edit that adds an output port to the processor. Applying the
-  * edit fails with an {@link EditException} if the processor already has
-  * an output port with the same name.
-  */
- @Override
- public Edit<Processor> getAddProcessorOutputPortEdit(Processor processor,
-         final ProcessorOutputPort port) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl target)
-                 throws EditException {
-             boolean duplicate = target.getOutputPortWithName(port.getName()) != null;
-             if (duplicate) {
-                 throw new EditException("Duplicate output port name");
-             }
-             target.outputPorts.add((ProcessorOutputPortImpl) port);
-         }
-     };
- }
-
- /**
-  * Returns an edit that asks the dataflow to create an input port with the
-  * given name, depth and granular depth.
-  */
- @Override
- public Edit<Dataflow> getCreateDataflowInputPortEdit(Dataflow dataflow,
-         final String portName, final int portDepth, final int granularDepth) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target)
-                 throws EditException {
-             target.createInputPort(portName, portDepth, granularDepth);
-         }
-     };
- }
-
- /**
-  * Returns an edit that asks the dataflow to create an output port with
-  * the given name.
-  */
- @Override
- public Edit<Dataflow> getCreateDataflowOutputPortEdit(Dataflow dataflow,
-         final String portName) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target)
-                 throws EditException {
-             target.createOutputPort(portName);
-         }
-     };
- }
-
- /**
-  * Returns an edit that removes the layer from the dispatch stack. The
-  * value returned by the removal is remembered so that undoing the edit
-  * can add the layer back at that index.
-  */
- @Override
- public Edit<DispatchStack> getDeleteDispatchLayerEdit(DispatchStack stack,
-         final DispatchLayer<?> layer) {
-     return new AbstractDispatchLayerEdit(stack) {
-         /** Index reported by removeLayer when the edit was applied. */
-         private int removedAt;
-
-         @Override
-         protected void undoEditAction(DispatchStackImpl target) {
-             target.addLayer(layer, removedAt);
-         }
-
-         @Override
-         protected void doEditAction(DispatchStackImpl target) {
-             removedAt = target.removeLayer(layer);
-         }
-     };
- }
-
- /** Returns an edit that sets the merge's name to <code>newName</code>. */
- @Override
- public Edit<Merge> getRenameMergeEdit(Merge merge, final String newName) {
-     return new AbstractMergeEdit(merge) {
-         @Override
-         protected void doEditAction(MergeImpl target) {
-             target.setName(newName);
-         }
-     };
- }
-
- /** Returns an edit that sets the processor's name to <code>newName</code>. */
- @Override
- public Edit<Processor> getRenameProcessorEdit(Processor processor,
-         final String newName) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl target) {
-             target.setName(newName);
-         }
-     };
- }
-
- /**
-  * Returns an edit that sets the dataflow input port's name to
-  * <code>newName</code>.
-  */
- @Override
- public Edit<DataflowInputPort> getRenameDataflowInputPortEdit(
-         DataflowInputPort dataflowInputPort, final String newName) {
-     return new AbstractDataflowInputPortEdit(dataflowInputPort) {
-         @Override
-         protected void doEditAction(DataflowInputPortImpl target) {
-             target.setName(newName);
-         }
-     };
- }
-
- /**
-  * Returns an edit that sets the dataflow output port's name to
-  * <code>newName</code>.
-  */
- @Override
- public Edit<DataflowOutputPort> getRenameDataflowOutputPortEdit(
-         DataflowOutputPort dataflowOutputPort, final String newName) {
-     return new AbstractDataflowOutputPortEdit(dataflowOutputPort) {
-         @Override
-         protected void doEditAction(DataflowOutputPortImpl target) {
-             target.setName(newName);
-         }
-     };
- }
-
- /** Returns an edit that sets the dataflow input port's depth. */
- @Override
- public Edit<DataflowInputPort> getChangeDataflowInputPortDepthEdit(
-         DataflowInputPort dataflowInputPort, final int depth) {
-     return new AbstractDataflowInputPortEdit(dataflowInputPort) {
-         @Override
-         protected void doEditAction(DataflowInputPortImpl target) {
-             target.setDepth(depth);
-         }
-     };
- }
-
- /** Returns an edit that sets the dataflow input port's granular depth. */
- @Override
- public Edit<DataflowInputPort> getChangeDataflowInputPortGranularDepthEdit(
-         DataflowInputPort dataflowInputPort, final int granularDepth) {
-     return new AbstractDataflowInputPortEdit(dataflowInputPort) {
-         @Override
-         protected void doEditAction(DataflowInputPortImpl target) {
-             target.setGranularDepth(granularDepth);
-         }
-     };
- }
-
- /**
-  * Returns an edit that links the processor's output port with the given
-  * name to the target port. Applying the edit fails with an
-  * {@link EditException} if the processor has no output port with that
-  * name.
-  */
- @Override
- public Edit<Processor> getConnectProcessorOutputEdit(Processor processor,
-         final String outputPortName, final EventHandlingInputPort targetPort) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl target) throws EditException {
-             // Locate the first output port carrying the requested name.
-             BasicEventForwardingOutputPort sourcePort = null;
-             for (BasicEventForwardingOutputPort candidate : target.outputPorts) {
-                 if (candidate.getName().equals(outputPortName)) {
-                     sourcePort = candidate;
-                     break;
-                 }
-             }
-             if (sourcePort == null) {
-                 throw new EditException("Cannot locate output port with name '"
-                         + outputPortName + "'");
-             }
-
-             DatalinkImpl link = new DatalinkImpl(sourcePort, targetPort);
-             sourcePort.addOutgoingLink(link);
-             // Only ports of the known base class can record an incoming link.
-             if (targetPort instanceof AbstractEventHandlingInputPort) {
-                 AbstractEventHandlingInputPort sink = (AbstractEventHandlingInputPort) targetPort;
-                 sink.setIncomingLink(link);
-             }
-         }
-     };
- }
-
- /**
-  * Returns an edit that registers the datalink with its source port (as an
-  * outgoing link) and with its sink port (as the incoming link). Each side
-  * is only updated when the port is of the expected implementation class.
-  */
- @Override
- public Edit<Datalink> getConnectDatalinkEdit(Datalink datalink) {
-     return new AbstractDatalinkEdit(datalink) {
-         @Override
-         protected void doEditAction(DatalinkImpl link) throws EditException {
-             EventForwardingOutputPort from = link.getSource();
-             EventHandlingInputPort to = link.getSink();
-             if (from instanceof BasicEventForwardingOutputPort) {
-                 BasicEventForwardingOutputPort out = (BasicEventForwardingOutputPort) from;
-                 out.addOutgoingLink(link);
-             }
-             if (to instanceof AbstractEventHandlingInputPort) {
-                 AbstractEventHandlingInputPort in = (AbstractEventHandlingInputPort) to;
-                 in.setIncomingLink(link);
-             }
-         }
-     };
- }
-
- /**
-  * Returns an edit that adds the assertion to the annotation chain while
-  * holding the chain's monitor.
-  *
-  * @throws RuntimeException
-  *             if the chain is not an {@link AnnotationChainImpl}
-  */
- @Override
- public Edit<AnnotationChain> getAddAnnotationAssertionEdit(
-         AnnotationChain annotationChain,
-         final AnnotationAssertion<?> annotationAssertion) {
-     if (annotationChain instanceof AnnotationChainImpl) {
-         final AnnotationChainImpl impl = (AnnotationChainImpl) annotationChain;
-         return new EditSupport<AnnotationChain>() {
-             @Override
-             public Object getSubject() {
-                 return impl;
-             }
-
-             @Override
-             public AnnotationChain applyEdit() {
-                 synchronized (impl) {
-                     impl.addAnnotationAssertion(annotationAssertion);
-                 }
-                 return impl;
-             }
-         };
-     }
-     throw new RuntimeException(
-             "Object being edited must be instance of AnnotationChainImpl");
- }
-
- /**
-  * Builds an edit that routes a link from a source port to a sink port
-  * through a merge. When applied, the edit adds a new input port to the
-  * merge, connects the source port to it, and - if the merge has no
-  * outgoing link yet - connects the merge's output port to the sink port.
-  *
-  * @return a new merge edit constructed from the provided parameters.
-  *
-  * @param merge
-  *            a Merge instance
-  * @param sourcePort
-  *            the source port from which a link is to be created.
-  * @param sinkPort
-  *            the sink port to which the link is to be created.
-  * @throws RuntimeException
-  *             if either port is <code>null</code>
-  */
- @Override
- public Edit<Merge> getConnectMergedDatalinkEdit(Merge merge,
- final EventForwardingOutputPort sourcePort,
- final EventHandlingInputPort sinkPort) {
- if (sourcePort == null)
- throw new RuntimeException("The sourceport cannot be null");
- if (sinkPort == null)
- throw new RuntimeException("The sinkport cannot be null");
- return new AbstractMergeEdit(merge) {
- /*
-  * Decides whether the merge still needs an outgoing link to the
-  * sink: true when it has none, false when its single existing
-  * link already ends at sinkPort. Any other state is rejected.
-  */
- private boolean needToCreateDatalink(MergeImpl mergeImpl)
- throws EditException {
- Collection<? extends Datalink> outgoing = mergeImpl.getOutputPort()
- .getOutgoingLinks();
- if (outgoing.size() == 0) {
- return true;
- } else if (outgoing.size() != 1)
- throw new EditException(
- "The merge instance cannot have more that 1 outgoing Datalink");
- // Identity comparison: the existing link must end at this very port.
- if (outgoing.iterator().next().getSink() != sinkPort)
- throw new EditException(
- "Cannot add a different sinkPort to a Merge that already has one defined");
- return false;
- }
-
- @Override
- protected void doEditAction(MergeImpl merge) throws EditException {
- // Validate the outgoing side first, before the merge is modified.
- boolean linkOut = needToCreateDatalink(merge);
- String name = getUniqueMergeInputPortName(merge,
- sourcePort.getName() + "To" + merge.getLocalName()
- + "_input", 0);
- // The new merge input port takes its depth from the sink port.
- MergeInputPortImpl mergeInputPort = new MergeInputPortImpl(
- merge, name, sinkPort.getDepth());
- merge.addInputPort(mergeInputPort);
- // Connect source -> new merge input port.
- getConnectDatalinkEdit(
- createDatalink(sourcePort, mergeInputPort)).doEdit();
- // Connect merge output -> sink only if that link does not exist yet.
- if (linkOut)
- getConnectDatalinkEdit(
- createDatalink(merge.getOutputPort(), sinkPort))
- .doEdit();
- }
- };
- }
-
- /**
-  * Returns an edit that creates a condition (control link) from the
-  * control processor to the target processor and records it on both.
-  * Applying the edit fails with an {@link EditException} if the control
-  * processor already controls the target.
-  */
- @Override
- public Edit<OrderedPair<Processor>> getCreateConditionEdit(
-         Processor control, Processor target) {
-     return new AbstractBinaryProcessorEdit(control, target) {
-         @Override
-         protected void doEditAction(ProcessorImpl controller,
-                 ProcessorImpl controlled) throws EditException {
-             ConditionImpl created = new ConditionImpl(controller, controlled);
-             // Reject a second link between the same pair of processors.
-             for (Condition existing : controller.controlledConditions) {
-                 if (existing.getTarget() == controlled) {
-                     throw new EditException(
-                             "Attempt to create duplicate control link");
-                 }
-             }
-             controller.controlledConditions.add(created);
-             controlled.conditions.add(created);
-         }
-     };
- }
-
- /**
-  * Returns an edit that removes the condition (control link) between the
-  * control processor and the target processor from both of them. Applying
-  * the edit fails with an {@link EditException} if no such condition
-  * exists.
-  */
- @Override
- public Edit<OrderedPair<Processor>> getRemoveConditionEdit(
-         Processor control, Processor target) {
-     return new AbstractBinaryProcessorEdit(control, target) {
-         @Override
-         protected void doEditAction(ProcessorImpl controller, ProcessorImpl controlled)
-                 throws EditException {
-             // Find the first condition pointing at the controlled processor.
-             ConditionImpl found = null;
-             for (ConditionImpl candidate : controller.controlledConditions) {
-                 if (candidate.getTarget() == controlled) {
-                     found = candidate;
-                     break;
-                 }
-             }
-             if (found == null) {
-                 throw new EditException(
-                         "Can't remove a control link as it doesn't exist");
-             }
-             controller.controlledConditions.remove(found);
-             controlled.conditions.remove(found);
-         }
-     };
- }
-
- /** Returns an edit that sets the annotation bean on the assertion. */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationBean(
-         AnnotationAssertion annotationAssertion,
-         final AnnotationBeanSPI bean) {
-     return new AbstractAnnotationEdit(annotationAssertion) {
-         @Override
-         protected void doEditAction(AnnotationAssertionImpl target)
-                 throws EditException {
-             target.setAnnotationBean(bean);
-         }
-     };
- }
-
- /** Returns an edit that adds the curation event to the assertion. */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddCurationEvent(
-         AnnotationAssertion annotationAssertion,
-         final CurationEvent curationEvent) {
-     return new AbstractAnnotationEdit(annotationAssertion) {
-         @Override
-         protected void doEditAction(AnnotationAssertionImpl target)
-                 throws EditException {
-             target.addCurationEvent(curationEvent);
-         }
-     };
- }
-
- /** Returns an edit that sets the annotation role on the assertion. */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationRole(
-         AnnotationAssertion annotationAssertion,
-         final AnnotationRole role) {
-     return new AbstractAnnotationEdit(annotationAssertion) {
-         @Override
-         protected void doEditAction(AnnotationAssertionImpl target)
-                 throws EditException {
-             target.setAnnotationRole(role);
-         }
-     };
- }
-
- /** Returns an edit that sets the annotation source on the assertion. */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationSource(
-         AnnotationAssertion annotationAssertion,
-         final AnnotationSourceSPI source) {
-     return new AbstractAnnotationEdit(annotationAssertion) {
-         @Override
-         protected void doEditAction(AnnotationAssertionImpl target)
-                 throws EditException {
-             target.setAnnotationSource(source);
-         }
-     };
- }
-
- /** Returns an edit that adds the person as a creator of the assertion. */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddCreator(
-         AnnotationAssertion annotationAssertion, final Person person) {
-     return new AbstractAnnotationEdit(annotationAssertion) {
-         @Override
-         protected void doEditAction(AnnotationAssertionImpl target)
-                 throws EditException {
-             target.addCreator(person);
-         }
-     };
- }
-
- /**
-  * Returns a compound edit that, in order, sets the annotation bean on a
-  * fresh assertion, adds that assertion to a fresh annotation chain, and
-  * adds the chain to the annotated object.
-  */
- @Override
- public Edit<?> getAddAnnotationChainEdit(Annotated<?> annotated,
-         AnnotationBeanSPI annotation) {
-     AnnotationAssertion<?> newAssertion = new AnnotationAssertionImpl();
-     AnnotationChain newChain = new AnnotationChainImpl();
-
-     List<Edit<?>> steps = new ArrayList<>();
-     steps.add(getAddAnnotationBean(newAssertion, annotation));
-     steps.add(getAddAnnotationAssertionEdit(newChain, newAssertion));
-     steps.add(annotated.getAddAnnotationEdit(newChain));
-     return new CompoundEdit(steps);
- }
-
- /** Returns an edit that sets the dataflow's local name. */
- @Override
- public Edit<Dataflow> getUpdateDataflowNameEdit(Dataflow dataflow,
-         final String newName) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) {
-             target.setLocalName(newName);
-         }
-     };
- }
-
- /** Returns an edit that sets the dataflow's identifier. */
- @Override
- public Edit<Dataflow> getUpdateDataflowInternalIdentifierEdit(
-         Dataflow dataflow, final String newId) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) {
-             target.setIdentifier(newId);
-         }
-     };
- }
-
- /**
-  * Returns an edit that detaches the datalink from its source port and
-  * clears the incoming link of its sink port. If the sink is a merge input
-  * port, that port is also removed from its merge.
-  */
- @Override
- public Edit<Datalink> getDisconnectDatalinkEdit(Datalink datalink) {
-     return new AbstractDatalinkEdit(datalink) {
-         @Override
-         protected void doEditAction(DatalinkImpl link) throws EditException {
-             EventForwardingOutputPort from = link.getSource();
-             EventHandlingInputPort to = link.getSink();
-
-             if (from instanceof BasicEventForwardingOutputPort) {
-                 BasicEventForwardingOutputPort out = (BasicEventForwardingOutputPort) from;
-                 out.removeOutgoingLink(link);
-             }
-             if (to instanceof AbstractEventHandlingInputPort) {
-                 AbstractEventHandlingInputPort in = (AbstractEventHandlingInputPort) to;
-                 in.setIncomingLink(null);
-             }
-             if (to instanceof MergeInputPortImpl) {
-                 MergeInputPortImpl mergePort = (MergeInputPortImpl) to;
-                 MergeImpl owner = (MergeImpl) mergePort.getMerge();
-                 owner.removeInputPort(mergePort);
-             }
-         }
-     };
- }
-
- /** Returns an edit that removes the input port from the dataflow. */
- @Override
- public Edit<Dataflow> getRemoveDataflowInputPortEdit(Dataflow dataflow,
-         final DataflowInputPort dataflowInputPort) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) throws EditException {
-             target.removeDataflowInputPort(dataflowInputPort);
-         }
-     };
- }
-
- /** Returns an edit that removes the output port from the dataflow. */
- @Override
- public Edit<Dataflow> getRemoveDataflowOutputPortEdit(Dataflow dataflow,
-         final DataflowOutputPort dataflowOutputPort) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) throws EditException {
-             target.removeDataflowOutputPort(dataflowOutputPort);
-         }
-     };
- }
-
- /** Returns an edit that removes the processor from the dataflow. */
- @Override
- public Edit<Dataflow> getRemoveProcessorEdit(Dataflow dataflow,
-         final Processor processor) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) {
-             target.removeProcessor(processor);
-         }
-     };
- }
-
- /** Returns an edit that removes the merge from the dataflow. */
- @Override
- public Edit<Dataflow> getRemoveMergeEdit(Dataflow dataflow, final Merge merge) {
-     return new AbstractDataflowEdit(dataflow) {
-         @Override
-         protected void doEditAction(DataflowImpl target) {
-             target.removeMerge(merge);
-         }
-     };
- }
-
- /**
-  * Returns an edit that adds the input port to the dataflow.
-  *
-  * @throws RuntimeException
-  *             if the port is not a {@link DataflowInputPortImpl}
-  */
- @Override
- public Edit<Dataflow> getAddDataflowInputPortEdit(Dataflow dataflow,
-         DataflowInputPort dataflowInputPort) {
-     if (dataflowInputPort instanceof DataflowInputPortImpl) {
-         final DataflowInputPortImpl toAdd = (DataflowInputPortImpl) dataflowInputPort;
-         return new AbstractDataflowEdit(dataflow) {
-             @Override
-             protected void doEditAction(DataflowImpl target)
-                     throws EditException {
-                 target.addInputPort(toAdd);
-             }
-         };
-     }
-     throw new RuntimeException(
-             "The DataflowInputPort is of the wrong implmentation, "
-                     + "it should be of type DataflowInputPortImpl");
- }
-
- /**
-  * Returns an edit that adds the output port to the dataflow.
-  *
-  * @throws RuntimeException
-  *             if the port is not a {@link DataflowOutputPortImpl}
-  */
- @Override
- public Edit<Dataflow> getAddDataflowOutputPortEdit(Dataflow dataflow,
-         final DataflowOutputPort dataflowOutputPort) {
-     if (dataflowOutputPort instanceof DataflowOutputPortImpl) {
-         return new AbstractDataflowEdit(dataflow) {
-             @Override
-             protected void doEditAction(DataflowImpl target)
-                     throws EditException {
-                 DataflowOutputPortImpl toAdd = (DataflowOutputPortImpl) dataflowOutputPort;
-                 target.addOutputPort(toAdd);
-             }
-         };
-     }
-     throw new RuntimeException(
-             "The DataflowOutputPort is of the wrong implmentation, "
-                     + "it should be of type DataflowOutputPortImpl");
- }
-
- /** Returns an edit that adds the port to the activity's input ports. */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getAddActivityInputPortEdit(Activity<?> activity,
-         final ActivityInputPort activityInputPort) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target) {
-             target.getInputPorts().add(activityInputPort);
-         }
-     };
- }
-
- /**
-  * Returns an edit that maps a processor input port name to an activity
-  * input port name in the activity's input port mapping.
-  *
-  * @param activity
-  *            the activity whose input port mapping is extended
-  * @param processorPortName
-  *            the processor port name, used as the mapping key
-  * @param activityPortName
-  *            the activity port name, stored as the mapping value
-  * @return the edit; applying it fails with an {@link EditException} if a
-  *         mapping for <code>processorPortName</code> already exists
-  */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getAddActivityInputPortMappingEdit(
-         Activity<?> activity, final String processorPortName,
-         final String activityPortName) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity activity)
-                 throws EditException {
-             if (activity.getInputPortMapping().containsKey(
-                     processorPortName))
-                 throw new EditException(
-                         "The input mapping for processor name:"
-                                 + processorPortName + " already exists");
-             /*
-              * The input port mapping is keyed processorPort ->
-              * activityPort, i.e. the opposite direction to the output
-              * port mapping used by the output mapping edit in this class.
-              */
-             activity.getInputPortMapping().put(processorPortName,
-                     activityPortName);
-         }
-     };
- }
-
- /** Returns an edit that adds the port to the activity's output ports. */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getAddActivityOutputPortEdit(Activity<?> activity,
-         final ActivityOutputPort activityOutputPort) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target) {
-             target.getOutputPorts().add(activityOutputPort);
-         }
-     };
- }
-
- /**
-  * Returns an edit that maps an activity output port name to a processor
-  * output port name in the activity's output port mapping. Applying the
-  * edit fails with an {@link EditException} if a mapping keyed by
-  * <code>activityPortName</code> already exists.
-  */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getAddActivityOutputPortMappingEdit(
-         Activity<?> activity, final String processorPortName,
-         final String activityPortName) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target)
-                 throws EditException {
-             /*
-              * The output port mapping is keyed activityPort ->
-              * processorPort, i.e. the opposite direction to the input
-              * port mapping.
-              */
-             Map<String, String> outputMapping = target.getOutputPortMapping();
-             boolean alreadyMapped = outputMapping.containsKey(activityPortName);
-             if (alreadyMapped) {
-                 throw new EditException("The mapping starting with:"
-                         + activityPortName + " already exists");
-             }
-             outputMapping.put(activityPortName, processorPortName);
-         }
-     };
- }
-
- /** Returns an edit that removes the port from the activity's input ports. */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getRemoveActivityInputPortEdit(
-         Activity<?> activity, final ActivityInputPort activityInputPort) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target)
-                 throws EditException {
-             target.getInputPorts().remove(activityInputPort);
-         }
-     };
- }
-
- /**
-  * Returns an edit that removes the entry keyed by
-  * <code>processorPortName</code> from the activity's input port mapping.
-  * Applying the edit fails with an {@link EditException} if no such entry
-  * exists.
-  */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getRemoveActivityInputPortMappingEdit(
-         Activity<?> activity, final String processorPortName) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target)
-                 throws EditException {
-             boolean mapped = target.getInputPortMapping().containsKey(processorPortName);
-             if (!mapped) {
-                 throw new EditException(
-                         "The input port mapping for the processor port name:"
-                                 + processorPortName + " doesn't exist");
-             }
-             target.getInputPortMapping().remove(processorPortName);
-         }
-     };
- }
-
- /** Returns an edit that removes the port from the activity's output ports. */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getRemoveActivityOutputPortEdit(
-         Activity<?> activity, final ActivityOutputPort activityOutputPort) {
-     return new AbstractActivityEdit(activity) {
-         @Override
-         protected void doEditAction(AbstractActivity target)
-                 throws EditException {
-             target.getOutputPorts().remove(activityOutputPort);
-         }
-     };
- }
-
- /**
-  * Returns an edit that removes the entry whose key equals
-  * <code>processorPortName</code> from the activity's output port mapping.
-  * Applying the edit fails with an {@link EditException} if the mapping
-  * has no such key.
-  * <p>
-  * NOTE(review): the edit in this class that adds output port mappings
-  * stores them as activityPort -> processorPort, so the key looked up here
-  * is presumably an activity port name despite the parameter's name -
-  * confirm against callers before relying on either reading.
-  */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Edit<Activity<?>> getRemoveActivityOutputPortMappingEdit(
- Activity<?> activity, final String processorPortName) {
- return new AbstractActivityEdit(activity) {
- @Override
- protected void doEditAction(AbstractActivity activity)
- throws EditException {
- // The name is used as the map KEY for both the check and the removal.
- if (!activity.getOutputPortMapping().containsKey(processorPortName))
- throw new EditException(
- "The output port mapping for the processor port name:"
- + processorPortName + " doesn't exist");
- activity.getOutputPortMapping().remove(processorPortName);
- }
- };
- }
-
- /**
-  * Returns an edit that adds the input port to the merge.
-  *
-  * @throws RuntimeException
-  *             if the port is not a {@link MergeInputPortImpl}
-  */
- @Override
- public Edit<Merge> getAddMergeInputPortEdit(Merge merge,
-         MergeInputPort mergeInputPort) {
-     if (mergeInputPort instanceof MergeInputPortImpl) {
-         final MergeInputPortImpl toAdd = (MergeInputPortImpl) mergeInputPort;
-         return new AbstractMergeEdit(merge) {
-             @Override
-             protected void doEditAction(MergeImpl target) {
-                 target.addInputPort(toAdd);
-             }
-         };
-     }
-     throw new RuntimeException(
-             "The MergeInputPort is of the wrong implmentation,"
-                     + " it should be of type MergeInputPortImpl");
- }
-
- /**
-  * Builds a {@link ConfigureEdit} for an activity and its configuration
-  * bean.
-  *
-  * @param activity
-  *            the activity passed to the configure edit
-  * @param configurationBean
-  *            the configuration bean passed to the configure edit
-  * @return the newly created configure edit
-  */
- @Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public <T> Edit<Activity<?>> getConfigureActivityEdit(Activity<T> activity,
-         T configurationBean) {
-     Edit<Activity<?>> configureEdit = new ConfigureEdit(Activity.class,
-             activity, configurationBean);
-     return configureEdit;
- }
-
- /**
-  * Builds an edit whose action removes an input port from a processor.
-  * <p>
-  * The action first checks that the processor has an input port with the
-  * same name as {@code port} and throws an {@link EditException} if not. It
-  * then removes the input of that name from every strategy in the
-  * processor's iteration stack before removing the port itself.
-  *
-  * @param processor
-  *            the processor handed to the edit
-  * @param port
-  *            the input port to remove
-  * @return the edit wrapping the removal
-  */
- @Override
- public Edit<Processor> getRemoveProcessorInputPortEdit(Processor processor,
-         final ProcessorInputPort port) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl impl) throws EditException {
-             boolean known = impl.getInputPortWithName(port.getName()) != null;
-             if (!known) {
-                 throw new EditException("The processor doesn't have a port named:"
-                         + port.getName());
-             }
-             // Detach the port from every iteration strategy first
-             for (IterationStrategyImpl strategy : impl.iterationStack
-                     .getStrategies()) {
-                 strategy.removeInputByName(port.getName());
-             }
-             impl.inputPorts.remove(port);
-         }
-     };
- }
-
- /**
-  * Builds an edit whose action removes an output port from a processor.
-  * <p>
-  * The action throws an {@link EditException} if the processor has no
-  * output port with the same name as {@code port}.
-  *
-  * @param processor
-  *            the processor handed to the edit
-  * @param port
-  *            the output port to remove
-  * @return the edit wrapping the removal
-  */
- @Override
- public Edit<Processor> getRemoveProcessorOutputPortEdit(
-         Processor processor, final ProcessorOutputPort port) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl impl) throws EditException {
-             boolean known = impl.getOutputPortWithName(port.getName()) != null;
-             if (!known) {
-                 throw new EditException("The processor doesn't have a port named:"
-                         + port.getName());
-             }
-             impl.outputPorts.remove(port);
-         }
-     };
- }
-
- /**
-  * Creates an edit that reconciles a processor's ports with the ports of
-  * its first activity (element 0 of the activity list).
-  * <p>
-  * When applied, the edit collects sub-edits into a list and runs them as a
-  * single {@link CompoundEdit}:
-  * <ul>
-  * <li>processor ports whose name matches no activity port are
-  * disconnected, removed, and their activity port mapping (if any) is
-  * removed;</li>
-  * <li>processor ports whose name matches an activity port but whose depth
-  * (or, for outputs, granular depth) differs are disconnected and removed;
-  * if they had links, a replacement port with the activity port's
-  * dimensions is added and the links are re-created against it.</li>
-  * </ul>
-  *
-  * @param processor
-  *            the processor whose ports are reconciled
-  * @return the edit; its subject is {@code processor}
-  */
- @Override
- public Edit<Processor> getMapProcessorPortsForActivityEdit(
- final Processor processor) {
- return new EditSupport<Processor>() {
- @Override
- public Processor applyEdit() throws EditException {
- List<Edit<?>> edits = new ArrayList<>();
- // Only the first activity is considered.
- // NOTE(review): presumably fails on an empty activity list - confirm
- // callers guarantee at least one activity.
- Activity<?> a = processor.getActivityList().get(0);
-
- // All four collections are computed up front, before any edit runs
- List<ProcessorInputPort> inputPortsForRemoval = determineInputPortsForRemoval(
- processor, a);
- List<ProcessorOutputPort> outputPortsForRemoval = determineOutputPortsForRemoval(
- processor, a);
- Map<ProcessorInputPort, ActivityInputPort> changedInputPorts = determineChangedInputPorts(
- processor, a);
- Map<ProcessorOutputPort, ActivityOutputPort> changedOutputPorts = determineChangedOutputPorts(
- processor, a);
-
- // Input ports with no same-named activity port: disconnect, remove
- // the port, then drop its mapping if one exists
- for (ProcessorInputPort ip : inputPortsForRemoval) {
- if (ip.getIncomingLink() != null)
- edits.add(getDisconnectDatalinkEdit(ip
- .getIncomingLink()));
- edits.add(getRemoveProcessorInputPortEdit(processor, ip));
- if (a.getInputPortMapping().containsKey(ip.getName()))
- edits.add(getRemoveActivityInputPortMappingEdit(a, ip
- .getName()));
- }
-
- // Output ports with no same-named activity port: same treatment,
- // but an output port may have several outgoing links
- for (ProcessorOutputPort op : outputPortsForRemoval) {
- if (!op.getOutgoingLinks().isEmpty())
- for (Datalink link : op.getOutgoingLinks())
- edits.add(getDisconnectDatalinkEdit(link));
- edits.add(getRemoveProcessorOutputPortEdit(processor, op));
- if (a.getOutputPortMapping().containsKey(op.getName()))
- edits.add(getRemoveActivityOutputPortMappingEdit(a, op
- .getName()));
- }
-
- // Input ports whose depth changed: remove the old port and, if it
- // was linked, add a port with the activity port's name and depth
- // and reconnect the link from the same source.
- // NOTE(review): a changed port WITHOUT an incoming link is removed
- // and not re-created here - confirm this is intended.
- for (ProcessorInputPort ip : changedInputPorts.keySet()) {
- Datalink incomingLink = ip.getIncomingLink();
- if (incomingLink != null)
- edits.add(getDisconnectDatalinkEdit(incomingLink));
- edits.add(getRemoveProcessorInputPortEdit(processor, ip));
-
- if (incomingLink != null) {
- ActivityInputPort aip = changedInputPorts.get(ip);
- ProcessorInputPort pip = createProcessorInputPort(processor,
- aip.getName(), aip.getDepth());
- edits.add(getAddProcessorInputPortEdit(processor, pip));
- edits.add(getConnectDatalinkEdit(new DatalinkImpl(
- incomingLink.getSource(), pip)));
- }
- }
-
- // Output ports whose depth or granular depth changed: same scheme,
- // re-creating one link per original sink.
- // NOTE(review): as above, a changed port with no outgoing links is
- // removed and not re-created here - confirm this is intended.
- for (ProcessorOutputPort op : changedOutputPorts.keySet()) {
- Set<? extends Datalink> outgoingLinks = op.getOutgoingLinks();
- for (Datalink link : outgoingLinks)
- edits.add(getDisconnectDatalinkEdit(link));
- edits.add(getRemoveProcessorOutputPortEdit(processor, op));
-
- if (!outgoingLinks.isEmpty()) {
- ActivityOutputPort aop = changedOutputPorts.get(op);
- ProcessorOutputPort pop = createProcessorOutputPort(
- processor, aop.getName(), aop.getDepth(),
- aop.getGranularDepth());
- edits.add(getAddProcessorOutputPortEdit(processor,
- pop));
- for (Datalink link : outgoingLinks)
- edits.add(getConnectDatalinkEdit(createDatalink(
- pop, link.getSink())));
- }
- }
-
- // Run everything collected above as one compound edit
- new CompoundEdit(edits).doEdit();
- return processor;
- }
-
- @Override
- public Object getSubject() {
- return processor;
- }
-
- /**
-  * Returns the processor input ports whose name equals the name of no
-  * input port of the activity.
-  */
- private List<ProcessorInputPort> determineInputPortsForRemoval(Processor p,
- Activity<?> a) {
- List<ProcessorInputPort> result = new ArrayList<>();
- processorPorts: for (ProcessorInputPort pPort : p.getInputPorts()) {
- for (ActivityInputPort aPort : a.getInputPorts())
- if (aPort.getName().equals(pPort.getName()))
- // Name matched: keep this port, move to the next one
- continue processorPorts;
- result.add(pPort);
- }
- return result;
- }
-
- /**
-  * Returns the processor output ports whose name equals the name of no
-  * output port of the activity.
-  */
- private List<ProcessorOutputPort> determineOutputPortsForRemoval(
- Processor p, Activity<?> a) {
- List<ProcessorOutputPort> result = new ArrayList<>();
- processorPorts: for (ProcessorOutputPort pPort : p.getOutputPorts()) {
- for (OutputPort aPort : a.getOutputPorts())
- if (aPort.getName().equals(pPort.getName()))
- // Name matched: keep this port, move to the next one
- continue processorPorts;
- result.add(pPort);
- }
- return result;
- }
-
- /**
-  * Maps each processor input port to the same-named activity input port
-  * when their depths differ. Only the first activity port with a matching
-  * name is examined.
-  */
- private Map<ProcessorInputPort, ActivityInputPort> determineChangedInputPorts(
- Processor p, Activity<?> a) {
- Map<ProcessorInputPort, ActivityInputPort> result = new HashMap<>();
- for (ProcessorInputPort pPort : p.getInputPorts())
- for (ActivityInputPort aPort : a.getInputPorts())
- if (aPort.getName().equals(pPort.getName())) {
- if (pPort.getDepth() != aPort.getDepth())
- result.put(pPort, aPort);
- break;
- }
- return result;
- }
-
- /**
-  * Maps each processor output port to the same-named activity output port
-  * when their depth or granular depth differs. Only the first activity
-  * port with a matching name is examined.
-  */
- private Map<ProcessorOutputPort, ActivityOutputPort> determineChangedOutputPorts(
- Processor p, Activity<?> a) {
- Map<ProcessorOutputPort, ActivityOutputPort> result = new HashMap<>();
- for (ProcessorOutputPort pPort : p.getOutputPorts())
- for (OutputPort aPort : a.getOutputPorts())
- if (aPort.getName().equals(pPort.getName())) {
- if ((pPort.getDepth() != aPort.getDepth())
- || (pPort.getGranularDepth() != aPort
- .getGranularDepth()))
- result.put(pPort, (ActivityOutputPort) aPort);
- break;
- }
- return result;
- }
- };
- }
-
- /** Maximum jobs passed to the {@link Parallelize} layer of the default stack. */
- private static final int DEFAULT_MAX_JOBS = 1;
-
- /**
-  * Builds an edit that installs the default dispatch layers on a
-  * processor's dispatch stack, in this order: {@link Parallelize},
-  * {@link ErrorBounce}, {@link Failover}, {@link Retry}, {@link Stop},
-  * {@link Invoke}.
-  * <p>
-  * The per-layer edits are created eagerly here and bundled in a
-  * {@link CompoundEdit} that the returned edit's action runs.
-  *
-  * @param processor
-  *            the processor to equip; it is cast to {@link ProcessorImpl}
-  * @return the edit wrapping the compound edit
-  */
- @Override
- public Edit<Processor> getDefaultDispatchStackEdit(Processor processor) {
-     DispatchStackImpl stack = ((ProcessorImpl) processor)
-             .getDispatchStack();
-     /*
-      * Each layer is added at the next free position. The list holds exactly
-      * one edit per layer already queued, so its size (read before the add)
-      * is that position: 0 for the top level parallelise layer, then 1, 2...
-      */
-     List<Edit<?>> layerEdits = new ArrayList<>();
-     layerEdits.add(getAddDispatchLayerEdit(stack,
-             new Parallelize(DEFAULT_MAX_JOBS), layerEdits.size()));
-     layerEdits.add(getAddDispatchLayerEdit(stack, new ErrorBounce(),
-             layerEdits.size()));
-     layerEdits.add(getAddDispatchLayerEdit(stack, new Failover(),
-             layerEdits.size()));
-     layerEdits.add(getAddDispatchLayerEdit(stack, new Retry(),
-             layerEdits.size()));
-     layerEdits.add(getAddDispatchLayerEdit(stack, new Stop(),
-             layerEdits.size()));
-     layerEdits.add(getAddDispatchLayerEdit(stack, new Invoke(),
-             layerEdits.size()));
-
-     final Edit<?> allLayers = new CompoundEdit(layerEdits);
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl impl)
-                 throws EditException {
-             allLayers.doEdit();
-         }
-     };
- }
-
- /**
-  * Builds an edit whose action replaces a processor's iteration strategy
-  * stack with the given one.
-  *
-  * @param processor
-  *            the processor handed to the edit
-  * @param iterationStrategyStack
-  *            the new stack; must be an {@link IterationStrategyStackImpl}
-  * @return the edit wrapping the assignment
-  * @throws RuntimeException
-  *             immediately (not when the edit is applied) if the stack is
-  *             not an {@link IterationStrategyStackImpl}
-  */
- @Override
- public Edit<Processor> getSetIterationStrategyStackEdit(
-         Processor processor,
-         final IterationStrategyStack iterationStrategyStack) {
-     boolean supported = iterationStrategyStack instanceof IterationStrategyStackImpl;
-     if (!supported) {
-         throw new RuntimeException(
-                 "Unknown implementation of iteration strategy "
-                         + iterationStrategyStack);
-     }
-     final IterationStrategyStackImpl replacement = (IterationStrategyStackImpl) iterationStrategyStack;
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl impl)
-                 throws EditException {
-             impl.iterationStack = replacement;
-         }
-     };
- }
-
- /**
-  * Builds a {@link ConfigureEdit} for any configurable object and its
-  * configuration bean.
-  *
-  * @param configurable
-  *            the object passed to the configure edit
-  * @param configBean
-  *            the configuration bean passed to the configure edit
-  * @return the newly created configure edit
-  */
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public <T> Edit<? extends Configurable<T>> getConfigureEdit(
-         Configurable<T> configurable, T configBean) {
-     Edit<? extends Configurable<T>> configureEdit = new ConfigureEdit(
-             Configurable.class, configurable, configBean);
-     return configureEdit;
- }
-
- /**
-  * Builds an edit whose action hands an already reordered list of input
-  * ports to the merge via {@code reorderInputPorts}.
-  *
-  * @param merge
-  *            the merge handed to the edit
-  * @param reorderedMergeInputPortList
-  *            the input ports in their new order
-  * @return the edit wrapping the reordering
-  */
- @Override
- public Edit<Merge> getReorderMergeInputPortsEdit(Merge merge,
-         final List<MergeInputPort> reorderedMergeInputPortList) {
-     return new AbstractMergeEdit(merge) {
-         @Override
-         protected void doEditAction(MergeImpl target) {
-             target.reorderInputPorts(reorderedMergeInputPortList);
-         }
-     };
- }
-
- /**
-  * Builds an edit whose action removes an activity from a processor's
-  * activity list.
-  * <p>
-  * The result of the removal is ignored, so the action does not complain if
-  * the activity was not in the list.
-  *
-  * @param processor
-  *            the processor handed to the edit
-  * @param activity
-  *            the activity to remove
-  * @return the edit wrapping the removal
-  */
- @Override
- public Edit<Processor> getRemoveActivityEdit(Processor processor,
-         final Activity<?> activity) {
-     return new AbstractProcessorEdit(processor) {
-         @Override
-         protected void doEditAction(ProcessorImpl impl) {
-             impl.activityList.remove(activity);
-         }
-     };
- }
-
- /**
-  * Builds an edit that adds a strategy to an iteration strategy stack.
-  *
-  * @param iterationStrategyStack
-  *            the stack to extend; must be an
-  *            {@link IterationStrategyStackImpl}
-  * @param strategy
-  *            the strategy to add
-  * @return the edit; its subject and result are the stack
-  * @throws RuntimeException
-  *             immediately (not when the edit is applied) if the stack is
-  *             not an {@link IterationStrategyStackImpl}
-  */
- @Override
- public Edit<IterationStrategyStack> getAddIterationStrategyEdit(
-         IterationStrategyStack iterationStrategyStack,
-         final IterationStrategy strategy) {
-     if (iterationStrategyStack instanceof IterationStrategyStackImpl) {
-         final IterationStrategyStackImpl stackImpl = (IterationStrategyStackImpl) iterationStrategyStack;
-         return new EditSupport<IterationStrategyStack>() {
-             @Override
-             public IterationStrategyStack applyEdit() {
-                 stackImpl.addStrategy(strategy);
-                 return stackImpl;
-             }
-
-             @Override
-             public IterationStrategyStack getSubject() {
-                 return stackImpl;
-             }
-         };
-     }
-     throw new RuntimeException(
-             "Object being edited must be instance of IterationStrategyStackImpl");
- }
-
- /**
-  * Builds an edit that adds a named input port node to an iteration
-  * strategy.
-  *
-  * @param iterationStrategy
-  *            the strategy to extend; must be an
-  *            {@link IterationStrategyImpl}
-  * @param namedInputPortNode
-  *            the input node to add
-  * @return the edit; its subject and result are the strategy
-  * @throws RuntimeException
-  *             immediately (not when the edit is applied) if the strategy
-  *             is not an {@link IterationStrategyImpl}
-  */
- @Override
- public Edit<IterationStrategy> getAddIterationStrategyInputNodeEdit(
-         IterationStrategy iterationStrategy,
-         final NamedInputPortNode namedInputPortNode) {
-     if (iterationStrategy instanceof IterationStrategyImpl) {
-         final IterationStrategyImpl strategyImpl = (IterationStrategyImpl) iterationStrategy;
-         return new EditSupport<IterationStrategy>() {
-             @Override
-             public IterationStrategy applyEdit() throws EditException {
-                 strategyImpl.addInput(namedInputPortNode);
-                 return strategyImpl;
-             }
-
-             @Override
-             public IterationStrategy getSubject() {
-                 return strategyImpl;
-             }
-         };
-     }
-     throw new RuntimeException(
-             "Object being edited must be instance of IterationStrategyImpl");
- }
-
- /**
-  * Builds an edit that clears an iteration strategy stack.
-  *
-  * @param iterationStrategyStack
-  *            the stack to clear; must be an
-  *            {@link IterationStrategyStackImpl}
-  * @return the edit; its subject and result are the stack
-  * @throws RuntimeException
-  *             immediately (not when the edit is applied) if the stack is
-  *             not an {@link IterationStrategyStackImpl}
-  */
- @Override
- public Edit<IterationStrategyStack> getClearIterationStrategyStackEdit(
-         final IterationStrategyStack iterationStrategyStack) {
-     boolean supported = iterationStrategyStack instanceof IterationStrategyStackImpl;
-     if (!supported) {
-         throw new RuntimeException(
-                 "Object being edited must be instance of IterationStrategyStackImpl");
-     }
-     final IterationStrategyStackImpl stackImpl = (IterationStrategyStackImpl) iterationStrategyStack;
-     return new EditSupport<IterationStrategyStack>() {
-         @Override
-         public IterationStrategyStack applyEdit() throws EditException {
-             stackImpl.clear();
-             return iterationStrategyStack;
-         }
-
-         @Override
-         public IterationStrategyStack getSubject() {
-             return iterationStrategyStack;
-         }
-     };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeImpl.java
deleted file mode 100644
index aba722a..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeImpl.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007-2008 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.lang.System.arraycopy;
-import static java.util.Collections.nCopies;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.reference.IdentifiedList;
-import net.sf.taverna.t2.reference.ListService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
-import net.sf.taverna.t2.workflowmodel.InputPort;
-import net.sf.taverna.t2.workflowmodel.Merge;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationTypeMismatchException;
-
-import org.apache.log4j.Logger;
-
-/**
- * Implementation of {@link Merge}.
- * <p>
- * A merge holds an ordered list of input ports and exactly one output port.
- * Every token received on an input port is forwarded on the output port with
- * the index of the receiving port prepended to the token's index. Once a
- * token with an empty index has been received on every input port for a
- * given owning process, the collected references are registered as a list
- * and sent on the output port as a single token.
- *
- * @author Tom Oinn
- * @author Stian Soiland-Reyes
- *
- */
-class MergeImpl implements Merge {
-    @SuppressWarnings("unused")
-    private static final Logger logger = Logger.getLogger(MergeImpl.class);
-
-    /** Input ports in merge order; a port's position is its index in the output. */
-    private List<MergeInputPortImpl> inputs = new ArrayList<>();
-    private String name;
-    private final BasicEventForwardingOutputPort output;
-    /**
-     * For each owning process, the references received so far on each input
-     * port (one slot per port, {@code null} until filled). Also used as the
-     * lock guarding access to the map itself, hence final.
-     */
-    private final Map<String, List<T2Reference>> partialOutputsByProcess = new HashMap<>();
-
-    /**
-     * Creates a merge with no input ports and an output port named
-     * {@code mergeName + "_output"} with depth and granular depth 0.
-     *
-     * @param mergeName
-     *            local name of the merge
-     */
-    public MergeImpl(String mergeName) {
-        super();
-        this.name = mergeName;
-        this.output = new MergeOutputPortImpl(this, name + "_output", 0, 0);
-    }
-
-    @Override
-    public String getLocalName() {
-        return this.name;
-    }
-
-    /**
-     * Changes the local name. The name of the output port, which was derived
-     * from the name given at construction, is not changed by this method.
-     *
-     * @param name
-     *            the new local name
-     */
-    protected void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Adds a new input port to the internal list of ports.
-     *
-     * @param inputPort
-     *            the MergeInputPortImpl
-     */
-    public void addInputPort(MergeInputPortImpl inputPort) {
-        inputs.add(inputPort);
-    }
-
-    /**
-     * Removes an input port from the internal list of ports.
-     *
-     * @param inputPort
-     *            the port to remove
-     */
-    public void removeInputPort(MergeInputPortImpl inputPort) {
-        inputs.remove(inputPort);
-    }
-
-    /**
-     * Returns the internal list of input ports itself, not a copy.
-     */
-    @Override
-    public List<? extends MergeInputPort> getInputPorts() {
-        return inputs;
-    }
-
-    @Override
-    public EventForwardingOutputPort getOutputPort() {
-        return this.output;
-    }
-
-    /**
-     * Return the index of the port with the specified name, or -1 if the port
-     * can't be found (this is a bad thing!)
-     *
-     * @param portName
-     *            name of the input port to look up
-     * @return position of the first port with that name, or -1
-     */
-    private int inputPortNameToIndex(String portName) {
-        int i = 0;
-        for (InputPort ip : inputs) {
-            if (ip.getName().equals(portName))
-                return i;
-            i++;
-        }
-        return -1; // FIXME: as the javadoc states, this is a bad thing!
-    }
-
-    /**
-     * Handles a token that arrived on the named input port.
-     * <p>
-     * The token is always forwarded on the output port with the port's
-     * position prepended to its index. If the token's own index is empty, its
-     * data is additionally recorded as that port's contribution for the
-     * owning process; when every port has contributed, the contributions are
-     * registered as a list and sent on the output port with an empty index,
-     * and the per-process state is discarded.
-     *
-     * @param token
-     *            the received token
-     * @param portName
-     *            name of the input port the token arrived on
-     * @throws WorkflowStructureException
-     *             if the port name is unknown, if the port's position lies
-     *             beyond the slots allocated when the owning process was
-     *             first seen, or if the port already contributed for this
-     *             owning process
-     */
-    protected void receiveEvent(WorkflowDataToken token, String portName) {
-        List<T2Reference> outputList;
-        String owningProcess = token.getOwningProcess();
-        synchronized (partialOutputsByProcess) {
-            outputList = partialOutputsByProcess.get(owningProcess);
-            if (outputList == null) {
-                // First event for this process: one empty slot per input port
-                int numPorts = getInputPorts().size();
-                outputList = new ArrayList<>(nCopies(numPorts, (T2Reference) null));
-                partialOutputsByProcess.put(owningProcess, outputList);
-            }
-        }
-        int portIndex = inputPortNameToIndex(portName);
-        if (portIndex == -1)
-            throw new WorkflowStructureException(
-                    "Received event on unknown port " + portName);
-        // Forward the token with the port position as new leading index
-        int[] currentIndex = token.getIndex();
-        int[] newIndex = new int[currentIndex.length + 1];
-        newIndex[0] = portIndex;
-        arraycopy(currentIndex, 0, newIndex, 1, currentIndex.length);
-        InvocationContext context = token.getContext();
-        output.sendEvent(new WorkflowDataToken(owningProcess,
-                newIndex, token.getData(), context));
-        if (token.getIndex().length == 0) {
-            // Add to completion list
-            synchronized (outputList) {
-                if (outputList.size() <= portIndex)
-                    // Ports changed after initiating running as our list is
-                    // smaller than portIndex
-                    throw new WorkflowStructureException(
-                            "Unexpected addition of output port " + portName
-                                    + " at " + portIndex);
-                if (outputList.get(portIndex) != null)
-                    throw new WorkflowStructureException(
-                            "Already received completion for port " + portName
-                                    + " " + outputList.get(portIndex));
-
-                outputList.set(portIndex, token.getData());
-                if (!outputList.contains(null)) {
-                    // We're finished, let's register and send out the list
-                    ListService listService = context.getReferenceService()
-                            .getListService();
-                    IdentifiedList<T2Reference> registeredList = listService
-                            .registerList(outputList, context);
-                    WorkflowDataToken workflowDataToken = new WorkflowDataToken(
-                            owningProcess, new int[0], registeredList.getId(),
-                            context);
-                    synchronized (partialOutputsByProcess) {
-                        partialOutputsByProcess.remove(owningProcess);
-                    }
-                    output.sendEvent(workflowDataToken);
-                }
-            }
-        }
-    }
-
-    /**
-     * There is only ever a single output from a merge node but the token
-     * processing entity interface defines a list, in this case it always
-     * contains exactly one item.
-     */
-    @Override
-    public List<? extends EventForwardingOutputPort> getOutputPorts() {
-        List<EventForwardingOutputPort> result = new ArrayList<>();
-        result.add(output);
-        return result;
-    }
-
-    /**
-     * Checks that all input ports are bound to links of the same resolved
-     * depth and, if so, propagates the resulting depth to the output side.
-     *
-     * @return {@code true} if there are no inputs or all inputs agree;
-     *         {@code false} if some input is unbound or its link's resolved
-     *         depth is still -1
-     * @throws IterationTypeMismatchException
-     *             if two inputs have different resolved depths
-     */
-    @Override
-    public boolean doTypeCheck() throws IterationTypeMismatchException {
-        if (inputs.isEmpty())
-            /*
-             * Arguable, but technically a merge with no inputs is valid, it may
-             * make more sense to throw an exception here though as it has no
-             * actual meaning.
-             */
-            return true;
-        /*
-         * Return false if we have unbound input ports or bound ports where the
-         * resolved depth hasn't been calculated yet
-         */
-        for (MergeInputPort ip : inputs)
-            if (ip.getIncomingLink() == null
-                    || ip.getIncomingLink().getResolvedDepth() == -1)
-                return false;
-
-        // Got all input ports, now scan for input depths
-        int inputDepth = inputs.get(0).getIncomingLink().getResolvedDepth();
-        for (MergeInputPort ip : inputs)
-            if (ip.getIncomingLink().getResolvedDepth() != inputDepth)
-                throw new IterationTypeMismatchException();
-
-        // Set the granular depth to be the input depth as this will be the granularity of the output
-        output.setGranularDepth(inputDepth);
-        /*
-         * Got to here so all the input resolved depths match, push depth+1 to
-         * all outgoing links and return true
-         */
-        for (DatalinkImpl dli : output.outgoingLinks)
-            dli.setResolvedDepth(inputDepth + 1);
-        return true;
-    }
-
-    /**
-     * Replaces the input ports with the given, already reordered, ports.
-     * <p>
-     * The ports are copied into a fresh mutable list rather than keeping a
-     * reference to the caller's list. Otherwise a fixed-size or unmodifiable
-     * argument would make later {@link #addInputPort} and
-     * {@link #removeInputPort} calls fail, and later changes to the caller's
-     * list would silently alter this merge. Each element is cast
-     * individually, so a port of the wrong implementation is rejected here
-     * instead of polluting the list; the current ports are left untouched in
-     * that case.
-     *
-     * @param reorderedInputPortList
-     *            the input ports in their new order; every element must be a
-     *            {@link MergeInputPortImpl}
-     * @throws ClassCastException
-     *             if an element is not a {@link MergeInputPortImpl}
-     */
-    public void reorderInputPorts(
-            List<? extends MergeInputPort> reorderedInputPortList) {
-        List<MergeInputPortImpl> reordered = new ArrayList<>(
-                reorderedInputPortList.size());
-        for (MergeInputPort port : reorderedInputPortList)
-            reordered.add((MergeInputPortImpl) port);
-        inputs = reordered;
-    }
-
-    @Override
-    public String toString() {
-        return "Merge " + getLocalName();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeInputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeInputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeInputPortImpl.java
deleted file mode 100644
index a56a710..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeInputPortImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.workflowmodel.Merge;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-
-/**
- * Input port of a {@link MergeImpl}. Every token received on the port is
- * handed to the owning merge together with this port's name.
- */
-class MergeInputPortImpl extends AbstractEventHandlingInputPort implements
-        MergeInputPort {
-    /** The merge this port belongs to. */
-    private MergeImpl owner;
-
-    /**
-     * @param merge
-     *            the merge this port belongs to
-     * @param name
-     *            port name, passed on to the superclass
-     * @param depth
-     *            port depth, passed on to the superclass
-     */
-    protected MergeInputPortImpl(MergeImpl merge, String name, int depth) {
-        super(name, depth);
-        owner = merge;
-    }
-
-    /**
-     * Delegates the token to the owning merge, identifying this port by name.
-     */
-    @Override
-    public void receiveEvent(WorkflowDataToken token) {
-        owner.receiveEvent(token, name);
-    }
-
-    @Override
-    public Merge getMerge() {
-        return owner;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeOutputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeOutputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeOutputPortImpl.java
deleted file mode 100644
index 6ef19b2..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/MergeOutputPortImpl.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.Merge;
-import net.sf.taverna.t2.workflowmodel.MergeOutputPort;
-
-/**
- * Output port of a {@link Merge}; remembers the merge it belongs to.
- */
-class MergeOutputPortImpl extends BasicEventForwardingOutputPort
-        implements MergeOutputPort {
-    /** The merge this port belongs to. */
-    private Merge owner;
-
-    /**
-     * @param merge
-     *            the merge this port belongs to
-     * @param portName
-     *            port name, passed on to the superclass
-     * @param portDepth
-     *            port depth, passed on to the superclass
-     * @param granularDepth
-     *            granular depth, passed on to the superclass
-     */
-    public MergeOutputPortImpl(Merge merge, String portName, int portDepth,
-            int granularDepth) {
-        super(portName, portDepth, granularDepth);
-        owner = merge;
-    }
-
-    @Override
-    public Merge getMerge() {
-        return owner;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorCrystalizerImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorCrystalizerImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorCrystalizerImpl.java
deleted file mode 100644
index 90debf0..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorCrystalizerImpl.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import net.sf.taverna.t2.invocation.Completion;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
-
-/**
- * AbstractCrystalizer bound to a specific ProcessorImpl
- *
- * @author Tom Oinn
- */
-public class ProcessorCrystalizerImpl extends AbstractCrystalizer {
-    /** The processor whose output ports receive the created jobs. */
-    private final ProcessorImpl parent;
-
-    /**
-     * Create and bind to the specified ProcessorImpl
-     *
-     * @param parent
-     *            the processor to bind to
-     */
-    protected ProcessorCrystalizerImpl(ProcessorImpl parent) {
-        this.parent = parent;
-    }
-
-    /**
-     * Always fails: this crystalizer does not expect to see completions.
-     *
-     * @throws WorkflowStructureException
-     *             always
-     */
-    @Override
-    public void completionCreated(Completion completion) {
-        throw new WorkflowStructureException(
-                "Should never see this if everything is working, "
-                        + "if this occurs it is likely that the internal "
-                        + "logic is broken, talk to Tom");
-    }
-
-    /**
-     * Splits a job into one token per entry of its data map and delivers each
-     * token to the processor output port named by the entry's key. All tokens
-     * share the job's owning process, index and context.
-     *
-     * @param outputJob
-     *            the job to distribute over the output ports
-     */
-    @Override
-    public void jobCreated(Job outputJob) {
-        // Iterate entries rather than keys to avoid a lookup per port name
-        for (Map.Entry<String, T2Reference> portData : outputJob.getData()
-                .entrySet()) {
-            WorkflowDataToken token = new WorkflowDataToken(
-                    outputJob.getOwningProcess(), outputJob.getIndex(),
-                    portData.getValue(), outputJob.getContext());
-            parent.getOutputPortWithName(portData.getKey()).receiveEvent(token);
-        }
-    }
-
-    /**
-     * Used to construct a Job of empty lists at the appropriate depth in the
-     * event of a completion hitting the crystalizer before it sees a child
-     * node, i.e. the result of iterating over an empty collection structure of
-     * some kind.
-     *
-     * @param owningProcess
-     *            owning process of the job to create
-     * @param index
-     *            index of the job to create
-     * @param context
-     *            invocation context used to register the empty lists
-     * @return a job mapping every output port name of the processor to a
-     *         newly registered empty list
-     * @throws RuntimeException
-     *             if the processor's result wrapping depth is negative
-     */
-    @Override
-    public Job getEmptyJob(String owningProcess, int[] index,
-            InvocationContext context) {
-        int wrappingDepth = parent.resultWrappingDepth;
-        if (wrappingDepth < 0)
-            throw new RuntimeException("Processor [" + owningProcess
-                    + "] hasn't been configured, cannot emit empty job");
-        /*
-         * The wrapping depth is the length of index array that would be used if
-         * a single item of the output port type were returned. We can examine
-         * the index array for the node we're trying to create and use this to
-         * work out how much we need to add to the output port depth to create
-         * empty lists of the right type given the index array.
-         */
-        int depth = wrappingDepth - index.length;
-        // TODO - why was this incrementing?
-        // depth++;
-
-        ReferenceService rs = context.getReferenceService();
-        Map<String, T2Reference> emptyJobMap = new HashMap<>();
-        for (OutputPort op : parent.getOutputPorts())
-            emptyJobMap.put(op.getName(), rs.getListService()
-                    .registerEmptyList(depth + op.getDepth(), context).getId());
-        return new Job(owningProcess, index, emptyJobMap, context);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorImpl.java
deleted file mode 100644
index d4ecd28..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorImpl.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.util.Collections.unmodifiableList;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import net.sf.taverna.t2.annotation.AbstractAnnotatedThing;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.IterationInternalEvent;
-import net.sf.taverna.t2.lang.observer.MultiCaster;
-import net.sf.taverna.t2.lang.observer.Observer;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.monitor.MonitorableProperty;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Condition;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowValidationReport;
-import net.sf.taverna.t2.workflowmodel.InvalidDataflowException;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorFinishedEvent;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
-import net.sf.taverna.t2.workflowmodel.processor.activity.NestedDataflow;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.PropertyContributingDispatchLayer;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.impl.DispatchStackImpl;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationTypeMismatchException;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.MissingIterationInputException;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.impl.IterationStrategyImpl;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.impl.IterationStrategyStackImpl;
-
-import org.apache.log4j.Logger;
-
-/**
- * Implementation of Processor
- *
- * @author Tom Oinn
- * @author Stuart Owen
- * @author Alex Nenadic
- *
- */
-public final class ProcessorImpl extends AbstractAnnotatedThing<Processor>
- implements Processor {
- private static int pNameCounter = 0;
- private static Logger logger = Logger.getLogger(ProcessorImpl.class);
-
- protected List<ConditionImpl> conditions = new ArrayList<>();
- protected List<ConditionImpl> controlledConditions = new ArrayList<>();
- protected List<ProcessorInputPortImpl> inputPorts = new ArrayList<>();
- protected List<ProcessorOutputPortImpl> outputPorts = new ArrayList<>();
- protected List<Activity<?>> activityList = new ArrayList<>();
- protected AbstractCrystalizer crystalizer;
- protected DispatchStackImpl dispatchStack;
- protected IterationStrategyStackImpl iterationStack;
- protected String name;
- public transient int resultWrappingDepth = -1;
- protected transient Map<String, Set<MonitorableProperty<?>>> monitorables = new HashMap<>();
- private MultiCaster<ProcessorFinishedEvent> processorFinishedMultiCaster = new MultiCaster<>(
- this);
-
- /**
- * <p>
- * Create a new processor implementation with default blank iteration
- * strategy and dispatch stack.
- * </p>
- * <p>
- * This constructor is protected to enforce that an instance can only be
- * created via the {@link EditsImpl#createProcessor(String)} method.
- * </p>
- */
-
- protected ProcessorImpl() {
- // Set a default name
- name = "UnnamedProcessor" + (pNameCounter++);
-
- /*
- * Create iteration stack, configure it to send jobs and completion
- * events to the dispatch stack.
- */
- iterationStack = new IterationStrategyStackImpl() {
- @Override
- protected void receiveEventFromStrategy(IterationInternalEvent<?> e) {
- dispatchStack.receiveEvent(e);
- }
- };
- iterationStack.addStrategy(new IterationStrategyImpl());
-
- // Configure dispatch stack to push output events to the crystalizer
- dispatchStack = new DispatchStackImpl() {
- @Override
- protected String getProcessName() {
- return ProcessorImpl.this.name;
- }
-
- @Override
- public Processor getProcessor() {
- return ProcessorImpl.this;
- }
-
- /**
- * Called when an event bubbles out of the top of the dispatch
- * stack. In this case we pass it into the crystalizer.
- */
- @Override
- protected void pushEvent(IterationInternalEvent<?> e) {
- crystalizer.receiveEvent(e);
- }
-
- /**
- * Iterate over all the preconditions and return true if and only if
- * all are satisfied for the given process identifier.
- */
- @Override
- protected boolean conditionsSatisfied(String owningProcess) {
- for (Condition c : conditions)
- if (c.isSatisfied(owningProcess) == false)
- return false;
- return true;
- }
-
- @Override
- protected List<? extends Activity<?>> getActivities() {
- return ProcessorImpl.this.getActivityList();
- }
-
- /**
- * We've finished here, set the satisfied property on any controlled
- * condition objects to true and notify the targets.
- */
- @Override
- protected void finishedWith(String owningProcess) {
- if (!controlledConditions.isEmpty()) {
- String enclosingProcess = owningProcess.substring(0,
- owningProcess.lastIndexOf(':'));
- for (ConditionImpl ci : controlledConditions) {
- ci.satisfy(enclosingProcess);
- ci.getTarget().getDispatchStack()
- .satisfyConditions(enclosingProcess);
- }
- }
- /*
- * Tell whoever is interested that the processor has finished
- * executing
- */
- processorFinishedMultiCaster.notify(new ProcessorFinishedEvent(
- this.getProcessor(), owningProcess));
- }
-
- @Override
- public void receiveMonitorableProperty(MonitorableProperty<?> prop,
- String processID) {
- synchronized (monitorables) {
- Set<MonitorableProperty<?>> props = monitorables
- .get(processID);
- if (props == null) {
- props = new HashSet<>();
- monitorables.put(processID, props);
- }
- props.add(prop);
- }
- }
- };
-
- // Configure crystalizer to send realized events to the output ports
- crystalizer = new ProcessorCrystalizerImpl(this);
- }
-
- /**
- * When called this method configures input port filters and the
- * crystalizer, pushing cardinality information into outgoing datalinks.
- *
- * @return true if the typecheck was successful or false if the check failed
- * because there were preconditions missing such as unsatisfied
- * input types
- * @throws IterationTypeMismatchException
- * if the typing occured but didn't match because of an
- * iteration mismatch
- * @throws InvalidDataflowException
- * if the entity depended on a dataflow that was not valid
- */
- @Override
- public boolean doTypeCheck() throws IterationTypeMismatchException,
- InvalidDataflowException {
- // Check for any nested dataflows, they should all be valid
- for (Activity<?> activity : getActivityList())
- if (activity instanceof NestedDataflow) {
- NestedDataflow nestedDataflowActivity = (NestedDataflow) activity;
- Dataflow nestedDataflow = nestedDataflowActivity
- .getNestedDataflow();
- DataflowValidationReport validity = nestedDataflow
- .checkValidity();
- if (!validity.isValid())
- throw new InvalidDataflowException(nestedDataflow, validity);
- }
-
- /*
- * Check whether all our input ports have inbound links
- */
-
- Map<String, Integer> inputDepths = new HashMap<>();
- for (ProcessorInputPortImpl input : inputPorts) {
- if (input.getIncomingLink() == null)
- return false;
- if (input.getIncomingLink().getResolvedDepth() == -1)
- /*
- * Incoming link hasn't been resolved yet, can't do this
- * processor at the moment
- */
- return false;
-
- // Get the conceptual resolved depth of the datalink
- inputDepths.put(input.getName(), input.getIncomingLink()
- .getResolvedDepth());
- /*
- * Configure the filter with the finest grained item from the link
- * source
- */
- input.setFilterDepth(input.getIncomingLink().getSource()
- .getGranularDepth());
- }
-
- /*
- * Got here so we have all the inputs, now test whether the iteration
- * strategy typechecks correctly
- */
-
- try {
- this.resultWrappingDepth = iterationStack
- .getIterationDepth(inputDepths);
- for (BasicEventForwardingOutputPort output : outputPorts)
- for (DatalinkImpl outgoingLink : output.outgoingLinks)
- // Set the resolved depth on each output edge
- outgoingLink.setResolvedDepth(this.resultWrappingDepth
- + output.getDepth());
- } catch (MissingIterationInputException e) {
- /*
- * This should never happen as we only get here if we've already
- * checked that all the inputs have been provided. If it does happen
- * we've got some deeper issues.
- */
- logger.error(e);
- return false;
- }
-
- // If we get to here everything has been configured appropriately
- return true;
- }
-
- /* Utility methods */
-
- protected ProcessorInputPortImpl getInputPortWithName(String name) {
- for (ProcessorInputPortImpl p : inputPorts) {
- String portName = p.getName();
- if (portName.equals(name))
- return p;
- }
- return null;
- }
-
- protected ProcessorOutputPortImpl getOutputPortWithName(String name) {
- for (ProcessorOutputPortImpl p : outputPorts) {
- String portName = p.getName();
- if (portName.equals(name))
- return p;
- }
- return null;
- }
-
- /* Implementations of Processor interface */
-
- @Override
- public void fire(String enclosingProcess, InvocationContext context) {
- Job newJob = new Job(enclosingProcess + ":" + this.name, new int[0],
- new HashMap<String, T2Reference>(), context);
- dispatchStack.receiveEvent(newJob);
- }
-
- @Override
- public List<? extends Condition> getPreconditionList() {
- return unmodifiableList(conditions);
- }
-
- @Override
- public List<? extends Condition> getControlledPreconditionList() {
- return unmodifiableList(controlledConditions);
- }
-
- @Override
- public DispatchStackImpl getDispatchStack() {
- return dispatchStack;
- }
-
- @Override
- public IterationStrategyStackImpl getIterationStrategy() {
- return iterationStack;
- }
-
- @Override
- public List<? extends ProcessorInputPort> getInputPorts() {
- return unmodifiableList(inputPorts);
- }
-
- @Override
- public List<? extends ProcessorOutputPort> getOutputPorts() {
- return unmodifiableList(outputPorts);
- }
-
- @Override
- public List<? extends Activity<?>> getActivityList() {
- return unmodifiableList(activityList);
- }
-
- protected void setName(String newName) {
- this.name = newName;
- }
-
- @Override
- public String getLocalName() {
- return this.name;
- }
-
- /**
- * Called by the DataflowImpl containing this processor requesting that it
- * register itself with the monitor tree under the specified process
- * identifier.
- *
- * @param dataflowOwningProcess
- * the process identifier of the parent dataflow, the processor
- * must register with this as the base path plus the local name
- */
- void registerWithMonitor(String dataflowOwningProcess) {
- /*
- * Given the dataflow process identifier, so append local name to get
- * the process identifier that will be applied to incoming data tokens
- */
- String processID = dataflowOwningProcess + ":" + getLocalName();
-
- /*
- * The set of monitorable (and steerable) properties for this processor
- * level monitor node
- */
- Set<MonitorableProperty<?>> properties = new HashSet<>();
-
- /*
- * If any dispatch layers implement PropertyContributingDispatchLayer
- * then message them to push their properties into the property store
- * within the dispatch stack. In this case the anonymous inner class
- * implements this by storing them in a protected map within
- * ProcessoImpl from where they can be recovered after the iteration has
- * finished.
- */
- for (DispatchLayer<?> layer : dispatchStack.getLayers())
- if (layer instanceof PropertyContributingDispatchLayer)
- ((PropertyContributingDispatchLayer<?>) layer)
- .injectPropertiesFor(processID);
- /*
- * All layers have now injected properties into the parent dispatch
- * stack, which has responded by building an entry in the monitorables
- * map in this class. We can pull everything out of it and remove the
- * entry quite safely at this point.
- */
- synchronized (monitorables) {
- Set<MonitorableProperty<?>> layerProps = monitorables
- .get(processID);
- if (layerProps != null) {
- for (MonitorableProperty<?> prop : layerProps)
- properties.add(prop);
- monitorables.remove(processID);
- }
- }
-
- /*
- * Register the node with the monitor tree, including any aggregated
- * properties from layers.
- */
- MonitorManager.getInstance().registerNode(this,
- dataflowOwningProcess + ":" + getLocalName(), properties);
- }
-
- @Override
- public void addObserver(Observer<ProcessorFinishedEvent> observer) {
- processorFinishedMultiCaster.addObserver(observer);
- }
-
- @Override
- public List<Observer<ProcessorFinishedEvent>> getObservers() {
- return processorFinishedMultiCaster.getObservers();
- }
-
- @Override
- public void removeObserver(Observer<ProcessorFinishedEvent> observer) {
- processorFinishedMultiCaster.removeObserver(observer);
- }
-
- @Override
- public String toString() {
- return "Processor " + getLocalName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorInputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorInputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorInputPortImpl.java
deleted file mode 100644
index d0a23eb..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorInputPortImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-
-/**
- * Filtering input port used as an input of a {@link ProcessorImpl}. Data and
- * completion events that pass the filter are handed to the iteration strategy
- * stack of the owning processor.
- * <p>
- * If the filter level is undefined this input port will always throw workflow
- * structure exceptions when you push data into it. This port must be linked to
- * a crystalizer or something which offers the same operational contract: it
- * requires a full hierarchy of data tokens (i.e. if you push something in with
- * an index you must at some point subsequent to that push at least a single
- * list in with the empty index).
- * </p>
- *
- * @author Tom Oinn
- */
-class ProcessorInputPortImpl extends AbstractFilteringInputPort implements
-        ProcessorInputPort {
-    /** The processor this port belongs to. */
-    private final ProcessorImpl parent;
-
-    /**
-     * @param parent
-     *            processor that owns this port
-     * @param name
-     *            port name, passed to the superclass
-     * @param depth
-     *            port depth, passed to the superclass
-     */
-    protected ProcessorInputPortImpl(ProcessorImpl parent, String name,
-            int depth) {
-        super(name, depth);
-        this.parent = parent;
-    }
-
-    /** @return the processor that owns this port */
-    @Override
-    public Processor getProcessor() {
-        return parent;
-    }
-
-    /**
-     * Appends ":" and the owning processor's local name to the given process
-     * identifier.
-     */
-    @Override
-    public String transformOwningProcess(String oldOwner) {
-        String localName = parent.getLocalName();
-        return oldOwner + ":" + localName;
-    }
-
-    /** Relays a data item to the owning processor's iteration stack. */
-    @Override
-    protected void pushData(String portName, String owningProcess, int[] index,
-            T2Reference data, InvocationContext context) {
-        parent.iterationStack.receiveData(portName, owningProcess, index, data,
-                context);
-    }
-
-    /** Relays a completion event to the owning processor's iteration stack. */
-    @Override
-    protected void pushCompletion(String portName, String owningProcess,
-            int[] index, InvocationContext context) {
-        parent.iterationStack.receiveCompletion(portName, owningProcess, index,
-                context);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorOutputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorOutputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorOutputPortImpl.java
deleted file mode 100644
index 86817a0..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorOutputPortImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-
-/**
- * Output port of a {@link ProcessorImpl}, built on
- * BasicEventForwardingOutputPort. Adds the logic that relays workflow data
- * tokens coming from the internal crystalizer to each of the target
- * FilteringInputPort instances.
- *
- * @author Tom Oinn
- * @author Stuart Owen
- */
-class ProcessorOutputPortImpl extends BasicEventForwardingOutputPort implements
-        ProcessorOutputPort {
-    /** The processor this port belongs to. */
-    private final ProcessorImpl parent;
-
-    /**
-     * @param parent
-     *            processor that owns this port
-     * @param portName
-     *            port name, passed to the superclass
-     * @param portDepth
-     *            port depth, passed to the superclass
-     * @param granularDepth
-     *            granular depth, passed to the superclass
-     */
-    protected ProcessorOutputPortImpl(ProcessorImpl parent, String portName,
-            int portDepth, int granularDepth) {
-        super(portName, portDepth, granularDepth);
-        this.parent = parent;
-    }
-
-    /** @return the processor that owns this port */
-    @Override
-    public Processor getProcessor() {
-        return parent;
-    }
-
-    /**
-     * Pops the last id off the token's owning process stack (it was pushed on
-     * entry to the processor) and forwards the resulting event to the targets.
-     */
-    protected void receiveEvent(WorkflowDataToken token) {
-        sendEvent(token.popOwningProcess());
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/package.html
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/package.html b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/package.html
deleted file mode 100644
index 46b9fee..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/package.html
+++ /dev/null
@@ -1,3 +0,0 @@
-<body>
-Implementation package for workflow entities
-</body>
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/processor/activity/impl/ActivityInputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/processor/activity/impl/ActivityInputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/processor/activity/impl/ActivityInputPortImpl.java
deleted file mode 100644
index 0d261a2..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/processor/activity/impl/ActivityInputPortImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.processor.activity.impl;
-
-import static java.util.Collections.unmodifiableList;
-
-import java.util.List;
-
-import net.sf.taverna.t2.reference.ExternalReferenceSPI;
-import net.sf.taverna.t2.workflowmodel.AbstractPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort;
-
-/**
- * An input port on an Activity instance. Simply used as a bean to hold port
- * name and depth properties, plus the optional literal-value flag, handled
- * reference schemes and translated element class.
- *
- * @author Tom Oinn
- * @author Stuart Owen
- */
-public class ActivityInputPortImpl extends AbstractPort implements
-        ActivityInputPort {
-    /** Null when the port was built with the two-argument constructor. */
-    private Class<?> translatedElementClass;
-    /** Null when the port was built with the two-argument constructor. */
-    private List<Class<? extends ExternalReferenceSPI>> handledReferenceSchemes;
-    boolean allowsLiteralValues;
-
-    /**
-     * Constructs an Activity input port instance with the provided name and
-     * depth. Literal values are not allowed, and no reference schemes or
-     * translated element class are set.
-     *
-     * @param portName
-     *            name of the port
-     * @param portDepth
-     *            depth of the port
-     */
-    public ActivityInputPortImpl(String portName, int portDepth) {
-        super(portName, portDepth);
-    }
-
-    /**
-     * Constructs an Activity input port with the provided name and depth,
-     * together with its literal-value flag, handled reference schemes and
-     * translated element class.
-     *
-     * @param portName
-     *            name of the port
-     * @param portDepth
-     *            depth of the port
-     * @param allowsLiteralValues
-     *            value returned by {@link #allowsLiteralValues()}
-     * @param handledReferenceSchemes
-     *            list exposed (read-only) by
-     *            {@link #getHandledReferenceSchemes()}; the list is stored
-     *            as given, not copied
-     * @param translatedElementClass
-     *            value returned by {@link #getTranslatedElementClass()}
-     */
-    public ActivityInputPortImpl(
-            String portName,
-            int portDepth,
-            boolean allowsLiteralValues,
-            List<Class<? extends ExternalReferenceSPI>> handledReferenceSchemes,
-            Class<?> translatedElementClass) {
-        this(portName, portDepth);
-        this.allowsLiteralValues = allowsLiteralValues;
-        this.handledReferenceSchemes = handledReferenceSchemes;
-        this.translatedElementClass = translatedElementClass;
-    }
-
-    @Override
-    public boolean allowsLiteralValues() {
-        return this.allowsLiteralValues;
-    }
-
-    /**
-     * @return an unmodifiable view of the handled reference schemes; an empty
-     *         list if none were supplied
-     */
-    @Override
-    public List<Class<? extends ExternalReferenceSPI>> getHandledReferenceSchemes() {
-        /*
-         * The two-argument constructor leaves the field null, and
-         * unmodifiableList(null) throws NullPointerException, so report "no
-         * schemes" as an empty list instead.
-         */
-        if (this.handledReferenceSchemes == null)
-            return java.util.Collections
-                    .<Class<? extends ExternalReferenceSPI>> emptyList();
-        return unmodifiableList(this.handledReferenceSchemes);
-    }
-
-    /**
-     * @return the translated element class, or null if none was supplied
-     */
-    @Override
-    public Class<?> getTranslatedElementClass() {
-        return this.translatedElementClass;
-    }
-}