You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/23 17:37:49 UTC
[05/51] [partial] incubator-taverna-engine git commit:
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractEventHandlingInputPort.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractEventHandlingInputPort.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractEventHandlingInputPort.java
deleted file mode 100644
index 8e4359f..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractEventHandlingInputPort.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.AbstractPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.EventHandlingInputPort;
-
-/**
- * Extends AbstractPort with the getIncomingLink method and an additional
- * implementation method to set the incoming data link
- *
- * @author Tom Oinn
- */
-public abstract class AbstractEventHandlingInputPort extends AbstractPort
- implements EventHandlingInputPort {
- private Datalink incomingLink = null;
-
- protected AbstractEventHandlingInputPort(String name, int depth) {
- super(name, depth);
- }
-
- @Override
- public Datalink getIncomingLink() {
- return this.incomingLink;
- }
-
- protected void setIncomingLink(Datalink newLink) {
- this.incomingLink = newLink;
- }
-
- protected void setName(String name) {
- this.name = name;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractFilteringInputPort.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractFilteringInputPort.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractFilteringInputPort.java
deleted file mode 100644
index a0f48be..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractFilteringInputPort.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.reference.ContextualizedT2Reference;
-import net.sf.taverna.t2.reference.ReferenceService;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.FilteringInputPort;
-import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
-
-/**
- * Abstract superclass for filtering input ports, extend and implement the
- * pushXXX methods to configure behaviour
- *
- * @author Tom Oinn
- */
-public abstract class AbstractFilteringInputPort extends
- AbstractEventHandlingInputPort implements FilteringInputPort {
- protected AbstractFilteringInputPort(String name, int depth) {
- super(name, depth);
- this.filterDepth = depth;
- }
-
- @Override
- public int getFilterDepth() {
- return this.filterDepth;
- }
-
- private int filterDepth;
-
- @Override
- public void receiveEvent(WorkflowDataToken token) {
- receiveToken(token);
- }
-
- public void pushToken(WorkflowDataToken dt, String owningProcess,
- int desiredDepth) {
- if (dt.getData().getDepth() == desiredDepth)
- pushData(getName(), owningProcess, dt.getIndex(), dt.getData(), dt
- .getContext());
- else {
- ReferenceService rs = dt.getContext().getReferenceService();
- Iterator<ContextualizedT2Reference> children = rs.traverseFrom(dt
- .getData(), dt.getData().getDepth() - 1);
-
- while (children.hasNext()) {
- ContextualizedT2Reference ci = children.next();
- int[] newIndex = new int[dt.getIndex().length
- + ci.getIndex().length];
- int i = 0;
- for (int indx : dt.getIndex())
- newIndex[i++] = indx;
- for (int indx : ci.getIndex())
- newIndex[i++] = indx;
- pushToken(new WorkflowDataToken(owningProcess, newIndex, ci
- .getReference(), dt.getContext()), owningProcess,
- desiredDepth);
- }
- pushCompletion(getName(), owningProcess, dt.getIndex(), dt
- .getContext());
- }
- }
-
- public void receiveToken(WorkflowDataToken token) {
- String newOwner = transformOwningProcess(token.getOwningProcess());
- if (filterDepth == -1)
- throw new WorkflowStructureException(
- "Input depth filter not configured on input port, failing");
-
- int tokenDepth = token.getData().getDepth();
- if (tokenDepth == filterDepth) {
- if (filterDepth == getDepth())
- /*
- * Pass event straight through, the filter depth is the same as
- * the desired input port depth
- */
- pushData(getName(), newOwner, token.getIndex(),
- token.getData(), token.getContext());
- else {
- pushToken(token, newOwner, getDepth());
- /*
- * Shred the input identifier into the appropriate port depth
- * and send the events through, pushing a completion event at
- * the end.
- */
- }
- } else if (tokenDepth > filterDepth) {
- // Convert to a completion event and push into the iteration strategy
- pushCompletion(getName(), newOwner, token.getIndex(), token
- .getContext());
- } else if (tokenDepth < filterDepth) {
- /*
- * Normally we can ignore these, but there is a special case where
- * token depth is less than filter depth and there is no index
- * array. In this case we can't throw the token away as there will
- * never be an enclosing one so we have to use the data manager to
- * register a new single element collection and recurse.
- */
- if (token.getIndex().length == 0) {
- T2Reference ref = token.getData();
- ReferenceService rs = token.getContext().getReferenceService();
- int currentDepth = tokenDepth;
- while (currentDepth < filterDepth) {
- // Wrap in a single item list
- List<T2Reference> newList = new ArrayList<>();
- newList.add(ref);
- ref = rs.getListService()
- .registerList(newList, token.getContext()).getId();
- currentDepth++;
- }
- pushData(getName(), newOwner, new int[0], ref,
- token.getContext());
- }
- }
- }
-
- public void setFilterDepth(int filterDepth) {
- this.filterDepth = filterDepth;
- if (filterDepth < getDepth())
- this.filterDepth = getDepth();
- }
-
- /**
- * Action to take when the filter pushes a completion event out
- *
- * @param portName
- * @param owningProcess
- * @param index
- */
- protected abstract void pushCompletion(String portName,
- String owningProcess, int[] index, InvocationContext context);
-
- /**
- * Action to take when a data event is created by the filter
- *
- * @param portName
- * @param owningProcess
- * @param index
- * @param data
- */
- protected abstract void pushData(String portName, String owningProcess,
- int[] index, T2Reference data, InvocationContext context);
-
- /**
- * Override this to transform owning process identifiers as they pass
- * through the filter, by default this is the identity transformation
- *
- * @param oldOwner
- * @return
- */
- protected String transformOwningProcess(String oldOwner) {
- return oldOwner;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractMergeEdit.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractMergeEdit.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractMergeEdit.java
deleted file mode 100644
index 8e14f00..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractMergeEdit.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.EditException;
-import net.sf.taverna.t2.workflowmodel.Merge;
-
-public abstract class AbstractMergeEdit extends EditSupport<Merge> {
- private final MergeImpl merge;
-
- public AbstractMergeEdit(Merge merge) {
- if (merge == null)
- throw new RuntimeException(
- "Cannot construct a merge edit with a null merge");
- if (!(merge instanceof MergeImpl))
- throw new RuntimeException("Merge must be an instanceof MergeImpl");
- this.merge = (MergeImpl) merge;
- }
-
- @Override
- public final Merge applyEdit() throws EditException {
- synchronized (merge) {
- doEditAction(merge);
- }
- return merge;
- }
-
- protected abstract void doEditAction(MergeImpl mergeImpl) throws EditException;
-
- @Override
- public final Object getSubject() {
- return merge;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractProcessorEdit.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractProcessorEdit.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractProcessorEdit.java
deleted file mode 100644
index a00bc0c..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/AbstractProcessorEdit.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.EditException;
-import net.sf.taverna.t2.workflowmodel.Processor;
-
-/**
- * Abstraction of an edit acting on a Processor instance. Handles the check to
- * see that the Processor supplied is really a ProcessorImpl.
- *
- * @author Tom Oinn
- *
- */
-public abstract class AbstractProcessorEdit extends EditSupport<Processor> {
- private final ProcessorImpl processor;
-
- protected AbstractProcessorEdit(Processor processor) {
- if (processor == null)
- throw new RuntimeException(
- "Cannot construct a processor edit with null processor");
- if (!(processor instanceof ProcessorImpl))
- throw new RuntimeException(
- "Edit cannot be applied to a Processor which isn't an instance of ProcessorImpl");
- this.processor = (ProcessorImpl) processor;
- }
-
- @Override
- public final Processor applyEdit() throws EditException {
- synchronized (processor) {
- doEditAction(processor);
- }
- return processor;
- }
-
- /**
- * Do the actual edit here
- *
- * @param processor
- * The ProcessorImpl to which the edit applies
- * @throws EditException
- */
- protected abstract void doEditAction(ProcessorImpl processor)
- throws EditException;
-
- @Override
- public final Processor getSubject() {
- return processor;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/BasicEventForwardingOutputPort.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/BasicEventForwardingOutputPort.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/BasicEventForwardingOutputPort.java
deleted file mode 100644
index 7b29a8a..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/BasicEventForwardingOutputPort.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.workflowmodel.AbstractOutputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
-
-/**
- * Extension of AbstractOutputPort implementing EventForwardingOutputPort
- *
- * @author Tom Oinn
- *
- */
-public class BasicEventForwardingOutputPort extends AbstractOutputPort
- implements EventForwardingOutputPort {
- protected Set<DatalinkImpl> outgoingLinks;
-
- /**
- * Construct a new abstract output port with event forwarding capability
- *
- * @param portName
- * @param portDepth
- * @param granularDepth
- */
- public BasicEventForwardingOutputPort(String portName, int portDepth,
- int granularDepth) {
- super(portName, portDepth, granularDepth);
- this.outgoingLinks = new HashSet<>();
- }
-
- /**
- * Implements EventForwardingOutputPort
- */
- @Override
- public final Set<? extends Datalink> getOutgoingLinks() {
- return Collections.unmodifiableSet(this.outgoingLinks);
- }
-
- /**
- * Forward the specified event to all targets
- *
- * @param e
- */
- public void sendEvent(WorkflowDataToken e) {
- for (Datalink link : outgoingLinks)
- link.getSink().receiveEvent(e);
- }
-
- protected void addOutgoingLink(DatalinkImpl link) {
- if (outgoingLinks.contains(link) == false)
- outgoingLinks.add(link);
- }
-
- protected void removeOutgoingLink(Datalink link) {
- outgoingLinks.remove(link);
- }
-
- protected void setDepth(int depth) {
- this.depth = depth;
- }
-
- protected void setGranularDepth(int granularDepth) {
- this.granularDepth = granularDepth;
- }
-
- protected void setName(String name) {
- this.name = name;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConditionImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConditionImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConditionImpl.java
deleted file mode 100644
index c3440ec..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConditionImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.lang.Boolean.TRUE;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import net.sf.taverna.t2.annotation.AbstractAnnotatedThing;
-import net.sf.taverna.t2.workflowmodel.Condition;
-
-class ConditionImpl extends AbstractAnnotatedThing<Condition> implements Condition {
- private ProcessorImpl control, target;
- private Map<String, Boolean> stateMap = new HashMap<>();
-
- protected ConditionImpl(ProcessorImpl control, ProcessorImpl target) {
- this.control = control;
- this.target = target;
- }
-
- @Override
- public ProcessorImpl getControl() {
- return this.control;
- }
-
- @Override
- public ProcessorImpl getTarget() {
- return this.target;
- }
-
- @Override
- public boolean isSatisfied(String owningProcess) {
- if (!stateMap.containsKey(owningProcess))
- return false;
- return stateMap.get(owningProcess);
- }
-
- protected void satisfy(String owningProcess) {
- stateMap.put(owningProcess, TRUE);
- // TODO - poke target processor here
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConfigureEdit.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConfigureEdit.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConfigureEdit.java
deleted file mode 100644
index edfaf01..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/ConfigureEdit.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.Configurable;
-import net.sf.taverna.t2.workflowmodel.ConfigurationException;
-import net.sf.taverna.t2.workflowmodel.EditException;
-
-import org.apache.log4j.Logger;
-
-/**
- * An Edit that is responsible for configuring a {@link Configurable} with a
- * given configuration bean.
- *
- * @author Stuart Owen
- * @author Stian Soiland-Reyes
- * @author Donal Fellows
- */
-class ConfigureEdit<T> extends EditSupport<Configurable<T>> {
- private static Logger logger = Logger.getLogger(ConfigureEdit.class);
-
- private final Configurable<T> configurable;
- private final Class<? extends Configurable<T>> configurableType;
- private final T configurationBean;
-
- ConfigureEdit(Class<? extends Configurable<T>> subjectType,
- Configurable<T> configurable, T configurationBean) {
- if (configurable == null)
- throw new RuntimeException(
- "Cannot construct an edit with null subject");
- this.configurableType = subjectType;
- this.configurable = configurable;
- this.configurationBean = configurationBean;
- if (!configurableType.isInstance(configurable))
- throw new RuntimeException(
- "Edit cannot be applied to an object which isn't an instance of "
- + configurableType);
- }
-
- @Override
- public final Configurable<T> applyEdit() throws EditException {
- try {
- // FIXME: Should clone bean on configuration to prevent caller from
- // modifying bean afterwards
- synchronized (configurable) {
- configurable.configure(configurationBean);
- }
- return configurable;
- } catch (ConfigurationException e) {
- logger.error("Error configuring :"
- + configurable.getClass().getSimpleName(), e);
- throw new EditException(e);
- }
- }
-
- @Override
- public final Configurable<T> getSubject() {
- return configurable;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/Crystalizer.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/Crystalizer.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/Crystalizer.java
deleted file mode 100644
index 7b95012..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/Crystalizer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.invocation.Completion;
-import net.sf.taverna.t2.invocation.IterationInternalEvent;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Job;
-
-/**
- * Recieves Job and Completion events and emits Jobs unaltered. Completion
- * events additionally cause registration of lists for each key in the datamap
- * of the jobs at immediate child locations in the index structure. These list
- * identifiers are sent in place of the Completion events.
- * <p>
- * State for a given process ID is purged when a final completion event is
- * received so there is no need for an explicit cache purge operation in the
- * public API (although for termination of partially complete workflows it may
- * be sensible for subclasses to provide one)
- *
- * @author Tom Oinn
- */
-public interface Crystalizer {
- /**
- * Receive a Job or Completion, Jobs are emitted unaltered and cached,
- * Completion events trigger registration of a corresponding list - this may
- * be recursive in nature if the completion event's index implies nested
- * lists which have not been registered.
- */
- void receiveEvent(
- IterationInternalEvent<? extends IterationInternalEvent<?>> event);
-
- /**
- * This method is called when a new Job has been handled by the
- * AbstractCrystalizer, either by direct passthrough or by list
- * registration.
- *
- */
- void jobCreated(Job outputJob);
-
- /**
- * Called whenever a completion not corresponding to a node in the cache is
- * generated. In many cases this is an indication of an error state, the
- * processor implementation should ensure that completion events are only
- * sent to the crystalizer if there has been at least one data event with a
- * lower depth on the same path.
- */
- void completionCreated(Completion completion);
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowImpl.java
deleted file mode 100644
index b087359..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowImpl.java
+++ /dev/null
@@ -1,797 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.util.Collections.unmodifiableList;
-import static net.sf.taverna.t2.workflowmodel.utils.Tools.addDataflowIdentification;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import net.sf.taverna.t2.annotation.AbstractAnnotatedThing;
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.monitor.MonitorManager;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowValidationReport;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.EditException;
-import net.sf.taverna.t2.workflowmodel.EventHandlingInputPort;
-import net.sf.taverna.t2.workflowmodel.FailureTransmitter;
-import net.sf.taverna.t2.workflowmodel.InvalidDataflowException;
-import net.sf.taverna.t2.workflowmodel.Merge;
-import net.sf.taverna.t2.workflowmodel.NamedWorkflowEntity;
-import net.sf.taverna.t2.workflowmodel.NamingException;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.TokenProcessingEntity;
-import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationTypeMismatchException;
-
-/**
- * Implementation of Dataflow including implementation of the dataflow level
- * type checker. Other than this the implementation is fairly simple as it's
- * effectively just a container for other things especially the dataflow input
- * and output port implementations.
- *
- * @author Tom Oinn
- *
- */
-public class DataflowImpl extends AbstractAnnotatedThing<Dataflow> implements
- Dataflow {
- List<ProcessorImpl> processors;
- List<MergeImpl> merges;
- private String name;
- private static int nameIndex = 1;
- private List<DataflowInputPortImpl> inputs;
- private List<DataflowOutputPortImpl> outputs;
- protected String internalIdentifier;
- private DataflowValidationReport validationReport;
-
- /**
- * Protected constructor, assigns a default name. To build an instance of
- * DataflowImpl you should use the appropriate Edit object from the Edits
- * interface
- */
- protected DataflowImpl() {
- this.name = "Workflow" + (nameIndex++);
- this.processors = new ArrayList<ProcessorImpl>();
- this.merges = new ArrayList<MergeImpl>();
- this.inputs = new ArrayList<DataflowInputPortImpl>();
- this.outputs = new ArrayList<DataflowOutputPortImpl>();
- refreshInternalIdentifier();
- }
-
- /**
- * Adds a processor on the DataFlow.
- *
- * @param processor
- * the ProcessorImpl to be added to the Dataflow
- * @return
- * @throws NamingException
- * if a processor already exists with the same local name
- */
- protected synchronized void addProcessor(ProcessorImpl processor)
- throws NamingException {
- for (Processor existingProcessor : new ArrayList<>(processors))
- if (existingProcessor.getLocalName().equals(
- processor.getLocalName()))
- throw new NamingException("There already is a processor named:"
- + processor.getLocalName());
- processors.add(processor);
- }
-
- protected synchronized void removeProcessor(Processor processor) {
- processors.remove(processor);
- }
-
- /**
- * Adds a processor on the DataFlow.
- *
- * @param processor
- * the ProcessorImpl to be added to the Dataflow
- * @return
- * @throws NamingException
- * if a processor already exists with the same local name
- */
- protected synchronized void addMerge(MergeImpl merge)
- throws NamingException {
- for (Merge existingMerge : new ArrayList<>(merges))
- if (existingMerge.getLocalName().equals(merge.getLocalName()))
- throw new NamingException(
- "There already is a merge operation named:"
- + merge.getLocalName());
- merges.add(merge);
- }
-
- protected synchronized void removeMerge(Merge merge) {
- merges.remove(merge);
- }
-
- /**
- * Build a new dataflow input port, the granular depth is set for the input
- * port so it can be copied onto the internal output port
- *
- * @param name
- * name of the dataflow input port to build
- * @param depth
- * input depth
- * @param granularDepth
- * granular depth to copy to the internal output port
- * @throws NamingException
- * in the event of a duplicate or invalid name
- * @return the newly created input port
- */
- protected synchronized DataflowInputPort createInputPort(String name,
- int depth, int granularDepth) throws NamingException {
- for (DataflowInputPort dip : inputs)
- if (dip.getName().equals(name))
- throw new NamingException(
- "Duplicate workflow input port name '" + name
- + "' in workflow already.");
- DataflowInputPortImpl dipi = new DataflowInputPortImpl(name, depth,
- granularDepth, this);
- inputs.add(dipi);
- return dipi;
- }
-
- /**
- * Adds an input port to the DataFlow.
- *
- * @param inputPort
- * the DataflowInputPortImpl to be added to the Dataflow
- * @throws EditException
- */
- protected synchronized void addInputPort(DataflowInputPortImpl inputPort)
- throws EditException {
- for (DataflowInputPort existingInputPort : new ArrayList<>(inputs))
- if (existingInputPort.getName().equals(inputPort.getName()))
- throw new NamingException(
- "There already is a workflow input port named:"
- + inputPort.getName());
- if (inputPort.getDataflow() != this)
- throw new EditException("Port specifies a different workflow");
- inputs.add(inputPort);
- }
-
- /**
- * Remove the named dataflow input port
- *
- * @param name
- * name of the dataflow input port to remove
- * @throws EditException
- * if the specified port doesn't exist within this dataflow
- */
- protected synchronized void removeDataflowInputPort(String name)
- throws EditException {
- for (DataflowInputPort dip : inputs)
- if (dip.getName().equals(name)) {
- removeDataflowInputPort(dip);
- return;
- }
- throw new EditException("No such input port '" + name
- + "' in workflow.");
- }
-
- /**
- * Remove the specified input port from this dataflow
- *
- * @param dip
- * dataflow input port to remove
- * @throws EditException
- * if the input port isn't in the list of inputs - should never
- * happen but you never know.
- */
- protected synchronized void removeDataflowInputPort(DataflowInputPort dip)
- throws EditException {
- if (!inputs.contains(dip))
- throw new EditException(
- "Can't locate the specified input port in workflow. Input port has name '"
- + dip.getName() + "'.");
- inputs.remove(dip);
- }
-
- /**
- * Create and return a new DataflowOutputPort in this dataflow
- *
- * @param name
- * name of the port to create, must be unique within the set of
- * output ports for this dataflow
- * @return the newly created DataflowOutputPort
- * @throws NamingException
- * if the name is invalid or already exists as a name for a
- * dataflow output
- */
- protected synchronized DataflowOutputPort createOutputPort(String name)
- throws NamingException {
- for (DataflowOutputPort dop : outputs)
- if (dop.getName().equals(name))
- throw new NamingException(
- "Duplicate workflow output port name '" + name
- + "' in workflow already.");
- DataflowOutputPortImpl dopi = new DataflowOutputPortImpl(name, this);
- outputs.add(dopi);
- return dopi;
- }
-
- /**
- * Adds an output port to the DataFlow.
- *
- * @param outputPort
- * the DataflowOutputPortImpl to be added to the Dataflow
- * @throws EditException
- */
- protected synchronized void addOutputPort(DataflowOutputPortImpl outputPort)
- throws EditException {
- for (DataflowOutputPort existingOutputPort : new ArrayList<>(outputs))
- if (existingOutputPort.getName().equals(outputPort.getName()))
- throw new NamingException(
- "There already is a workflow output port named:"
- + outputPort.getName());
- if (outputPort.getDataflow() != this)
- throw new EditException("Port specifies a different workflow");
- outputs.add(outputPort);
- }
-
- /**
- * Remove the named dataflow output port
- *
- * @param name
- * name of the dataflow output port to remove
- * @throws EditException
- * if the specified port doesn't exist within this dataflow
- */
- protected synchronized void removeDataflowOutputPort(String name)
- throws EditException {
- for (DataflowOutputPort dop : outputs)
- if (dop.getName().equals(name)) {
- removeDataflowOutputPort(dop);
- return;
- }
- throw new EditException("No such output port '" + name
- + "' in workflow.");
- }
-
- /**
- * Remove the specified output port from this dataflow
- *
- * @param dop
- * dataflow output port to remove
- * @throws EditException
- * if the output port isn't in the list of outputs for this
- * dataflow
- */
- protected synchronized void removeDataflowOutputPort(DataflowOutputPort dop)
- throws EditException {
- if (!outputs.contains(dop))
- throw new EditException(
- "Can't locate the specified output port in workflow, output port has name '"
- + dop.getName() + "'.");
- outputs.remove(dop);
- }
-
- /**
- * Create a new datalink between two entities within the workflow
- *
- * @param sourceName
- * interpreted either as the literal name of a dataflow input
- * port or the colon seperated name of a
- * [processorName|mergeName]:[outputPort]
- * @param sinkName
- * as with sourceName but for processor or merge input ports and
- * dataflow output ports
- * @return the created Datalink
- * @throws EditException
- * if either source or sink isn't found within this dataflow or
- * if the link would violate workflow structural constraints in
- * an immediate way. This won't catch cycles (see the validation
- * methods for that) but will prevent you from having more than
- * one link going to an input port.
- */
- protected synchronized Datalink link(String sourceName, String sinkName)
- throws EditException {
- BasicEventForwardingOutputPort source = findSourcePort(sourceName);
- EventHandlingInputPort sink = findSinkPort(sinkName);
-
- // Check whether the sink is already linked
- if (sink.getIncomingLink() != null)
- throw new EditException("Cannot link to sink port '" + sinkName
- + "' as it is already linked");
-
- /*
- * Got here so we have both source and sink and the sink isn't already
- * linked from somewhere. If the sink isn't linked we can't have a
- * duplicate link here which would have been the other condition to
- * check for.
- */
-
- DatalinkImpl link = new DatalinkImpl(source, sink);
- source.addOutgoingLink(link);
- ((AbstractEventHandlingInputPort) sink).setIncomingLink(link);
-
- return link;
- }
-
- /* @nonnull */
- private BasicEventForwardingOutputPort findSourcePort(String sourceName)
- throws EditException {
- BasicEventForwardingOutputPort source = null;
- String[] split = sourceName.split(":");
- if (split.length == 2) {
- /* source is a processor */
- // TODO - update to include Merge when it's added
- for (ProcessorImpl pi : processors)
- if (pi.getLocalName().equals(split[0])) {
- source = pi.getOutputPortWithName(split[1]);
- break;
- }
- } else if (split.length == 1) {
- /*
- * source is a workflow input port, or at least the internal output
- * port within it
- */
- for (DataflowInputPortImpl dipi : inputs)
- if (dipi.getName().equals(split[0])) {
- source = dipi.internalOutput;
- break;
- }
- } else
- throw new EditException("Invalid source link name '" + sourceName
- + "'.");
- if (source == null)
- throw new EditException("Unable to find source port named '"
- + sourceName + "' in link creation.");
- return source;
- }
-
- /* @nonnull */
- private EventHandlingInputPort findSinkPort(String sinkName)
- throws EditException {
- EventHandlingInputPort sink = null;
- String[] split;
- split = sinkName.split(":");
- if (split.length == 2) {
- /* sink is a processor */
- // TODO - update to include Merge when it's added
- for (ProcessorImpl pi : processors)
- if (pi.getLocalName().equals(split[0])) {
- sink = pi.getInputPortWithName(split[1]);
- break;
- }
- } else if (split.length == 1) {
- /*
- * source is a workflow input port, or at least the internal output
- * port within it
- */
- for (DataflowOutputPortImpl dopi : outputs)
- if (dopi.getName().equals(split[0])) {
- sink = dopi.getInternalInputPort();
- break;
- }
- } else
- throw new EditException("Invalid link sink name '" + sinkName
- + "'.");
- if (sink == null)
- throw new EditException("Unable to find sink port named '"
- + sinkName + "' in link creation");
- return sink;
- }
-
- /**
- * Return a copy of the list of dataflow input ports for this dataflow
- */
- @Override
- public synchronized List<? extends DataflowInputPort> getInputPorts() {
- return unmodifiableList(inputs);
- }
-
- /**
- * For each processor input, merge input and workflow output get the
- * incoming link and, if non null, add to a list and return the entire list.
- */
- @Override
- public synchronized List<? extends Datalink> getLinks() {
- List<Datalink> result = new ArrayList<>();
- /*
- * All processors have a set of input ports each of which has at most
- * one incoming data link
- */
- for (TokenProcessingEntity p : getEntities(TokenProcessingEntity.class))
- for (EventHandlingInputPort pip : p.getInputPorts()) {
- Datalink dl = pip.getIncomingLink();
- if (dl != null)
- result.add(dl);
- }
- /*
- * Workflow outputs have zero or one incoming data link to their
- * internal input port
- */
- for (DataflowOutputPort dop : getOutputPorts()) {
- Datalink dl = dop.getInternalInputPort().getIncomingLink();
- if (dl != null)
- result.add(dl);
- }
-
- return result;
- }
-
- /**
- * Return the list of all processors within the dataflow
- */
- @Override
- public synchronized List<? extends Processor> getProcessors() {
- return getEntities(Processor.class);
- }
-
- /**
- * Return the list of all merge operations within the dataflow
- */
- @Override
- public synchronized List<? extends Merge> getMerges() {
- return getEntities(Merge.class);
- }
-
- /**
- * Return all dataflow output ports
- */
- @Override
- public synchronized List<? extends DataflowOutputPort> getOutputPorts() {
- return unmodifiableList(outputs);
- }
-
- /**
- * Return the local name of this workflow
- */
- @Override
- public String getLocalName() {
- return this.name;
- }
-
- /**
- * Run the type check algorithm and return a report on any problems found.
- * This method must be called prior to actually pushing data through the
- * dataflow as it sets various properties as a side effect.
- *
- * If the workflow has been set immutable with {@link #setImmutable()},
- * subsequent calls to this method will return the cached
- * DataflowValidationReport.
- *
- */
- @Override
- public DataflowValidationReport checkValidity() {
- if (!immutable)
- // Don't store it!
- return checkValidityImpl();
- if (validationReport == null)
- validationReport = checkValidityImpl();
- return validationReport;
- }
-
- /**
- * Works out whether a dataflow is valid. <strong>This includes working out
- * the real depths of output ports.</strong>
- */
- public synchronized DataflowValidationReport checkValidityImpl() {
- // First things first - nullify the resolved depths in all datalinks
- for (Datalink dl : getLinks())
- if (dl instanceof DatalinkImpl)
- ((DatalinkImpl) dl).setResolvedDepth(-1);
- // Now copy type information from workflow inputs
- for (DataflowInputPort dip : getInputPorts())
- for (Datalink dl : dip.getInternalOutputPort().getOutgoingLinks())
- if (dl instanceof DatalinkImpl)
- ((DatalinkImpl) dl).setResolvedDepth(dip.getDepth());
-
- /*
- * ==================================================================
- * Now iteratively attempt to resolve everything else.
- * ==================================================================
- */
-
- /*
- * Firstly take a copy of the processor list, we'll processors from this
- * list as they become either failed or resolved
- */
- List<TokenProcessingEntity> unresolved = new ArrayList<>(
- getEntities(TokenProcessingEntity.class));
-
- // Keep a list of processors that have failed, initially empty
- List<TokenProcessingEntity> failed = new ArrayList<>();
-
- /**
- * Is the dataflow valid? The flow is valid if and only if both
- * unresolved and failed lists are empty at the end. This doesn't
- * guarantee that the workflow will run, in particular it doesn't
- * actually check for issues such as unresolved output edges.
- */
-
- // Flag to indicate whether we've finished yet, set to true if no
- // changes are made in an iteration
- boolean finished = false;
-
- Map<TokenProcessingEntity, DataflowValidationReport> invalidDataflows = new HashMap<>();
- while (!finished) {
- // We're finished unless something happens later
- finished = true;
- // Keep a list of processors to remove from the unresolved list
- // because they've been resolved properly
- List<TokenProcessingEntity> removeValidated = new ArrayList<>();
- // Keep another list of those that have failed
- List<TokenProcessingEntity> removeFailed = new ArrayList<>();
-
- for (TokenProcessingEntity p : unresolved)
- try {
- /*
- * true = checked and valid, false = can't check, the
- * exception means the processor was checked but was invalid
- * for some reason
- */
-
- if (p.doTypeCheck()) {
- removeValidated.add(p);
- /*
- * At least one thing validated; we will need to run the
- * check loop at least once more.
- */
- finished = false;
- }
- } catch (IterationTypeMismatchException e) {
- removeFailed.add(p);
- } catch (InvalidDataflowException e) {
- invalidDataflows.put(p, e.getDataflowValidationReport());
- removeFailed.add(p);
- }
-
- /*
- * Remove validated and failed items from the pending lists.
- */
- unresolved.removeAll(removeValidated);
- unresolved.removeAll(removeFailed);
- failed.addAll(removeFailed);
- }
-
- /*
- * At this point we know whether the processors within the workflow
- * validated. If all the processors validated then we're probably okay,
- * but there are a few other problems to check for. Firstly we need to
- * check whether all the dataflow outputs are connected; any unconnected
- * output is by definition a validation failure.
- */
- List<DataflowOutputPort> unresolvedOutputs = new ArrayList<>();
- for (DataflowOutputPortImpl dopi : outputs) {
- Datalink dl = dopi.getInternalInputPort().getIncomingLink();
- /*
- * Unset any type information on the output port, we'll set it again
- * later if there's a suitably populated link going into it
- */
- dopi.setDepths(-1, -1);
- if (dl == null)
- // not linked, this is by definition an unsatisfied link!
- unresolvedOutputs.add(dopi);
- else if (dl.getResolvedDepth() == -1)
- /*
- * linked but the edge hasn't had its depth resolved, i.e. it
- * links from an unresolved entity
- */
- unresolvedOutputs.add(dopi);
- else {
- /*
- * linked and edge depth is defined, we can therefore populate
- * the granular and real depth of the dataflow output port. Note
- * that this is the only way these values can be populated, you
- * don't define them when creating the ports as they are
- * entirely based on the type check stage.
- */
-
- int granularDepth = dl.getSource().getGranularDepth();
- int resolvedDepth = dl.getResolvedDepth();
- dopi.setDepths(resolvedDepth, granularDepth);
- }
- }
-
- /*
- * Check if workflow is 'incomplete' - i.e. if it contains no processors
- * and no output ports. This is to prevent empty workflows or ones that
- * contain input ports from being run.
- */
-
- boolean dataflowIsIncomplete = getProcessors().isEmpty()
- && getOutputPorts().isEmpty();
-
- /*
- * For a workflow to be valid - workflow must not be 'empty' and lists
- * of problems must all be empty
- */
-
- boolean dataflowValid = (!dataflowIsIncomplete)
- && unresolvedOutputs.isEmpty() && failed.isEmpty()
- && unresolved.isEmpty();
-
- /*
- * Build and return a new validation report containing the overall state
- * along with lists of failed and unsatisfied processors and unsatisfied
- * output ports
- */
-
- return new DataflowValidationReportImpl(dataflowValid,
- dataflowIsIncomplete, failed, unresolved, unresolvedOutputs,
- invalidDataflows);
- }
-
- /**
- * Gets all workflow entities of the specified type and returns as an
- * unmodifiable list of that type
- */
- @Override
- public <T extends NamedWorkflowEntity> List<? extends T> getEntities(
- Class<T> entityType) {
- List<T> result = new ArrayList<T>();
- filterAndAdd(processors, result, entityType);
- filterAndAdd(merges, result, entityType);
- return unmodifiableList(result);
- }
-
- private <T extends NamedWorkflowEntity> void filterAndAdd(
- Iterable<?> source, List<T> target, Class<T> type) {
- for (Object o : source)
- if (type.isAssignableFrom(o.getClass()))
- target.add(type.cast(o));
- }
-
- /**
- * The active process identifiers correspond to current strands of data
- * running through this dataflow.
- */
- private Set<String> activeProcessIdentifiers = new HashSet<>();
- private volatile boolean immutable;
-
- /**
- * Called when a token is received or the dataflow is fired, checks to see
- * whether the process identifier is already known (in which case we assume
- * it's been registered and can ignore it) or registers it with the monitor
- * along with all child entities. The method is called with the ID of the
- * new process, that is to say the ID of the token with ':'getLocalName()
- * appended.
- *
- * @param owningProcess
- *
- * @return true if the owning process specified was already in the active
- * process identifier set, false otherwise
- */
- protected boolean tokenReceived(String owningProcess,
- InvocationContext context) {
- synchronized (activeProcessIdentifiers) {
- if (activeProcessIdentifiers.contains(owningProcess))
- return true;
- MonitorManager.getInstance().registerNode(this, owningProcess);
-
- /*
- * Message each processor within the dataflow and instruct it to
- * register any properties with the monitor including any processor
- * level properties it can aggregate from its dispatch stack.
- */
-
- for (ProcessorImpl p : getEntities(ProcessorImpl.class)) {
- p.registerWithMonitor(owningProcess);
- if (p.getInputPorts().isEmpty())
- p.fire(owningProcess, context);
- }
- activeProcessIdentifiers.add(owningProcess);
- return false;
- }
- }
-
- /**
- * Sets the local name for the dataflow
- *
- * @param localName
- */
- public void setLocalName(String localName) {
- if (immutable)
- throw new UnsupportedOperationException("Dataflow is immutable");
- name = localName;
- }
-
- @Override
- public String toString() {
- return "Dataflow " + getLocalName() + "[" + getIdentifier() + "]";
- }
-
- @Override
- public void fire(String owningProcess, InvocationContext context) {
- String newOwningProcess = owningProcess + ":" + getLocalName();
- if (tokenReceived(newOwningProcess, context)) {
- /*
- * This is not good - should ideally handle it as it means the
- * workflow has been fired when in a state where this wasn't
- * sensible, i.e. already having been fired on this process
- * identifier. For now we'll ignore it (ho hum, release deadline
- * etc!)
- */
- }
- /*
- * The code below now happens in the tokenReceived method, we need to
- * fire any processors which don't have dependencies when a new token
- * arrives and we weren't doing that anywhere.
- */
- /**
- * for (Processor p : getEntities(Processor.class)) { if
- * (p.getInputPorts().isEmpty()) { p.fire(newOwningProcess, context); }
- * }
- */
- }
-
- @Override
- public FailureTransmitter getFailureTransmitter() {
- throw new UnsupportedOperationException(
- "Not implemented for DataflowImpl yet");
- }
-
- @Override
- public boolean doTypeCheck() throws IterationTypeMismatchException {
- throw new UnsupportedOperationException(
- "Not implemented for DataflowImpl yet");
- }
-
- public void refreshInternalIdentifier() {
- setIdentifier(UUID.randomUUID().toString());
- }
-
- @Override
- public String getIdentifier() {
- return internalIdentifier;
- }
-
- @Override
- public String recordIdentifier() {
- addDataflowIdentification(this, internalIdentifier, new EditsImpl());
- return internalIdentifier;
- }
-
- void setIdentifier(String id) {
- if (immutable)
- throw new UnsupportedOperationException("Dataflow is immutable");
- this.internalIdentifier = id;
- }
-
- @Override
- public boolean isInputPortConnected(DataflowInputPort inputPort) {
- for (Datalink link : getLinks())
- if (link.getSource().equals(inputPort.getInternalOutputPort()))
- return true;
- return false;
- }
-
- @Override
- public synchronized void setImmutable() {
- if (immutable)
- return;
- processors = unmodifiableList(processors);
- merges = unmodifiableList(merges);
- outputs = unmodifiableList(outputs);
- inputs = unmodifiableList(inputs);
- immutable = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowInputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowInputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowInputPortImpl.java
deleted file mode 100644
index 804bc5a..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowInputPortImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
-
-public class DataflowInputPortImpl extends AbstractEventHandlingInputPort
- implements DataflowInputPort {
- protected BasicEventForwardingOutputPort internalOutput;
- private int granularInputDepth;
- private Dataflow dataflow;
-
- DataflowInputPortImpl(String name, int depth, int granularDepth, Dataflow df) {
- super(name, depth);
- granularInputDepth = granularDepth;
- dataflow = df;
- internalOutput = new BasicEventForwardingOutputPort(name, depth,
- granularDepth);
- }
-
- @Override
- public int getGranularInputDepth() {
- return granularInputDepth;
- }
-
- void setDepth(int depth) {
- this.depth = depth;
- internalOutput.setDepth(depth);
- }
-
- void setGranularDepth(int granularDepth) {
- this.granularInputDepth = granularDepth;
- internalOutput.setGranularDepth(granularDepth);
- }
-
- @Override
- public EventForwardingOutputPort getInternalOutputPort() {
- return internalOutput;
- }
-
- /**
- * Receive an input event, relay it through the internal output port to all
- * connected entities
- */
- @Override
- public void receiveEvent(WorkflowDataToken t) {
- WorkflowDataToken transformedToken = t.pushOwningProcess(dataflow.getLocalName());
- /*
- * I'd rather avoid casting to the implementation but in this case we're
- * in the same package - the only reason to do this is to allow dummy
- * implementations of parts of this infrastructure during testing, in
- * 'real' use this should always be a dataflowimpl
- */
- if (dataflow instanceof DataflowImpl)
- ((DataflowImpl) dataflow).tokenReceived(transformedToken
- .getOwningProcess(), t.getContext());
- for (Datalink dl : internalOutput.getOutgoingLinks())
- dl.getSink().receiveEvent(transformedToken);
- }
-
- @Override
- public Dataflow getDataflow() {
- return dataflow;
- }
-
- @Override
- public void setName(String newName) {
- this.name = newName;
- internalOutput.setName(newName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowOutputPortImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowOutputPortImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowOutputPortImpl.java
deleted file mode 100644
index a341628..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowOutputPortImpl.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.util.Collections.synchronizedList;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import net.sf.taverna.t2.facade.ResultListener;
-import net.sf.taverna.t2.invocation.WorkflowDataToken;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.EventHandlingInputPort;
-
-class DataflowOutputPortImpl extends BasicEventForwardingOutputPort
- implements DataflowOutputPort {
- protected InternalInputPort internalInput;
- /**
- * Remember to synchronize access to this list
- */
- protected List<ResultListener> resultListeners = synchronizedList(new ArrayList<ResultListener>());
- private Dataflow dataflow;
-
- DataflowOutputPortImpl(String portName, Dataflow dataflow) {
- super(portName, -1, -1);
- this.dataflow = dataflow;
- this.internalInput = new InternalInputPort(name, dataflow, portName);
- }
-
- @Override
- public EventHandlingInputPort getInternalInputPort() {
- return this.internalInput;
- }
-
- @Override
- public Dataflow getDataflow() {
- return this.dataflow;
- }
-
- void setDepths(int depth, int granularDepth) {
- this.depth = depth;
- this.granularDepth = granularDepth;
- }
-
- @Override
- public void addResultListener(ResultListener listener) {
- resultListeners.add(listener);
- }
-
- @Override
- public void removeResultListener(ResultListener listener) {
- resultListeners.remove(listener);
- }
-
- @Override
- public void setName(String newName) {
- this.name = newName;
- internalInput.setName(newName);
- }
-
- /** This makes a thread-safe copy. */
- private List<ResultListener> getListeners() {
- synchronized (resultListeners) {
- return new ArrayList<>(resultListeners);
- }
- }
-
- private class InternalInputPort extends AbstractEventHandlingInputPort {
- InternalInputPort(String name, Dataflow dataflow, String portName) {
- super(name, -1);
- }
-
- /**
- * Forward the event through the output port Also informs any
- * ResultListeners on the output port to the new token.
- */
- @Override
- public void receiveEvent(WorkflowDataToken token) {
- WorkflowDataToken newToken = token.popOwningProcess();
- sendEvent(newToken);
- for (ResultListener listener : getListeners())
- listener.resultTokenProduced(newToken, this.getName());
- }
-
- /**
- * Always copy the value of the enclosing dataflow output port
- */
- @Override
- public int getDepth() {
- return DataflowOutputPortImpl.this.getDepth();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowValidationReportImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowValidationReportImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowValidationReportImpl.java
deleted file mode 100644
index 9e2abc0..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DataflowValidationReportImpl.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import static java.util.Collections.unmodifiableList;
-import static java.util.Collections.unmodifiableMap;
-
-import java.util.List;
-import java.util.Map;
-
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowValidationReport;
-import net.sf.taverna.t2.workflowmodel.TokenProcessingEntity;
-
-/**
- * Simple implementation of the DataflowValidationReport interface
- *
- * @author Tom Oinn
- */
-public class DataflowValidationReportImpl implements DataflowValidationReport {
- private final List<TokenProcessingEntity> failed;
- private final Map<TokenProcessingEntity, DataflowValidationReport> invalidDataflows;
- private final List<DataflowOutputPort> unresolvedOutputs;
- private final List<TokenProcessingEntity> unsatisfied;
- private boolean valid;
- /**
- * whether a workflow is incomplete (contains no processors and no output
- * ports), in which case it also must be invalid
- */
- private boolean isWorkflowIncomplete;
-
- DataflowValidationReportImpl(
- boolean isValid,
- boolean isWorkflowIncomplete,
- List<TokenProcessingEntity> failedProcessors,
- List<TokenProcessingEntity> unsatisfiedProcessors,
- List<DataflowOutputPort> unresolvedOutputs,
- Map<TokenProcessingEntity, DataflowValidationReport> invalidDataflows) {
- this.valid = isValid;
- this.isWorkflowIncomplete = isWorkflowIncomplete;
- this.invalidDataflows = unmodifiableMap(invalidDataflows);
- this.failed = unmodifiableList(failedProcessors);
- this.unsatisfied = unmodifiableList(unsatisfiedProcessors);
- this.unresolvedOutputs = unmodifiableList(unresolvedOutputs);
- }
-
- @Override
- public List<? extends TokenProcessingEntity> getFailedEntities() {
- return failed;
- }
-
- @Override
- public Map<TokenProcessingEntity, DataflowValidationReport> getInvalidDataflows() {
- return invalidDataflows;
- }
-
- @Override
- public List<? extends DataflowOutputPort> getUnresolvedOutputs() {
- return unresolvedOutputs;
- }
-
- @Override
- public List<? extends TokenProcessingEntity> getUnsatisfiedEntities() {
- return unsatisfied;
- }
-
- @Override
- public boolean isValid() {
- return valid;
- }
-
- @Override
- public boolean isWorkflowIncomplete() {
- return isWorkflowIncomplete;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DatalinkImpl.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DatalinkImpl.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DatalinkImpl.java
deleted file mode 100644
index c33d83f..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/DatalinkImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.annotation.AbstractAnnotatedThing;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
-import net.sf.taverna.t2.workflowmodel.EventHandlingInputPort;
-
-/**
- * Naive bean implementation of Datalink
- *
- * @author Tom Oinn
- */
-class DatalinkImpl extends AbstractAnnotatedThing<Datalink> implements Datalink {
- private EventForwardingOutputPort source;
- private EventHandlingInputPort sink;
- private transient int resolvedDepth = -1;
-
- @Override
- public int getResolvedDepth() {
- return this.resolvedDepth;
- }
-
- protected void setResolvedDepth(int newResolvedDepth) {
- this.resolvedDepth = newResolvedDepth;
- }
-
- protected DatalinkImpl(EventForwardingOutputPort source,
- EventHandlingInputPort sink) {
- this.source = source;
- this.sink = sink;
- }
-
- @Override
- public EventHandlingInputPort getSink() {
- return sink;
- }
-
- @Override
- public EventForwardingOutputPort getSource() {
- return source;
- }
-
- @Override
- public String toString() {
- return "link(" + resolvedDepth + ")" + source.getName() + ":"
- + sink.getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/5f1ddb71/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditSupport.java
----------------------------------------------------------------------
diff --git a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditSupport.java b/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditSupport.java
deleted file mode 100644
index e7447ff..0000000
--- a/taverna-workflowmodel-impl/src/main/java/net/sf/taverna/t2/workflowmodel/impl/EditSupport.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package net.sf.taverna.t2.workflowmodel.impl;
-
-import net.sf.taverna.t2.workflowmodel.Edit;
-import net.sf.taverna.t2.workflowmodel.EditException;
-
-abstract class EditSupport<T> implements Edit<T> {
- protected boolean applied;
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final T doEdit() throws EditException {
- if (applied)
- throw new EditException("Edit has already been applied!");
- T result = applyEdit();
- applied = true;
- return result;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final boolean isApplied() {
- return applied;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final void undo() {
- if (!applied)
- throw new RuntimeException(
- "Attempt to undo edit that was never applied");
- applyUndo();
- }
-
- protected abstract T applyEdit() throws EditException;
-
- protected void applyUndo() {
- throw new UnsupportedOperationException(
- "undo not supported by this interface in Taverna 3");
- }
-}