You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/17 21:43:19 UTC
[15/51] [partial] incubator-taverna-engine git commit: temporarily
empty repository
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
deleted file mode 100644
index 291da99..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
+++ /dev/null
@@ -1,1547 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import static java.util.Collections.synchronizedList;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.iterationToString;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.parentProcess;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE;
-
-import java.beans.ExceptionListener;
-import java.beans.XMLEncoder;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.sql.Blob;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.sql.rowset.serial.SerialBlob;
-
-import net.sf.taverna.t2.provenance.item.DataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.DataflowRunComplete;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataLink;
-import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode;
-import net.sf.taverna.t2.provenance.lineageservice.utils.Port;
-import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorEnactment;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.InputPort;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-import net.sf.taverna.t2.workflowmodel.MergeOutputPort;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.NestedDataflow;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-//import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.Namespace;
-import org.jdom.input.SAXBuilder;
-import org.jdom.output.XMLOutputter;
-
-import uk.org.taverna.scufl2.api.io.WorkflowBundleIO;
-
-/**
- * @author Paolo Missier
- */
-public class EventProcessor {
- /**
- * A map of UUIDs of the originating processor to the ProcBinding object
- * that contains its parameters
- */
- private Map<String, ProcessorBinding> procBindingMap = new ConcurrentHashMap<>();
-
- /** A map of child ids to their parents in the hierarchy of events:
- * workflow -> process -> processor -> activity -> iteration
- */
- private Map<String, String> parentChildMap= new ConcurrentHashMap<>();
-
- private static Logger logger = Logger.getLogger(EventProcessor.class);
-
- private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
- private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";
-
- private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first
- private volatile String workflowRunId = null; // unique run ID. set when we see the first event of type "process"
-
- String topLevelDataflowName = null;
- String topLevelDataflowID = null;
-
- Map<String, String> wfNestingMap = new ConcurrentHashMap<>();
-
- // all input bindings are accumulated here so they can be "backpatched" (see backpatching() )
- List<PortBinding> allInputVarBindings = synchronizedList(new ArrayList<PortBinding>());
-
- // dedicated class for processing WorkflowData events which carry workflow output info
- private WorkflowDataProcessor wfdp;
- private ProvenanceWriter pw = null;
- private ProvenanceQuery pq = null;
-
- private HashMap<String, Port> mapping;
-
- private Map<String, ProcessorEnactment> processorEnactmentMap = new ConcurrentHashMap<>();
-
- private Map<String, ProvenanceProcessor> processorMapById = new ConcurrentHashMap<>();
-
- private WorkflowBundleIO io;
-
- // Backpatching temporarily disabled
- private static final boolean backpatching = false;
-
- public EventProcessor(WorkflowBundleIO io) {
- this.io = io;
- }
-
- /**
- * @param pw
- * @throws SQLException
- * @throws ClassNotFoundException
- * @throws IllegalAccessException
- * @throws InstantiationException
- *
- */
- public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq,
- WorkflowDataProcessor wfdp,WorkflowBundleIO io) throws InstantiationException,
- IllegalAccessException, ClassNotFoundException, SQLException {
- this.pw = pw;
- this.pq = pq;
- this.wfdp = wfdp;
- this.io = io;
-
- //logger.setLevel((Level) Level.INFO);
- }
-
- /**
- * this is the new version that makes use of the T2 deserializer
- * populate static portion of the DB<br/>
- * the static structure may already be in the DB -- this is detected as a duplicate top-level workflow ID.
- * In this case, we skip this processing altogether
- * @param content
- * is a serialized dataflow (XML) -- this is parsed using the T2
- * Deserializer
- * @return the workflowRunId for this workflow structure
- */
- public String processWorkflowStructure(ProvenanceItem provenanceItem) {
- /*
- * this flag is set to prevent processing of separate
- * workflowProvenanceItems that describe nested workflows. the
- * processing of all nested workflows is done as part of the very first
- * workflowProvenanceItem that we receive, which is self-consistent. so
- * we ignore all others
- */
- if (workflowStructureDone)
- return null;
- WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) provenanceItem;
- setWorkflowRunId(wpi.getIdentifier());
- workflowStructureDone = true;
- return processWorkflowStructure(wpi.getDataflow());
- }
-
- public String processWorkflowStructure(Dataflow df) {
- topLevelDataflowName = df.getLocalName();
- topLevelDataflowID = df.getIdentifier();
-
- // check whether we already have this WF in the DB
- List<String> workflowIds = null;
- try {
- workflowIds = pq.getAllworkflowIds();
- } catch (SQLException e) {
- logger.warn("Problem processing workflow structure", e);
- }
-
- if (workflowIds == null || workflowIds.contains(topLevelDataflowID)) {
- // not already in the DB
- logger.info("new workflow structure with ID " + topLevelDataflowID);
- ProvenanceProcessor provProc = new ProvenanceProcessor();
- provProc.setIdentifier(UUID.randomUUID().toString());
- provProc.setProcessorName(topLevelDataflowName);
- provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY);
- provProc.setWorkflowId(topLevelDataflowID);
- provProc.setTopLevelProcessor(true);
- // record the top level dataflow as a processor in the DB
- try {
- pw.addProcessor(provProc);
- // pw.addProcessor(topLevelDataflowName, DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true); // true -> is top level
- } catch (SQLException e) {
- logger.warn("Can't add processor " + topLevelDataflowID, e);
- }
- }
-
- return processDataflowStructure(df, topLevelDataflowID, df.getLocalName()); // null: no external name given to top level dataflow
- }
-
- private Blob serialize(Dataflow df) {
- Element serializeDataflow = null;xmlSerializer.serializeDataflow(df);//FIXME
- String dataflowString = null;
- try {
- XMLOutputter outputter = new XMLOutputter();
- StringWriter stringWriter = new StringWriter();
- outputter.output(serializeDataflow, stringWriter);
- dataflowString = stringWriter.toString();
- } catch (java.io.IOException e) {
- logger.error("Could not serialise dataflow", e);
- // FIXME Bad Exception handling!
- }
- return new SerialBlob(dataflowString.getBytes("UTF-8"));
- }
-
- /**
- * note: this method can be called as part of a recursion on sub-workflows
- *
- * @param df
- * @param dataflowID
- * the UUID for the entire dataflow (may be a sub-dataflow)
- * @param localName
- * the external name of the dataflow. Null if this is top level,
- * not null if a sub-dataflow
- * @return the workflowRunId for this workflow structure
- */
- private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
- String localWorkflowRunID = getWorkflowRunId();
-
- //dataflowDepth++;
-
- try {
- // check whether we already have this WF in the DB
- boolean alreadyInDb;
- try {
- List<String> workflowIds = pq.getAllworkflowIds();
- alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID);
- } catch (SQLException e) {
- logger.warn("Problem processing dataflow structure for " + dataflowID, e);
- alreadyInDb = false;
- }
-
- // add workflow ID -- this is NOT THE SAME AS the workflowRunId
-
- /*
- * this could be a nested workflow -- in this case, override its
- * workflowRunId with that of its parent
- */
- if (!alreadyInDb) {
- String parentDataflow = wfNestingMap.get(dataflowID);
- Blob blob = serialize(df);
- if (parentDataflow == null) {
- // this is a top level dataflow description
- pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent
-
- } else {
- // we are processing a nested workflow structure
- logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);
-
- pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent
-
- // override workflowRunId to point to top level -- UNCOMMENTED PM 9/09 CHECK
- localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId();
- }
- }
- // Log the run itself
- pw.addWorkflowRun(dataflowID, localWorkflowRunID);
-
- // add processors along with their variables
- List<Port> vars = new ArrayList<Port>();
- for (Processor p : df.getProcessors()) {
- String pName = p.getLocalName();
-
- //CHECK get type of first activity and set this as the type of the processor itself
- List<? extends Activity<?>> activities = p.getActivityList();
-
- if (! alreadyInDb) {
- ProvenanceProcessor provProc;
- String pType = null;
- if (activities != null && !activities.isEmpty())
- pType = activities.get(0).getClass().getCanonicalName();
- provProc = new ProvenanceProcessor();
- provProc.setIdentifier(UUID.randomUUID().toString());
- provProc.setProcessorName(pName);
- provProc.setFirstActivityClassName(pType);
- provProc.setWorkflowId(dataflowID);
- provProc.setTopLevelProcessor(false);
-
- pw.addProcessor(provProc);
-
- //pw.addProcessor(pName, pType, dataflowID, false); // false: not a top level processor
-
- /*
- * add all input ports for this processor as input variables
- */
- for (ProcessorInputPort ip : p.getInputPorts()) {
- Port inputVar = new Port();
- inputVar.setIdentifier(UUID.randomUUID().toString());
- inputVar.setProcessorId(provProc.getIdentifier());
- inputVar.setProcessorName(pName);
- inputVar.setWorkflowId(dataflowID);
- inputVar.setPortName(ip.getName());
- inputVar.setDepth(ip.getDepth());
- inputVar.setInputPort(true);
- vars.add(inputVar);
- }
-
- /*
- * add all output ports for this processor as output
- * variables
- */
- for (ProcessorOutputPort op : p.getOutputPorts()) {
- Port outputVar = new Port();
- outputVar.setIdentifier(UUID.randomUUID().toString());
- outputVar.setProcessorName(pName);
- outputVar.setProcessorId(provProc.getIdentifier());
- outputVar.setWorkflowId(dataflowID);
- outputVar.setPortName(op.getName());
- outputVar.setDepth(op.getDepth());
- outputVar.setInputPort(false);
-
- vars.add(outputVar);
- }
- }
-
- /*
- * check for nested structures: if the activity is
- * DataflowActivity then this processor is a nested workflow;
- * make an entry into wfNesting map with its ID and recurse on
- * the nested workflow
- */
-
- if (activities != null)
- for (Activity<?> a : activities) {
- if (!(a instanceof NestedDataflow))
- continue;
-
- Dataflow nested = ((NestedDataflow) a)
- .getNestedDataflow();
- wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent
-
- // RECURSIVE CALL
- processDataflowStructure(nested,
- nested.getIdentifier(), p.getLocalName());
- }
- } // end for each processor
-
- // add inputs to entire dataflow
- String pName = INPUT_CONTAINER_PROCESSOR; // overridden -- see below
-
- /*
- * check whether we are processing a nested workflow. in this case
- * the input vars are not assigned to the INPUT processor but to the
- * containing dataflow
- */
- if (! alreadyInDb) {
- if (externalName != null) // override the default if we are nested or someone external name is provided
- pName = externalName;
-
- for (DataflowInputPort ip : df.getInputPorts()) {
- Port inputVar = new Port();
- inputVar.setIdentifier(UUID.randomUUID().toString());
- inputVar.setProcessorId(null); // meaning workflow port
- inputVar.setProcessorName(pName);
- inputVar.setWorkflowId(dataflowID);
- inputVar.setPortName(ip.getName());
- inputVar.setDepth(ip.getDepth());
- inputVar.setInputPort(true); // CHECK PM modified 11/08 -- input vars are actually outputs of input processors...
-
- vars.add(inputVar);
- }
-
- // add outputs of entire dataflow
- pName = OUTPUT_CONTAINER_PROCESSOR; // overridden -- see below
-
- /*
- * check whether we are processing a nested workflow. in this
- * case the output vars are not assigned to the OUTPUT processor
- * but to the containing dataflow
- */
- if (externalName != null) // we are nested
- pName = externalName;
-
- for (DataflowOutputPort op : df.getOutputPorts()) {
- Port outputVar = new Port();
- outputVar.setIdentifier(UUID.randomUUID().toString());
- outputVar.setProcessorId(null); // meaning workflow port
- outputVar.setProcessorName(pName);
- outputVar.setWorkflowId(dataflowID);
- outputVar.setPortName(op.getName());
- outputVar.setDepth(op.getDepth());
- outputVar.setInputPort(false); // CHECK PM modified 11/08 -- output vars are actually outputs of output processors...
- vars.add(outputVar);
- }
-
- pw.addPorts(vars, dataflowID);
- makePortMapping(vars);
-
- /*
- * add datalink records using the dataflow links retrieving the
- * processor names requires navigating from links to source/sink
- * and from there to the processors
- */
- for (Datalink l : df.getLinks()) {
- // TODO cover the case of datalinks from an input and to an output to the entire dataflow
-
- Port sourcePort = null;
- Port destinationPort = null;
-
- OutputPort source = l.getSource();
- if (source instanceof ProcessorOutputPort) {
- String sourcePname = ((ProcessorOutputPort) source)
- .getProcessor().getLocalName();
- sourcePort = lookupPort(sourcePname, source.getName(), false);
- } else if (source instanceof MergeOutputPort) {
- // TODO: Handle merge output ports
- } else
- // Assume it is internal port from DataflowInputPort
- sourcePort = lookupPort(externalName, source.getName(), true);
-
- InputPort sink = l.getSink();
- if (sink instanceof ProcessorInputPort) {
- String sinkPname = ((ProcessorInputPort) sink)
- .getProcessor().getLocalName();
- destinationPort = lookupPort(sinkPname, sink.getName(), true);
- } else if (sink instanceof MergeInputPort) {
- // TODO: Handle merge input ports
- } else
- // Assume it is internal port from DataflowOutputPort
- destinationPort = lookupPort(externalName, sink.getName(), false);
-
- if (sourcePort != null && destinationPort != null)
- pw.addDataLink(sourcePort, destinationPort, dataflowID);
- else
- logger.info("Can't record datalink " + l);
- }
- }
- } catch (Exception e) {
- logger.error("Problem processing provenance for dataflow", e);
- }
-
- return dataflowID;
- }
-
- private void makePortMapping(List<Port> ports) {
- mapping = new HashMap<>();
- for (Port port: ports) {
- String key = port.getProcessorName()
- + (port.isInputPort() ? "/i:" : "/o:") + port.getPortName();
- mapping.put(key, port);
- }
- }
-
- private Port lookupPort(String processorName, String portName, boolean isInputPort) {
- String key = processorName + (isInputPort ? "/i:" : "/o:") + portName;
- return mapping.get(key);
- }
-
- /**
- * processes an elementary process execution event from T2. Collects info
- * from events as they happen and sends them to the writer for processing
- * when the iteration event is received. Uses the map of procBindings to
- * process event id and the map of child ids to parent ids to ensure that
- * the correct proc binding is used
- * @param currentWorkflowID
- *
- * @param d
- * @param context
- */
- public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
- switch (provenanceItem.getEventType()) {
- case PROCESS_EVENT_TYPE: {
- String parentId = provenanceItem.getParentId(); // this is the workflowID
- String identifier = provenanceItem.getIdentifier(); // use this as workflowRunId if this is the top-level process
-
- parentChildMap.put(identifier, parentId);
- ProcessorBinding pb = new ProcessorBinding();
- pb.setWorkflowRunId(getWorkflowRunId());
- pb.setWorkflowId(currentWorkflowID);
- procBindingMap.put(identifier, pb);
- return;
- }
- case PROCESSOR_EVENT_TYPE: {
- String identifier = provenanceItem.getIdentifier();
- String parentId = provenanceItem.getParentId();
- String processID = provenanceItem.getProcessId(); // this is the external process ID
-
- // this has the weird form facade0:dataflowname:pname need to extract pname from here
- String[] processName = processID.split(":");
- procBindingMap.get(parentId).setProcessorName(
- processName[processName.length - 1]);
- // 3rd component of composite name
-
- parentChildMap.put(identifier, parentId);
- return;
- }
- case ACTIVITY_EVENT_TYPE: {
- String identifier = provenanceItem.getIdentifier();
- String parentId = provenanceItem.getParentId();
- procBindingMap.get(parentChildMap.get(parentId))
- .setFirstActivityClassName(identifier);
- parentChildMap.put(identifier, parentId);
- return;
- }
- case ITERATION_EVENT_TYPE: {
- IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem)provenanceItem;
- if (iterationProvenanceItem.getParentIterationItem() != null)
- // Skipping pipelined outputs, we'll process the parent output later instead
- return;
-
- // traverse up to root to retrieve ProcBinding that was created when we saw the process event
- String activityID = provenanceItem.getParentId();
- String processorID = parentChildMap.get(activityID);
- String processID = parentChildMap.get(processorID);
- String iterationID = provenanceItem.getIdentifier();
- parentChildMap.put(iterationID, activityID);
-
- ProcessorEnactment processorEnactment = processorEnactmentMap
- .get(iterationID);
- if (processorEnactment == null)
- processorEnactment = new ProcessorEnactment();
-
- ProcessorBinding procBinding = procBindingMap.get(processID);
-
- String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
- .getIteration()));
- procBinding.setIterationVector(itVector);
-
- processorEnactment.setEnactmentStarted(iterationProvenanceItem
- .getEnactmentStarted());
- processorEnactment.setEnactmentEnded(iterationProvenanceItem
- .getEnactmentEnded());
- processorEnactment.setWorkflowRunId(workflowRunId);
- processorEnactment.setIteration(itVector);
-
- String processId = iterationProvenanceItem.getProcessId();
- String parentProcessId = parentProcess(processId, 3);
- if (parentProcessId != null) {
- ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
- .get(parentProcessId);
- if (parentProcEnact != null)
- processorEnactment
- .setParentProcessorEnactmentId(parentProcEnact
- .getProcessEnactmentId());
- }
- processorEnactment.setProcessEnactmentId(iterationProvenanceItem
- .getIdentifier());
- processorEnactment.setProcessIdentifier(processId);
-
- ProvenanceProcessor provenanceProcessor;
- if (processorEnactment.getProcessorId() == null) {
- provenanceProcessor = pq.getProvenanceProcessorByName(
- currentWorkflowID, procBinding.getProcessorName());
- if (provenanceProcessor == null)
- // already logged warning
- return;
- processorMapById.put(provenanceProcessor.getIdentifier(),
- provenanceProcessor);
- processorEnactment.setProcessorId(provenanceProcessor
- .getIdentifier());
- } else {
- provenanceProcessor = processorMapById.get(processorEnactment
- .getProcessorId());
- if (provenanceProcessor == null) {
- provenanceProcessor = pq
- .getProvenanceProcessorById(processorEnactment
- .getProcessorId());
- processorMapById.put(provenanceProcessor.getIdentifier(),
- provenanceProcessor);
- }
- }
-
- InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
- OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();
-
- if (inputDataEl != null
- && processorEnactment.getInitialInputsDataBindingId() == null) {
- processorEnactment
- .setInitialInputsDataBindingId(processDataBindings(
- inputDataEl, provenanceProcessor));
- processInput(inputDataEl, procBinding, currentWorkflowID);
- }
-
- if (outputDataEl != null
- && processorEnactment.getFinalOutputsDataBindingId() == null) {
- processorEnactment
- .setFinalOutputsDataBindingId(processDataBindings(
- outputDataEl, provenanceProcessor));
- processOutput(outputDataEl, procBinding, currentWorkflowID);
- }
-
- try {
- if (processorEnactmentMap.containsKey(iterationID)) {
- getPw().updateProcessorEnactment(processorEnactment);
- } else {
- getPw().addProcessorEnactment(processorEnactment);
- processorEnactmentMap.put(iterationID, processorEnactment);
- }
- } catch (SQLException e) {
- logger.warn("Could not store processor enactment", e);
- }
- return;
- }
- case END_WORKFLOW_EVENT_TYPE: {
- DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
- // use this event to do housekeeping on the input/output varbindings
-
- // process the input and output values accumulated by WorkflowDataProcessor
- getWfdp().processTrees(completeEvent, getWorkflowRunId());
-
- reconcileLocalOutputs(provenanceItem.getWorkflowId());
-
- if (! provenanceItem.getProcessId().contains(":")) {
- // Top-level workflow finished
- // No longer needed, done by processTrees()
-// patchTopLevelnputs();
-
- workflowStructureDone = false; // CHECK reset for next run...
-// reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs
- getPw().closeCurrentModel(); // only real impl is for RDF
- }
- return;
- }
- case WORKFLOW_DATA_EVENT_TYPE: {
- // give this event to a WorkflowDataProcessor object for pre-processing
- // try {
- // TODO may generate an exception when the data is an error CHECK
- getWfdp().addWorkflowDataItem(provenanceItem);
- // } catch (NumberFormatException e) {
- // logger.error(e);
- // }
- // logger.info("Received workflow data - not processing");
- //FIXME not sure - needs to be stored somehow
- return;
- }
- case INVOCATION_STARTED_EVENT_TYPE: {
- InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
- ProcessorEnactment processorEnactment = processorEnactmentMap
- .get(startedItem.getParentId());
- if (processorEnactment == null) {
- logger.error("Could not find ProcessorEnactment for invocation "
- + startedItem);
- return;
- }
- getWfdp().invocationProcessToProcessEnactment.put(
- startedItem.getInvocationProcessId(), processorEnactment);
- return;
- }
- case ERROR_EVENT_TYPE:
- //TODO process the error
- return;
- default:
- // TODO broken, should we throw something here?
- return;
- }
- }
-
- private String processDataBindings(
- DataProvenanceItem provenanceItem, ProvenanceProcessor provenanceProcessor) {
- // TODO: Cache known provenaneItems and avoid registering again
- String dataBindingId = UUID.randomUUID().toString();
- boolean isInput = provenanceItem instanceof InputDataProvenanceItem;
-
- for (Entry<String, T2Reference> entry : provenanceItem.getDataMap().entrySet()) {
- DataBinding dataBinding = new DataBinding();
- dataBinding.setDataBindingId(dataBindingId);
- Port port = findPort(provenanceProcessor, entry.getKey(), isInput); // findPort
- if (port == null) {
- logger.warn("Could not find port for " + entry.getKey());
- continue;
- }
- dataBinding.setPort(port);
- dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString());
- dataBinding.setWorkflowRunId(workflowRunId);
- try {
- getPw().addDataBinding(dataBinding);
- } catch (SQLException e) {
- logger.warn("Could not register data binding for " + port, e);
- }
- }
- return dataBindingId;
- }
-
- private Port findPort(ProvenanceProcessor provenanceProcessor,
- String portName, boolean isInput) {
- // TODO: Query pr dataflow and cache
- Map<String, String> queryConstraints = new HashMap<>();
- queryConstraints.put("V.workflowId",
- provenanceProcessor.getWorkflowId());
- String processorName = provenanceProcessor.getProcessorName();
- queryConstraints.put("processorName", processorName);
- queryConstraints.put("portName", portName);
- queryConstraints.put("isInputPort", isInput ? "1" : "0");
- try {
- List<Port> vars = pq.getPorts(queryConstraints);
- if (vars.isEmpty()) {
- logger.warn("Can't find port " + portName + " in "
- + processorName);
- } else if (vars.size() > 1) {
- logger.warn("Multiple matches for port " + portName + " in "
- + processorName + ", got:" + vars);
- } else
- return vars.get(0);
- } catch (SQLException e) {
- logger.error(
- "Problem getting ports for processor: " + processorName
- + " worflow: "
- + provenanceProcessor.getWorkflowId(), e);
- }
- return null;
- }
-
-
- /**
- * fills in the VBs for the global inputs -- this removes the need for explicit events
- * that account for these value bindings...
- */
- public void patchTopLevelnputs() {
-
- // for each input I to topLevelDataflow:
- // pick first outgoing datalink with sink P:X
- // copy value X to I -- this can be a collection, so copy everything
-
- // get all global input vars
-
- // logger.info("\n\n BACKPATCHING GLOBAL INPUTS with dataflowDepth = "+dataflowDepth+"*******\n");
-
- List<Port> inputs=null;
- try {
- inputs = getPq().getInputPorts(topLevelDataflowName, topLevelDataflowID);
-
- for (Port input:inputs) {
-
- // logger.info("global input: "+input.getVName());
-
- Map<String,String> queryConstraints = new HashMap<String,String>();
-
-// queryConstraints.put("sourcePortName", input.getVName());
-// queryConstraints.put("sourceProcessorName", input.getPName());
- queryConstraints.put("sourcePortId", input.getIdentifier());
- queryConstraints.put("workflowId", input.getWorkflowId());
- List<DataLink> outgoingDataLinks = getPq().getDataLinks(queryConstraints);
-
- // any datalink will do, use the first
- String targetPname = outgoingDataLinks.get(0).getDestinationProcessorName();
- String targetVname = outgoingDataLinks.get(0).getDestinationPortName();
-
-// logger.info("copying values from ["+targetPname+":"+targetVname+"] for instance ID: ["+workflowRunId+"]");
-
- queryConstraints.clear();
- queryConstraints.put("V.portName", targetVname);
- queryConstraints.put("V.processorName", targetPname);
- queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
- queryConstraints.put("V.workflowId", topLevelDataflowID);
-
- for (PortBinding vb : getPq().getPortBindings(queryConstraints)) {
- PortBinding inputPortBinding = new PortBinding(vb);
-
- // insert PortBinding back into VB with the global input portName
- inputPortBinding.setProcessorName(input.getProcessorName());
- inputPortBinding.setPortName(input.getPortName());
- try {
- getPw().addPortBinding(inputPortBinding);
- } catch (SQLException ex) {
- logger.info("Already logged port binding", ex);
- }
- }
- }
- } catch (SQLException e) {
- logger.warn("Patch top level inputs problem for provenance", e);
- } catch (IndexOutOfBoundsException e) {
- logger.error("Could not patch top level", e);
- }
- }
-
- public void reconcileTopLevelOutputs() {
- reconcileLocalOutputs(topLevelDataflowID);
- }
-
- // PM added 23/4/09
- /**
- * reconcile the top level outputs with the results from its immediate precedessors in the graph.<br/>
- * various cases have to be considered: predecessors may include records that are not in the output,
- * while the output may include nested list structures that are not in the precedessors. This method accounts
- * for a 2-way reconciliation that considers all possible cases.<br/>
- * at the end, outputs and their predecessors contain the same data.<p/>
- * NOTE: if we assume that data values (URIs) are <em>always</em> unique then this is greatly simplified by just
- * comparing two sets of value records by their URIs and reconciling them. But this is not the way it is done here
- */
- public void reconcileLocalOutputs(String dataflowID) {
- /*
- for each output O
-
- for each variable V in predecessors(O)
-
- fetch all VB records for O into list OValues
- fetch all VB records for V into list Yalues
-
- compare OValues and VValues:
- it SHOULD be the case that OValues is a subset of YValues. Under this assumption:
-
- for each vb in YValues:
- - if there is a matching o in OValues then (vb may be missing collection information)
- copy o to vb
- else
- if vb has no collection info && there is a matching tree node tn in OTree (use iteration index for the match) then
- set vb to be in collection tb
- copy vb to o
-
- finally copy all Collection records for O in OTree -- catch duplicate errors
- */
-
- Map<String, String> queryConstraints = new HashMap<>();
-
- try {
- // for each output O
- for (Port output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID)) {
- // collect all VBs for O
-// String oPName = output.getPName();
-// String oVName = output.getVName();
-// queryConstraints.put("V.portName", oVName);
-// queryConstraints.put("V.processorName", oPName);
-// queryConstraints.put("VB.workflowRunId", workflowRunId);
-// queryConstraints.put("V.workflowId", topLevelDataflowID);
-
-// List<PortBinding> OValues = pq.getPortBindings(queryConstraints);
-
- // find all records for the immediate precedessor Y of O
- queryConstraints.clear();
-// queryConstraints.put("destinationPortName", output.getVName());
-// queryConstraints.put("destinationProcessorName", output.getPName());
- queryConstraints.put("destinationPortId", output.getIdentifier());
- queryConstraints.put("workflowId", output.getWorkflowId());
- List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
-
- // there can be only one -- but check that there is one!
- if (incomingDataLinks.isEmpty())
- continue;
-
- String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
- String sourceVname = incomingDataLinks.get(0).getSourcePortName();
-
- queryConstraints.clear();
- queryConstraints.put("V.portName", sourceVname);
- queryConstraints.put("V.processorName", sourcePname);
- queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
- queryConstraints.put("V.workflowId", topLevelDataflowID);
-
- List<PortBinding> YValues = pq.getPortBindings(queryConstraints);
-
- // for each YValue look for a match in OValues
- // (assume the YValues values are a superset of OValues)!)
-
- for (PortBinding yValue:YValues) {
- // look for a matching record in PortBinding for output O
- queryConstraints.clear();
- queryConstraints.put("V.portName", output.getPortName());
- queryConstraints.put("V.processorName", output.getProcessorName());
- queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
- queryConstraints.put("V.workflowid", topLevelDataflowID);
- queryConstraints.put("VB.iteration", yValue.getIteration());
- if (yValue.getCollIDRef()!= null) {
- queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
- queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl()));
- }
- List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints);
-
- // result at most size 1
- if (!matchingOValues.isEmpty()) {
- PortBinding oValue = matchingOValues.get(0);
-
- // copy collection info from oValue to yValue
- yValue.setCollIDRef(oValue.getCollIDRef());
- yValue.setPositionInColl(oValue.getPositionInColl());
-
- pw.updatePortBinding(yValue);
- } else {
- // copy the yValue to O
- // insert PortBinding back into VB with the global output portName
- yValue.setProcessorName(output.getProcessorName());
- yValue.setPortName(output.getPortName());
- pw.addPortBinding(yValue);
- }
-
- } // for each yValue in YValues
-
- // copy all Collection records for O to Y
-
- // get all collections refs for O
- queryConstraints.clear();
- queryConstraints.put("workflowRunId", getWorkflowRunId());
- queryConstraints.put("processorNameRef", output.getProcessorName());
- queryConstraints.put("portName", output.getPortName());
-
- List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);
-
- // insert back as collection refs for Y -- catch duplicates
- for (NestedListNode nln:oCollections) {
- nln.setProcessorName(sourcePname);
- nln.setProcessorName(sourceVname);
-
- getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
- }
-
- } // for each output var
-
- } catch (SQLException e) {
- logger.warn("Problem reconciling top level outputs", e);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private void processOutput(OutputDataProvenanceItem provenanceItem,
- ProcessorBinding procBinding, String currentWorkflowID) {
- Element dataItemAsXML = getDataItemAsXML(provenanceItem);
- List<Element> outputPorts = dataItemAsXML.getChildren("port");
- for (Element outputport : outputPorts) {
- String portName = outputport.getAttributeValue("name");
-
- // value type may vary
- List<Element> valueElements = outputport.getChildren();
- if (valueElements != null && !valueElements.isEmpty()) {
- Element valueEl = valueElements.get(0); // only really 1 child
-
- processPortBinding(valueEl, procBinding.getProcessorName(),
- portName, procBinding.getIterationVector(),
- getWorkflowRunId(), currentWorkflowID);
- }
- }
- }
-
- /**
- * this method reconciles values in varBindings across an datalink: Firstly,
- * if vb's value is within a collection, _and_ it is copied from a value
- * generated during a previous iteration, then this method propagates the
- * list reference to that iteration value, which wouldn't have it.
- * Conversely, if vb is going to be input to an iteration, then it's lost
- * its containing list node, and we put it back in by looking at the
- * corresponding predecessor
- *
- * @param vb
- * @throws SQLException
- */
- private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException {
- logger.debug("backpatchIterationResults: start");
- for (PortBinding vb : newBindings) {
- logger.debug("backpatchIterationResults: processing vb "
- + vb.getProcessorName() + "/" + vb.getPortName() + "="
- + vb.getValue());
-
- if (vb.getCollIDRef()!= null) // this is a member of a collection
- logger.debug("...which is inside a collection ");
-
- // look for its antecedent
- Map<String,String> queryConstraints = new HashMap<>();
- queryConstraints.put("destinationPortName", vb.getPortName());
- queryConstraints.put("destinationProcessorName", vb.getProcessorName());
- queryConstraints.put("workflowId", pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0)); // CHECK picking first element in list...
- List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
-
- // there can be only one -- but check that there is one!
- if (incomingDataLinks.isEmpty())
- return;
-
- String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
- String sourceVname = incomingDataLinks.get(0).getSourcePortName();
-
- logger.debug("antecedent: "+sourcePname+":"+sourceVname);
-
- // get the varbindings for this port and select the one with the same iteration vector as its successor
- queryConstraints.clear();
- queryConstraints.put("VB.portName", sourceVname);
- queryConstraints.put("V.processorName", sourcePname);
- queryConstraints.put("VB.value", vb.getValue());
- queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId());
-
- // reconcile
- for (PortBinding b : pq.getPortBindings(queryConstraints)) {
- logger.debug("backpatching " + sourceVname + " " + sourcePname);
-
- if (vb.getCollIDRef() != null && b.getCollIDRef() == null) {
- logger.debug("successor " + vb.getPortName()
- + " is in collection " + vb.getCollIDRef()
- + " but pred " + b.getPortName() + " is not");
- logger.debug("putting " + b.getPortName()
- + " in collection " + vb.getCollIDRef()
- + " at pos " + vb.getPositionInColl());
- b.setCollIDRef(vb.getCollIDRef());
- b.setPositionInColl(vb.getPositionInColl());
- getPw().updatePortBinding(b);
-
- } else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) {
- logger.debug("successor " + vb.getPortName()
- + " is NOT in collection but pred "
- + b.getPortName() + " IS");
- logger.debug("putting " + vb.getPortName()
- + " in collection " + b.getCollIDRef() + " at pos "
- + b.getPositionInColl());
- vb.setCollIDRef(b.getCollIDRef());
- vb.setPositionInColl(b.getPositionInColl());
- getPw().updatePortBinding(vb);
- }
- }
- }
- }
-
-
- /**
- * create one new PortBinding record for each input port binding
- * @param currentWorkflowID
- */
- @SuppressWarnings("unchecked")
- private void processInput(InputDataProvenanceItem provenanceItem,
- ProcessorBinding procBinding, String currentWorkflowID) {
- Element dataItemAsXML = getDataItemAsXML(provenanceItem);
- int order = 0;
- for (Element inputport : (List<Element>) dataItemAsXML.getChildren("port")) {
- String portName = inputport.getAttributeValue("name");
-
- try {
- // add process order sequence to Port for this portName
- Map<String, String> queryConstraints = new HashMap<>();
- queryConstraints.put("V.workflowId", currentWorkflowID);
- queryConstraints.put("processorName", procBinding.getProcessorName());
- queryConstraints.put("portName", portName);
- queryConstraints.put("isInputPort", "1");
-
- Port v = getPq().getPorts(queryConstraints).get(0);
- v.setIterationStrategyOrder(order++);
- getPw().updatePort(v);
- } catch (IndexOutOfBoundsException e) {
- logger.error("Could not process input " + portName, e);
- } catch (SQLException e1) {
- logger.error("Could not process input " + portName, e1);
- }
-
- // value type may vary
- List<Element> valueElements = inputport.getChildren(); // hopefully
- // in the right order...
- if (valueElements != null && valueElements.size() > 0) {
- Element valueEl = valueElements.get(0); // expect only 1 child
- // processVarBinding(valueEl, processor, portName, iterationVector,
- // dataflow);
-
- List<PortBinding> newBindings = processPortBinding(valueEl,
- procBinding.getProcessorName(), portName,
- procBinding.getIterationVector(), getWorkflowRunId(),
- currentWorkflowID);
- // this is a list whenever valueEl is of type list: in this case processVarBinding recursively
- // processes all values within the collection, and generates one PortBinding record for each of them
-
- allInputVarBindings.addAll(newBindings);
-
- // // if the new binding involves list values, then check to see if they need to be propagated back to
-// // results of iterations
-
- // Backpatching disabled as it is very inefficient and not needed
- // for current Taverna usage
-
- try {
- if (backpatching)
- backpatchIterationResults(newBindings);
- } catch (SQLException e) {
- logger.warn("Problem with back patching iteration results", e);
- }
- } else {
- if (valueElements != null)
- logger.debug("port name " + portName + " "
- + valueElements.size());
- else
- logger.debug("valueElements is null for port name "
- + portName);
- }
- }
- }
-
- /**
- * capture the default case where the value is not a list
- *
- * @param valueEl
- * @param processorId
- * @param portName
- * @param iterationId
- * @param workflowRunId
- * @param currentWorkflowID
- */
- private List<PortBinding> processPortBinding(Element valueEl,
- String processorId, String portName, String iterationId,
- String workflowRunId, String currentWorkflowID) {
- // uses the defaults:
- // collIdRef = null
- // parentcollectionRef = null
- // positionInCollection = 1
- return processPortBinding(valueEl, processorId, portName, null, 1, null,
- iterationId, workflowRunId, null, currentWorkflowID);
- }
-
- /**
- * general case where value can be a list
- * @param valueEl
- * @param processorId
- * @param portName
- * @param collIdRef
- * @param positionInCollection
- * @param parentCollectionRef
- * @param iterationId
- * @param workflowRunId
- * @param currentWorkflowID
- */
- @SuppressWarnings("unchecked")
- private List<PortBinding> processPortBinding(Element valueEl,
- String processorId, String portName, String collIdRef,
- int positionInCollection, String parentCollectionRef,
- String iterationId, String workflowRunId, String itVector,
- String currentWorkflowID) {
- List<PortBinding> newBindings = new ArrayList<>();
-
- String valueType = valueEl.getName();
- // logger.info("value element for " + processorId + ": "
- // + valueType);
-
- String iterationVector = (itVector == null ? extractIterationVector(iterationId)
- : itVector);
-
- PortBinding vb = new PortBinding();
-
- vb.setWorkflowId(currentWorkflowID);
- vb.setWorkflowRunId(workflowRunId);
- vb.setProcessorName(processorId);
- vb.setValueType(valueType);
- vb.setPortName(portName);
- vb.setCollIDRef(collIdRef);
- vb.setPositionInColl(positionInCollection);
-
- newBindings.add(vb);
-
- if (valueType.equals("literal")) {
- try {
- vb.setIteration(iterationVector);
- vb.setValue(valueEl.getAttributeValue("id"));
- logger.debug("new input VB with workflowId="+currentWorkflowID+" processorId="+processorId+
- " valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+
- " position="+positionInCollection+" itvector="+iterationVector+
- " value="+vb.getValue());
- getPw().addPortBinding(vb);
- } catch (SQLException e) {
- logger.warn("Process Port Binding problem with provenance", e);
- }
-
- } else if (valueType.equals("referenceSet")) {
- vb.setIteration(iterationVector);
- vb.setValue(valueEl.getAttributeValue("id"));
- vb.setReference(valueEl.getChildText("reference"));
-
- logger.debug("new input VB with workflowId=" + currentWorkflowID
- + " processorId=" + processorId + " valueType=" + valueType
- + " portName=" + portName + " collIdRef=" + collIdRef
- + " position=" + positionInCollection + " itvector="
- + iterationVector + " value=" + vb.getValue());
-
- try {
- getPw().addPortBinding(vb);
- } catch (SQLException e) {
- logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
- // try to update the existing record instead using the current collection info
-
- getPw().updatePortBinding(vb);
- }
-
- } else if (valueType.equals("list")) {
- logger.debug("input of type list");
-
- // add entries to the Collection and to the PortBinding tables
- // list id --> Collection.collId
-
- String collId = valueEl.getAttributeValue("id");
- try {
- parentCollectionRef = getPw().addCollection(processorId, collId,
- parentCollectionRef, iterationVector, portName,
- workflowRunId);
-
- // iterate over each list element
- List<Element> listElements = valueEl.getChildren();
-
- positionInCollection = 1; // also use this as a suffix to extend the iteration vector
-
- // extend iteration vector to account for additional levels within the list
-
- String originalIterationVector = iterationVector;
-
- // children can be any base type, including list itself -- so
- // use recursion
- for (Element el : listElements) {
- if (originalIterationVector.length() > 2) // vector is not empty
- iterationVector = originalIterationVector.substring(0,
- originalIterationVector.length()-1) + ","+
- Integer.toString(positionInCollection-1) + "]";
- else
- iterationVector = "["+ (positionInCollection-1) + "]";
-
- newBindings.addAll(processPortBinding(el, processorId,
- portName, collId, positionInCollection,
- parentCollectionRef, iterationId, workflowRunId,
- iterationVector, currentWorkflowID));
-
- positionInCollection++;
- }
- } catch (SQLException e) {
- logger.warn("Problem processing var binding", e);
- }
- } else if (valueType.equals("error")) {
- vb.setIteration(iterationVector);
- vb.setValue(valueEl.getAttributeValue("id"));
- vb.setReference(valueEl.getChildText("reference"));
- try {
- getPw().addPortBinding(vb);
- } catch (SQLException e) {
- logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
- // try to update the existing record instead using the current collection info
-
- getPw().updatePortBinding(vb);
- }
- } else {
- logger.warn("unrecognized value type element for "
- + processorId + ": " + valueType);
- }
-
- return newBindings;
- }
-
-
- /**
- * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...]
- * <p/>
- * now returns the vector itself -- this is still experimental
- *
- * @param iteration
- * @return
- */
- @Deprecated
- String extractIterationVector(String iteration) {
- return iteration;
- }
-
- /**
- * silly class to hold pairs of strings. any better way??
- * @author paolo
- *
- */
- class Pair {
- String v1, v2;
-
- public Pair(String current, String workflowId) {
- v1=current; v2=workflowId;
- }
-
- /**
- * @return the v1
- */
- public String getV1() {
- return v1;
- }
-
- /**
- * @param v1 the v1 to set
- */
- public void setV1(String v1) {
- this.v1 = v1;
- }
-
- /**
- * @return the v2
- */
- public String getV2() {
- return v2;
- }
-
- /**
- * @param v2 the v2 to set
- */
- public void setV2(String v2) {
- this.v2 = v2;
- }
- }
-
- @SuppressWarnings("deprecation")
- public List<Pair> toposort(String dataflowName, String workflowRunId) throws SQLException {
-
-// String workflowId = pq.getworkflowIdForDataflow(dataflowName, workflowRunId);
- String workflowId = pq.getWorkflowIdForExternalName(dataflowName);
-
- // fetch processors along with the count of their predecessors
- Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(workflowRunId);
- Map<String, List<String>> successorsOf = new HashMap<String, List<String>>();
-// List<String> procList = pq.getContainedProcessors(dataflowName, workflowRunId);
- List<String> procList = pq.getContainedProcessors(dataflowName);
-
- for (String s:procList) {
- List<String> successors = getPq().getSuccProcessors(s, workflowId, workflowRunId);
- successorsOf.put(s, successors);
- }
-
- List<Pair> sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, workflowId, workflowRunId);
-
- for (int i=0; i< sorted.size(); i++) {
- String procName = sorted.get(i).getV1();
-
- if (pq.isDataflow(procName) && !procName.equals(dataflowName)) { // handle weirdness: a dataflow is contained within itself..
- // recurse on procName
- List<Pair> sortedSublist = toposort(procName, workflowRunId);
-
- // replace procName with sortedSublist in sorted
- sorted.remove(i);
- sorted.addAll(i, sortedSublist);
- }
- }
- return sorted;
- }
-
-
-
- /**
- * @param procList
- * @param predecessorsCount
- * @param successorsOf
- * @param workflowRunId
- * @return
- * @throws SQLException
- */
- public List<Pair> tsort(List<String> procList, String dataflowName,
- Map<String, Integer> predecessorsCount,
- Map<String, List<String>> successorsOf, String workflowId,
- String workflowRunId) throws SQLException {
- List<Pair> l = new ArrayList<>(); // holds sorted elements
- List<String> q = new ArrayList<>(); // temp queue
-
- // init queue with procList processors that have no predecessors
- for (String proc:procList)
- if (predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0 &&
- !proc.equals(dataflowName))
- q.add(proc);
-
- while (!q.isEmpty()) {
- String current = q.remove(0);
- l.add(new Pair(current, workflowId));
-
- List<String> successors = successorsOf.get(current);
-
- if (successors == null)
- continue;
-
- // reduce the number of predecessors to each of the successors by one
- // NB we must traverse an additional datalink through a nested workflow input if the successor is a dataflow!!
- for (String succ : successors) {
- // decrease edge count for each successor processor
- predecessorsCount.put(succ, predecessorsCount.get(succ) - 1);
-
- if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName))
- q.add(succ);
- }
- } // end loop on q
- return l;
- }
-
- @SuppressWarnings("deprecation")
- public void propagateANL(String workflowRunId) throws SQLException {
- String top = pq.getTopLevelDataflowName(workflowRunId);
-
- // //////////////////////
- // PHASE I: toposort the processors in the whole graph
- // //////////////////////
- List<Pair> sorted = toposort(top, workflowRunId);
-
- List<String> sortedProcessors = new ArrayList<>();
-
- for (Pair p : sorted)
- sortedProcessors.add(p.getV1());
-
- logger.debug("final sorted list of processors");
- for (Pair p : sorted)
- logger.debug(p.getV1() + " in workflowId " + p.getV2());
-
- // //////////////////////
- // PHASE II: traverse and set anl on each port
- // //////////////////////
-
- // // sorted processor names in L at this point
- // // process them in order
- for (Pair pnameInContext : sorted) {
- // // process pname's inputs -- set ANL to be the DNL if not set in prior steps
- String pname = pnameInContext.getV1();
- String workflowId = pnameInContext.getV2();
-
- List<Port> inputs = getPq().getInputPorts(pname, workflowId); // null -> do not use instance (??) CHECK
-
- int totalANL = 0;
- for (Port iv : inputs) {
-
- if (! iv.isResolvedDepthSet()) {
- iv.setResolvedDepth(iv.getDepth());
- getPw().updatePort(iv);
- }
-
- int delta_nl = iv.getResolvedDepth() - iv.getDepth();
-
- // if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case
- if (delta_nl < 0 ) delta_nl = 0;// CHECK iv.getTypedepth();
-
- totalANL += delta_nl;
-
- // this should take care of the special case of the top level dataflow with inputs that have successors in the graph
- // propagate this through all the links from this var
-// List<Port> successors = getPq().getSuccVars(pname, iv.getVName(), workflowRunId);
-
-// for (Port v : successors) {
-// v.setresolvedDepth(iv.getresolvedDepth());
-// getPw().updateVar(v);
-// }
- }
-
- // process pname's outputs -- set ANL based on the sum formula (see
- // paper)
- for (Port ov : getPq().getOutputPorts(pname, workflowId)) {
-
- ov.setResolvedDepth(ov.getDepth() + totalANL);
-
- logger.debug("anl for "+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL));
- getPw().updatePort(ov);
-
- // propagate this through all the links from this var
- for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) {
- List<Port> toBeProcessed = new ArrayList<>();
- toBeProcessed.add(v);
-
- if (v.getProcessorId() == null && v.isInputPort()) { // this is the input to a nested workflow
-// String tempWorkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
- String tempWorkflowId = pq
- .getWorkflowIdForExternalName(v
- .getProcessorName());
- List<Port> realSuccessors = getPq().getSuccPorts(
- v.getProcessorName(), v.getPortName(),
- tempWorkflowId);
-
- toBeProcessed.remove(0);
- toBeProcessed.addAll(realSuccessors);
-
- } else if (v.getProcessorId() == null && !v.isInputPort()) { // this is the output to a nested workflow
-// String tempworkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
- List<Port> realSuccessors = getPq().getSuccPorts(
- v.getProcessorName(), v.getPortName(), null);
-
- toBeProcessed.remove(0);
- toBeProcessed.addAll(realSuccessors);
- }
-
- for (Port v1 : toBeProcessed) {
- v1.setResolvedDepth(ov.getResolvedDepth());
- logger.debug("anl for " + v1.getProcessorName() + ":"
- + v1.getPortName() + " = "
- + ov.getResolvedDepth());
- getPw().updatePort(v1);
- }
- }
- }
- }
- }
-
- public void setPw(ProvenanceWriter pw) {
- this.pw = pw;
- }
-
- public ProvenanceWriter getPw() {
- return pw;
- }
-
- public void setPq(ProvenanceQuery pq) {
- this.pq = pq;
- }
-
- public ProvenanceQuery getPq() {
- return pq;
- }
-
- public void setWorkflowRunId(String workflowRunId) {
- this.workflowRunId = workflowRunId;
- }
-
- public String getWorkflowRunId() {
- return workflowRunId;
- }
-
- public void setWfdp(WorkflowDataProcessor wfdp) {
- this.wfdp = wfdp;
- }
-
- public WorkflowDataProcessor getWfdp() {
- return wfdp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
deleted file mode 100644
index 975a83a..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-/**
- *
- * This Java bean holds a single provenance record, i.e., the finest element of
- * a provenance graph that is stored in the provenance DB. Essentially this
- * represents one data element (value) flowing through a port (vname) of a
- * processor (pname), in the context of one run (wfInstance) of a workflow
- * (wfname). The record may include an <b>iteration</b> vector, used when the
- * same processor receives multiple values on the same port, as part of
- * iterative processing. When the value belongs to a collection (a nested list),
- * the <b>collIdRef</b> field contains a reference to that collection.
- *
- * @author Paolo Missier
- *
- */
-public class LineageQueryResultRecord {
- private String workflowId;
- private String processorName;
- private String portName;
- private String workflowRunId;
- private String iteration;
- private String value; // atomic or XML-formatted collection -- this is actually a reference to the value...
- private String collIdRef;
- private String parentCollIDRef;
- private String resolvedValue;
- private String type; // one of referenceSet, referenceSetCollection
- boolean printResolvedValue;
- boolean isInput;
- boolean isCollection;
-
- @Override
- public String toString() {
- if (isCollection) {
- return "COLLECTION: proc " + getProcessorName() + " var "
- + getPortName() + " " + " iteration: " + getIteration()
- + " value: " + getValue() + " collection id: "
- + getCollectionT2Reference() + " parent collection: "
- + getParentCollIDRef();
- } else if (printResolvedValue)
- return "workflow " + getworkflowId() + " proc "
- + getProcessorName() + " var " + getPortName() + " "
- + " iteration: " + getIteration() + " value: " + getValue()
- + " collection id: " + getCollectionT2Reference()
- + " resolvedValue: " + getResolvedValue();
- else
- return "workflow " + getworkflowId() + " proc "
- + getProcessorName() + " var " + getPortName() + " "
- + " iteration: " + getIteration() + " collection id: "
- + getCollectionT2Reference() + " value: " + getValue();
- }
-
- /**
- * @return the pname
- */
- public String getProcessorName() {
- return processorName;
- }
- /**
- * @param pname the pname to set
- */
- public void setProcessorName(String pname) {
- this.processorName = pname;
- }
- /**
- * @return the vname
- */
- public String getPortName() {
- return portName;
- }
- /**
- * @param vname the vname to set
- */
- public void setPortName(String vname) {
- this.portName = vname;
- }
- /**
- * @return the workflowRun
- */
- public String getWorkflowRunId() {
- return workflowRunId;
- }
- /**
- * @param workflowRun the workflowRun to set
- */
- public void setWorkflowRunId(String workflowRun) {
- this.workflowRunId = workflowRun;
- }
- /**
- * @return the value
- */
- public String getValue() {
- return value;
- }
- /**
- * @param value the value to set
- */
- public void setValue(String value) {
- this.value = value;
- }
- /**
- * @return the type
- */
- public String getType() {
- return type;
- }
- /**
- * @param type the type to set
- */
- public void setType(String type) {
- this.type = type;
- }
- /**
- * @return the iteration
- */
- public String getIteration() {
- return iteration;
- }
- /**
- * @param iteration the iteration to set
- */
- public void setIteration(String iteration) {
- this.iteration = iteration;
- }
- /**
- * @return the resolvedValue
- */
- public String getResolvedValue() {
- return resolvedValue;
- }
- /**
- * @param resolvedValue the resolvedValue to set
- */
- public void setResolvedValue(String resolvedValue) {
- this.resolvedValue = resolvedValue;
- }
-
-
- public void setPrintResolvedValue(boolean printResolvedValue) {
- this.printResolvedValue = printResolvedValue;
- }
-
-
- /**
- * @return the isInput
- */
- public boolean isInputPort() {
- return isInput;
- }
-
-
- /**
- * @param isInput the isInput to set
- */
- public void setIsInputPort(boolean isInput) {
- this.isInput = isInput;
- }
-
-
- /**
- * @return the collIdRef
- */
- public String getCollectionT2Reference() {
- return collIdRef;
- }
-
-
- /**
- * @param collIdRef the collIdRef to set
- */
- public void setCollectionT2Reference(String collIdRef) {
- this.collIdRef = collIdRef;
- }
-
-
- /**
- * @return the isCollection
- */
- public boolean isCollection() {
- return isCollection;
- }
-
-
- /**
- * @param isCollection the isCollection to set
- */
- public void setCollection(boolean isCollection) {
- this.isCollection = isCollection;
- }
-
-
- /**
- * @return the parentCollIDRef
- */
- public String getParentCollIDRef() {
- return parentCollIDRef;
- }
-
-
- /**
- * @param parentCollIDRef the parentCollIDRef to set
- */
- public void setParentCollIDRef(String parentCollIDRef) {
- this.parentCollIDRef = parentCollIDRef;
- }
-
-
- /**
- * @return the workflowId
- */
- public String getworkflowId() {
- return workflowId;
- }
-
-
- /**
- * @param workflowId the workflowId to set
- */
- public void setWorkflowId(String workflowId) {
- this.workflowId = workflowId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
deleted file mode 100644
index 58d7c3d..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-/**
- * encapsulates an SQL query along with directives on how to interpret the
- * results, i.e., which elements of the select clause are to be considered
- * relevant. For instance when the query includes a join with Collection, the
- * intent is that lineage should return the collection itself as opposed to any
- * of its elements.
- *
- * @author paolo
- *
- */
-public class LineageSQLQuery {
- private String vbQuery = null;
- private String collQuery = null;
-
- /** =0 => use var values, >0 => use enclosing collection */
- private int depth = 0;
-
- /**
- * @return the depth
- */
- public int getdepth() {
- return depth;
- }
-
- /**
- * @param depth
- * the depth to set
- */
- public void setdepth(int depth) {
- this.depth = depth;
- }
-
- /**
- * @return the collQuery
- */
- public String getCollQuery() {
- return collQuery;
- }
-
- /**
- * @param collQuery the collQuery to set
- */
- public void setCollQuery(String collQuery) {
- this.collQuery = collQuery;
- }
-
- /**
- * @return the vbQuery
- */
- public String getVbQuery() {
- return vbQuery;
- }
-
- /**
- * @param vbQuery the vbQuery to set
- */
- public void setVbQuery(String vbQuery) {
- this.vbQuery = vbQuery;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
deleted file mode 100644
index 621e351..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import net.sf.taverna.t2.provenance.connector.AbstractProvenanceConnector;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-
-import org.apache.log4j.Logger;
-
-/**
- * Implemented by the database class that a {@link AbstractProvenanceConnector}
- * implementation uses for storage purposes
- *
- * @author Paolo Missier
- * @author Ian Dunlop
- *
- */
-//FIXME is this class really needed. Can't we just push the
-//acceptRawProvanceEvent up into the ProvenanceConnector?
-public class Provenance {
- private static Logger logger = Logger.getLogger(Provenance.class);
-
- protected ProvenanceQuery pq;
- protected ProvenanceWriter pw;
- protected EventProcessor ep;
-
- private String saveEvents;
-
- private volatile boolean firstWorkflowStructure = true;
-
- public boolean isFirstWorkflowStructure() {
- return firstWorkflowStructure;
- }
-
- public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
- this.firstWorkflowStructure = firstWorkflowStructure;
- }
-
- private List<String> workflowIDStack = Collections.synchronizedList(new ArrayList<String>());
-
- private Map<String, String> workflowIDMap = new ConcurrentHashMap<String, String>();
-
- public Provenance() { }
-
- public Provenance(EventProcessor eventProcessor) {
- this.ep = eventProcessor;
- this.pq = ep.getPq();
- this.pw = ep.getPw();
- }
-
- public void clearDB() throws SQLException {
- getPw().clearDBStatic();
- getPw().clearDBDynamic();
- }
-
-
- /**
- * @return the saveEvents
- */
- public String getSaveEvents() {
- return saveEvents;
- }
-
- /**
- * @param saveEvents
- * the saveEvents to set
- */
- public void setSaveEvents(String saveEvents) {
- this.saveEvents = saveEvents;
- }
-
- // FIXME I think the provenance query and writer should both come from the
- // EventProcessor
- // seems silly setting the ep, pq and pw separately.
- public void setPq(ProvenanceQuery pq) {
- this.pq = pq;
- }
-
- public ProvenanceQuery getPq() {
- return pq;
- }
-
- public void setPw(ProvenanceWriter pw) {
- this.pw = pw;
- }
-
- public ProvenanceWriter getPw() {
- return pw;
- }
-
- public void setEp(EventProcessor ep) {
- this.ep = ep;
- }
-
- public EventProcessor getEp() {
- return ep;
- }
-
- /**
- * maps each incoming event to an insert query into the provenance store
- *
- * @param eventType
- * @param content
- * @throws SQLException
- * @throws IOException
- */
- public void acceptRawProvenanceEvent(SharedVocabulary eventType,
- ProvenanceItem provenanceItem) throws SQLException, IOException {
- processEvent(provenanceItem, eventType);
- }
-
- /**
- * parse d and generate SQL insert calls into the provenance DB
- *
- * @param d
- * DOM for the event
- * @param eventType
- * see {@link SharedVocabulary}
- * @throws SQLException
- * @throws IOException
- */
- protected void processEvent(ProvenanceItem provenanceItem,
- SharedVocabulary eventType) throws SQLException, IOException {
- if (eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE)) {
- // process the workflow structure
- //workflowStartedMap.put()
- WorkflowProvenanceItem workflowProvenanceItem = (WorkflowProvenanceItem) provenanceItem;
-
- getEp().getWfdp().workflowStarted.put(workflowProvenanceItem.getIdentifier(), workflowProvenanceItem.getInvocationStarted());
- if (isFirstWorkflowStructure()) {
- String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
- String instanceId = provenanceItem.getIdentifier();
-
- workflowIDMap.put(instanceId, dataflowId);
- setFirstWorkflowStructure(false);
- String processWorkflowStructure = getEp().processWorkflowStructure(provenanceItem);
- synchronized(workflowIDStack) {
- workflowIDStack.add(0,processWorkflowStructure);
- }
-
- getEp().propagateANL(provenanceItem.getIdentifier());
- } else {
- String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
- String instanceId = provenanceItem.getIdentifier();
-
- workflowIDMap.put(instanceId, dataflowId);
-
- Dataflow df = workflowProvenanceItem.getDataflow();
- synchronized(workflowIDStack) {
- workflowIDStack.add(0,df.getIdentifier());
- }
- }
- } else if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
-// String currentWorkflowID = workflowIDStack.get(0);
-// workflowIDStack.remove(0);
- String currentWorkflowID = provenanceItem.getParentId();
-
- getEp().processProcessEvent(provenanceItem, currentWorkflowID);
-
- } else { // all other event types (iteration etc.)
- logger.debug("processEvent of type "+provenanceItem.getEventType()+" for item of type "+provenanceItem.getClass().getName());
- String currentWorkflowID = provenanceItem.getWorkflowId();
-// String currentWorkflowID = workflowIDMap.get(provenanceItem.getParentId());
-
- getEp().processProcessEvent(provenanceItem, currentWorkflowID);
-
-// getEp().processProcessEvent(provenanceItem, workflowIDStack.get(0));
- }
- }
-}