You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/17 21:43:19 UTC

[15/51] [partial] incubator-taverna-engine git commit: temporarily empty repository

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
deleted file mode 100644
index 291da99..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/EventProcessor.java
+++ /dev/null
@@ -1,1547 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- *
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import static java.util.Collections.synchronizedList;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.iterationToString;
-import static net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils.parentProcess;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE;
-import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE;
-
-import java.beans.ExceptionListener;
-import java.beans.XMLEncoder;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.sql.Blob;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.sql.rowset.serial.SerialBlob;
-
-import net.sf.taverna.t2.provenance.item.DataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.DataflowRunComplete;
-import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
-import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
-import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataLink;
-import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode;
-import net.sf.taverna.t2.provenance.lineageservice.utils.Port;
-import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProcessorEnactment;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.reference.T2Reference;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
-import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
-import net.sf.taverna.t2.workflowmodel.Datalink;
-import net.sf.taverna.t2.workflowmodel.InputPort;
-import net.sf.taverna.t2.workflowmodel.MergeInputPort;
-import net.sf.taverna.t2.workflowmodel.MergeOutputPort;
-import net.sf.taverna.t2.workflowmodel.OutputPort;
-import net.sf.taverna.t2.workflowmodel.Processor;
-import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
-import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
-import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
-import net.sf.taverna.t2.workflowmodel.processor.activity.NestedDataflow;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-//import org.apache.commons.io.FileUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.Namespace;
-import org.jdom.input.SAXBuilder;
-import org.jdom.output.XMLOutputter;
-
-import uk.org.taverna.scufl2.api.io.WorkflowBundleIO;
-
-/**
- * @author Paolo Missier
- */
-public class EventProcessor {
-	/**
-	 * A map of UUIDs of the originating processor to the ProcBinding object
-	 * that contains its parameters
-	 */
-	private Map<String, ProcessorBinding> procBindingMap = new ConcurrentHashMap<>();
-
-	/** A map of child ids to their parents in the hierarchy of events:
-	 *  workflow -> process -> processor -> activity -> iteration
-	 */
-	private Map<String, String> parentChildMap= new ConcurrentHashMap<>();
-
-	private static Logger logger = Logger.getLogger(EventProcessor.class);
-
-	private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
-	private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";
-
-	private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first
-	private volatile String workflowRunId = null; // unique run ID. set when we see the first event of type "process"
-
-	String topLevelDataflowName = null;
-	String topLevelDataflowID   = null;
-
-	Map<String, String> wfNestingMap = new ConcurrentHashMap<>();
-
-	// all input bindings are accumulated here so they can be "backpatched" (see backpatching() )
-	List<PortBinding> allInputVarBindings = synchronizedList(new ArrayList<PortBinding>());
-
-	// dedicated class for processing WorkflowData events which carry workflow output info
-	private WorkflowDataProcessor  wfdp;
-	private ProvenanceWriter pw = null;
-	private ProvenanceQuery  pq = null;
-
-	private HashMap<String, Port> mapping;
-
-	private Map<String, ProcessorEnactment> processorEnactmentMap = new ConcurrentHashMap<>();
-
-	private Map<String, ProvenanceProcessor> processorMapById = new ConcurrentHashMap<>();
-
-	private WorkflowBundleIO io;
-
-	// Backpatching temporarily disabled
-	private static final boolean backpatching = false;
-
-	public EventProcessor(WorkflowBundleIO io) {
-		this.io = io;
-	}
-
-	/**
-	 * @param pw
-	 * @throws SQLException
-	 * @throws ClassNotFoundException
-	 * @throws IllegalAccessException
-	 * @throws InstantiationException
-	 *
-	 */
-	public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq,
-			WorkflowDataProcessor wfdp,WorkflowBundleIO io) throws InstantiationException,
-			IllegalAccessException, ClassNotFoundException, SQLException {
-		this.pw = pw;
-		this.pq = pq;
-		this.wfdp = wfdp;
-		this.io = io;
-
-		//logger.setLevel((Level) Level.INFO);
-	}
-
-	/**
- * this is the new version that makes use of the T2 deserializer to
- * populate the static portion of the DB<br/>
-	 * the static structure may already be in the DB -- this is detected as a duplicate top-level workflow ID.
-	 * In this case, we skip this processing altogether
-	 * @param content
-	 *            is a serialized dataflow (XML) -- this is parsed using the T2
-	 *            Deserializer
-	 * @return the workflowRunId for this workflow structure
-	 */
-	public String processWorkflowStructure(ProvenanceItem provenanceItem) {
-		/*
-		 * this flag is set to prevent processing of separate
-		 * workflowProvenanceItems that describe nested workflows. the
-		 * processing of all nested workflows is done as part of the very first
-		 * workflowProvenanceItem that we receive, which is self-consistent. so
-		 * we ignore all others
-		 */
-		if (workflowStructureDone)
-			return null;
-		WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) provenanceItem;
-		setWorkflowRunId(wpi.getIdentifier());
-		workflowStructureDone = true;
-		return processWorkflowStructure(wpi.getDataflow());
-	}
-
-	public String processWorkflowStructure(Dataflow df) {
-		topLevelDataflowName = df.getLocalName();
-		topLevelDataflowID   = df.getIdentifier();
-
-		// check whether we already have this WF in the DB
-		List<String> workflowIds = null;
-		try {
-			workflowIds = pq.getAllworkflowIds();
-		} catch (SQLException e) {
-			logger.warn("Problem processing workflow structure", e);
-		}
-
-		if (workflowIds == null || workflowIds.contains(topLevelDataflowID)) {
-			// not already in the DB
-			logger.info("new workflow structure with ID " + topLevelDataflowID);
-			ProvenanceProcessor provProc = new ProvenanceProcessor();
-			provProc.setIdentifier(UUID.randomUUID().toString());
-			provProc.setProcessorName(topLevelDataflowName);
-			provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY);
-			provProc.setWorkflowId(topLevelDataflowID);
-			provProc.setTopLevelProcessor(true);
-			// record the top level dataflow as a processor in the DB
-			try {
-				pw.addProcessor(provProc);
-				// pw.addProcessor(topLevelDataflowName, DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true);  // true -> is top level
-			} catch (SQLException e) {
-				logger.warn("Can't add processor " + topLevelDataflowID, e);
-			}
-		}
-
-		return processDataflowStructure(df, topLevelDataflowID, df.getLocalName());  // the local name serves as the external name for the top-level dataflow
-	}
-
-	private Blob serialize(Dataflow df) {
-		Element serializeDataflow = null;xmlSerializer.serializeDataflow(df);//FIXME
-		String dataflowString = null;
-		try {
-		    XMLOutputter outputter = new XMLOutputter();
-		    StringWriter stringWriter = new StringWriter();
-		    outputter.output(serializeDataflow, stringWriter);
-		    dataflowString = stringWriter.toString();
-		} catch (java.io.IOException e) {
-		    logger.error("Could not serialise dataflow", e);
-		    // FIXME Bad Exception handling!
-		}
-		return new SerialBlob(dataflowString.getBytes("UTF-8"));
-	}
-
-	/**
-	 * note: this method can be called as part of a recursion on sub-workflows
-	 * 
-	 * @param df
-	 * @param dataflowID
-	 *            the UUID for the entire dataflow (may be a sub-dataflow)
-	 * @param localName
-	 *            the external name of the dataflow. Null if this is top level,
-	 *            not null if a sub-dataflow
-	 * @return the workflowRunId for this workflow structure
-	 */
-	private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
-		String localWorkflowRunID = getWorkflowRunId();
-
-		//dataflowDepth++;
-
-		try {
-			// check whether we already have this WF in the DB
-			boolean alreadyInDb;
-			try {
-				List<String> workflowIds = pq.getAllworkflowIds();
-				alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID);
-			} catch (SQLException e) {
-				logger.warn("Problem processing dataflow structure for " + dataflowID, e);
-				alreadyInDb = false;
-			}
-
-			// add workflow ID -- this is NOT THE SAME AS the workflowRunId
-
-			/*
-			 * this could be a nested workflow -- in this case, override its
-			 * workflowRunId with that of its parent
-			 */
-			if (!alreadyInDb) {
-				String parentDataflow = wfNestingMap.get(dataflowID);
-				Blob blob = serialize(df);
-				if (parentDataflow == null) {
-					// this is a top level dataflow description
-					pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent
-
-				} else {
-					// we are processing a nested workflow structure
-					logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);
-
-					pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent
-
-					// override workflowRunId to point to top level -- UNCOMMENTED PM 9/09  CHECK
-					localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId();
-				}
-			}
-			// Log the run itself
-			pw.addWorkflowRun(dataflowID, localWorkflowRunID);
-
-			// add processors along with their variables
-			List<Port> vars = new ArrayList<Port>();
-			for (Processor p : df.getProcessors()) {
-				String pName = p.getLocalName();
-
-				//CHECK get type of first activity and set this as the type of the processor itself
-				List<? extends Activity<?>> activities = p.getActivityList();
-
-				if (! alreadyInDb) {
-					ProvenanceProcessor provProc;
-					String pType = null;
-					if (activities != null && !activities.isEmpty())
-						pType = activities.get(0).getClass().getCanonicalName();
-					provProc = new ProvenanceProcessor();
-					provProc.setIdentifier(UUID.randomUUID().toString());
-					provProc.setProcessorName(pName);
-					provProc.setFirstActivityClassName(pType);
-					provProc.setWorkflowId(dataflowID);
-					provProc.setTopLevelProcessor(false);
-
-					pw.addProcessor(provProc);
-
-					//pw.addProcessor(pName, pType, dataflowID, false);  // false: not a top level processor
-
-					/*
-					 * add all input ports for this processor as input variables
-					 */
-					for (ProcessorInputPort ip : p.getInputPorts()) {
-						Port inputVar = new Port();
-						inputVar.setIdentifier(UUID.randomUUID().toString());
-						inputVar.setProcessorId(provProc.getIdentifier());
-						inputVar.setProcessorName(pName);
-						inputVar.setWorkflowId(dataflowID);
-						inputVar.setPortName(ip.getName());
-						inputVar.setDepth(ip.getDepth());
-						inputVar.setInputPort(true);
-					 	vars.add(inputVar);
-					}
-
-					/*
-					 * add all output ports for this processor as output
-					 * variables
-					 */
-					for (ProcessorOutputPort op : p.getOutputPorts()) {
-						Port outputVar = new Port();
-						outputVar.setIdentifier(UUID.randomUUID().toString());
-						outputVar.setProcessorName(pName);
-						outputVar.setProcessorId(provProc.getIdentifier());
-						outputVar.setWorkflowId(dataflowID);
-						outputVar.setPortName(op.getName());
-						outputVar.setDepth(op.getDepth());
-						outputVar.setInputPort(false);
-
-						vars.add(outputVar);
-					}
-				}
-
-				/*
-				 * check for nested structures: if the activity is
-				 * DataflowActivity then this processor is a nested workflow;
-				 * make an entry into wfNesting map with its ID and recurse on
-				 * the nested workflow
-				 */
-
-				if (activities != null)
-					for (Activity<?> a : activities) {
-						if (!(a instanceof NestedDataflow))
-							continue;
-
-						Dataflow nested = ((NestedDataflow) a)
-								.getNestedDataflow();
-						wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent
-
-						// RECURSIVE CALL
-						processDataflowStructure(nested,
-								nested.getIdentifier(), p.getLocalName());
-					}
-			} // end for each processor
-
-			// add inputs to entire dataflow
-			String pName = INPUT_CONTAINER_PROCESSOR;  // overridden -- see below
-
-			/*
-			 * check whether we are processing a nested workflow. in this case
-			 * the input vars are not assigned to the INPUT processor but to the
-			 * containing dataflow
-			 */
-			if (! alreadyInDb) {
-				if (externalName != null) // override the default if we are nested or some external name is provided
-					pName = externalName;
-
-				for (DataflowInputPort ip : df.getInputPorts()) {
-					Port inputVar = new Port();
-					inputVar.setIdentifier(UUID.randomUUID().toString());
-					inputVar.setProcessorId(null); // meaning workflow port
-					inputVar.setProcessorName(pName);
-					inputVar.setWorkflowId(dataflowID);
-					inputVar.setPortName(ip.getName());
-					inputVar.setDepth(ip.getDepth());
-					inputVar.setInputPort(true);  // CHECK PM modified 11/08 -- input vars are actually outputs of input processors...
-
-					vars.add(inputVar);
-				}
-
-				// add outputs of entire dataflow
-				pName = OUTPUT_CONTAINER_PROCESSOR;  // overridden -- see below
-
-				/*
-				 * check whether we are processing a nested workflow. in this
-				 * case the output vars are not assigned to the OUTPUT processor
-				 * but to the containing dataflow
-				 */
-				if (externalName != null) // we are nested
-					pName = externalName;
-
-				for (DataflowOutputPort op : df.getOutputPorts()) {
-					Port outputVar = new Port();
-					outputVar.setIdentifier(UUID.randomUUID().toString());
-					outputVar.setProcessorId(null); // meaning workflow port
-					outputVar.setProcessorName(pName);
-					outputVar.setWorkflowId(dataflowID);
-					outputVar.setPortName(op.getName());
-					outputVar.setDepth(op.getDepth());
-					outputVar.setInputPort(false);  // CHECK PM modified 11/08 -- output vars are actually outputs of output processors...
-					vars.add(outputVar);
-				}
-
-				pw.addPorts(vars, dataflowID);
-				makePortMapping(vars);
-
-				/*
-				 * add datalink records using the dataflow links retrieving the
-				 * processor names requires navigating from links to source/sink
-				 * and from there to the processors
-				 */
-				for (Datalink l : df.getLinks()) {
-					// TODO cover the case of datalinks from an input and to an output to the entire dataflow
-
-					Port sourcePort = null;
-					Port destinationPort = null;
-
-					OutputPort source = l.getSource();
-					if (source instanceof ProcessorOutputPort) {
-						String sourcePname = ((ProcessorOutputPort) source)
-								.getProcessor().getLocalName();
-						sourcePort = lookupPort(sourcePname, source.getName(), false);
-					} else if (source instanceof MergeOutputPort) {
-						// TODO: Handle merge output ports
-					} else
-						// Assume it is internal port from DataflowInputPort
-						sourcePort = lookupPort(externalName, source.getName(), true);
-
-					InputPort sink = l.getSink();
-					if (sink instanceof ProcessorInputPort) {
-						String sinkPname = ((ProcessorInputPort) sink)
-								.getProcessor().getLocalName();
-						destinationPort = lookupPort(sinkPname, sink.getName(), true);
-					} else if (sink instanceof MergeInputPort) {
-						// TODO: Handle merge input ports
-					} else
-						// Assume it is internal port from DataflowOutputPort
-						destinationPort = lookupPort(externalName, sink.getName(), false);
-
-					if (sourcePort != null && destinationPort != null)
-						pw.addDataLink(sourcePort, destinationPort, dataflowID);
-					else
-						logger.info("Can't record datalink " + l);
-				}
-			}
-		} catch (Exception e) {
-			logger.error("Problem processing provenance for dataflow", e);
-		}
-
-		return dataflowID;
-	}
-
-	private void makePortMapping(List<Port> ports) {
-		mapping = new HashMap<>();
-		for (Port port: ports) {
-			String key = port.getProcessorName()
-					+ (port.isInputPort() ? "/i:" : "/o:") + port.getPortName();
-			mapping.put(key, port);
-		}
-	}
-
-	private Port lookupPort(String processorName, String portName, boolean isInputPort) {
-		String key = processorName + (isInputPort ? "/i:" : "/o:") + portName;
-		return mapping.get(key);
-	}
-
-	/**
-	 * processes an elementary process execution event from T2. Collects info
-	 * from events as they happen and sends them to the writer for processing
-	 * when the iteration event is received. Uses the map of procBindings to
-	 * process event id and the map of child ids to parent ids to ensure that
-	 * the correct proc binding is used
-	 * @param currentWorkflowID
-	 *
-	 * @param d
-	 * @param context
-	 */
-	public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
-		switch (provenanceItem.getEventType()) {
-		case PROCESS_EVENT_TYPE: {
-			String parentId = provenanceItem.getParentId();  // this is the workflowID
-			String identifier = provenanceItem.getIdentifier();  // use this as workflowRunId if this is the top-level process
-			
-			parentChildMap.put(identifier, parentId);
-			ProcessorBinding pb = new ProcessorBinding();
-			pb.setWorkflowRunId(getWorkflowRunId());
-			pb.setWorkflowId(currentWorkflowID);
-			procBindingMap.put(identifier, pb);
-			return;
-		}
-		case PROCESSOR_EVENT_TYPE: {
-			String identifier = provenanceItem.getIdentifier();
-			String parentId = provenanceItem.getParentId();
-			String processID = provenanceItem.getProcessId(); // this is the external process ID
-
-			// this has the weird form facade0:dataflowname:pname  need to extract pname from here
-			String[] processName = processID.split(":");
-			procBindingMap.get(parentId).setProcessorName(
-					processName[processName.length - 1]);
-			// 3rd component of composite name
-
-			parentChildMap.put(identifier, parentId);
-			return;
-		}
-		case ACTIVITY_EVENT_TYPE: {
-			String identifier = provenanceItem.getIdentifier();
-			String parentId = provenanceItem.getParentId();
-			procBindingMap.get(parentChildMap.get(parentId))
-					.setFirstActivityClassName(identifier);
-			parentChildMap.put(identifier, parentId);
-			return;
-		}
-		case ITERATION_EVENT_TYPE: {
-			IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem)provenanceItem;
-			if (iterationProvenanceItem.getParentIterationItem() != null)
-				// Skipping pipelined outputs, we'll process the parent output later instead
-				return;
-
-			// traverse up to root to retrieve ProcBinding that was created when we saw the process event
-			String activityID = provenanceItem.getParentId();
-			String processorID = parentChildMap.get(activityID);
-			String processID = parentChildMap.get(processorID);
-			String iterationID = provenanceItem.getIdentifier();
-			parentChildMap.put(iterationID, activityID);
-
-			ProcessorEnactment processorEnactment = processorEnactmentMap
-					.get(iterationID);
-			if (processorEnactment == null)
-				processorEnactment = new ProcessorEnactment();
-
-			ProcessorBinding procBinding = procBindingMap.get(processID);
-
-			String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
-					.getIteration()));
-			procBinding.setIterationVector(itVector);
-
-			processorEnactment.setEnactmentStarted(iterationProvenanceItem
-					.getEnactmentStarted());
-			processorEnactment.setEnactmentEnded(iterationProvenanceItem
-					.getEnactmentEnded());
-			processorEnactment.setWorkflowRunId(workflowRunId);
-			processorEnactment.setIteration(itVector);
-
-			String processId = iterationProvenanceItem.getProcessId();
-			String parentProcessId = parentProcess(processId, 3);
-			if (parentProcessId != null) {
-				ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
-						.get(parentProcessId);
-				if (parentProcEnact != null)
-					processorEnactment
-							.setParentProcessorEnactmentId(parentProcEnact
-									.getProcessEnactmentId());
-			}
-			processorEnactment.setProcessEnactmentId(iterationProvenanceItem
-					.getIdentifier());
-			processorEnactment.setProcessIdentifier(processId);
-
-			ProvenanceProcessor provenanceProcessor;
-			if (processorEnactment.getProcessorId() == null) {
-				provenanceProcessor = pq.getProvenanceProcessorByName(
-						currentWorkflowID, procBinding.getProcessorName());
-				if (provenanceProcessor == null)
-					// already logged warning
-					return;
-				processorMapById.put(provenanceProcessor.getIdentifier(),
-						provenanceProcessor);
-				processorEnactment.setProcessorId(provenanceProcessor
-						.getIdentifier());
-			} else {
-				provenanceProcessor = processorMapById.get(processorEnactment
-						.getProcessorId());
-				if (provenanceProcessor == null) {
-					provenanceProcessor = pq
-							.getProvenanceProcessorById(processorEnactment
-									.getProcessorId());
-					processorMapById.put(provenanceProcessor.getIdentifier(),
-							provenanceProcessor);
-				}
-			}
-
-			InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
-			OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();
-
-			if (inputDataEl != null
-					&& processorEnactment.getInitialInputsDataBindingId() == null) {
-				processorEnactment
-						.setInitialInputsDataBindingId(processDataBindings(
-								inputDataEl, provenanceProcessor));
-				processInput(inputDataEl, procBinding, currentWorkflowID);
-			}
-
-			if (outputDataEl != null
-					&& processorEnactment.getFinalOutputsDataBindingId() == null) {
-				processorEnactment
-						.setFinalOutputsDataBindingId(processDataBindings(
-								outputDataEl, provenanceProcessor));
-				processOutput(outputDataEl, procBinding, currentWorkflowID);
-			}
-
-			try {
-				if (processorEnactmentMap.containsKey(iterationID)) {
-					getPw().updateProcessorEnactment(processorEnactment);
-				} else {
-					getPw().addProcessorEnactment(processorEnactment);
-					processorEnactmentMap.put(iterationID, processorEnactment);
-				}
-			} catch (SQLException e) {
-				logger.warn("Could not store processor enactment", e);
-			}
-			return;
-		}
-		case END_WORKFLOW_EVENT_TYPE: {
-			DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
-			// use this event to do housekeeping on the input/output varbindings
-
-			// process the input and output values accumulated by WorkflowDataProcessor
-			getWfdp().processTrees(completeEvent, getWorkflowRunId());
-
-			reconcileLocalOutputs(provenanceItem.getWorkflowId());
-
-			if (! provenanceItem.getProcessId().contains(":")) {
-				// Top-level workflow finished
-				// No longer needed, done by processTrees()
-//				patchTopLevelnputs();
-
-				workflowStructureDone = false; // CHECK reset for next run...
-//				reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs
-				getPw().closeCurrentModel();  // only real impl is for RDF
-			}
-			return;
-		}
-		case WORKFLOW_DATA_EVENT_TYPE: {
-			// give this event to a WorkflowDataProcessor object for pre-processing
-			//			try {
-			// TODO may generate an exception when the data is an error CHECK
-			getWfdp().addWorkflowDataItem(provenanceItem);
-			//			} catch (NumberFormatException e) {
-			//			logger.error(e);
-			//			}
-			//			logger.info("Received workflow data - not processing");
-			//FIXME not sure  - needs to be stored somehow
-			return;
-		}
-		case INVOCATION_STARTED_EVENT_TYPE: {
-			InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
-			ProcessorEnactment processorEnactment = processorEnactmentMap
-					.get(startedItem.getParentId());
-			if (processorEnactment == null) {
-				logger.error("Could not find ProcessorEnactment for invocation "
-						+ startedItem);
-				return;
-			}
-			getWfdp().invocationProcessToProcessEnactment.put(
-					startedItem.getInvocationProcessId(), processorEnactment);
-			return;
-		}
-		case ERROR_EVENT_TYPE:
-			//TODO process the error
-			return;
-		default:
-			// TODO broken, should we throw something here?
-			return;
-		}
-	}
-
-	private String processDataBindings(
-			DataProvenanceItem provenanceItem, ProvenanceProcessor provenanceProcessor) {
-		// TODO: Cache known provenaneItems and avoid registering again
-		String dataBindingId = UUID.randomUUID().toString();
-		boolean isInput = provenanceItem instanceof InputDataProvenanceItem;
-
-		for (Entry<String, T2Reference> entry : provenanceItem.getDataMap().entrySet()) {
-			DataBinding dataBinding = new DataBinding();
-			dataBinding.setDataBindingId(dataBindingId);
-			Port port = findPort(provenanceProcessor, entry.getKey(), isInput); // findPort
-			if (port == null) {
-				logger.warn("Could not find port for " + entry.getKey());
-				continue;
-			}
-			dataBinding.setPort(port);
-			dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString());
-			dataBinding.setWorkflowRunId(workflowRunId);
-			try {
-				getPw().addDataBinding(dataBinding);
-			} catch (SQLException e) {
-				logger.warn("Could not register data binding for " + port, e);
-			}
-		}
-		return dataBindingId;
-	}
-
-	private Port findPort(ProvenanceProcessor provenanceProcessor,
-			String portName, boolean isInput) {
-		// TODO: Query pr dataflow and cache
-		Map<String, String> queryConstraints = new HashMap<>();
-		queryConstraints.put("V.workflowId",
-				provenanceProcessor.getWorkflowId());
-		String processorName = provenanceProcessor.getProcessorName();
-		queryConstraints.put("processorName", processorName);
-		queryConstraints.put("portName", portName);
-		queryConstraints.put("isInputPort", isInput ? "1" : "0");
-		try {
-			List<Port> vars = pq.getPorts(queryConstraints);
-			if (vars.isEmpty()) {
-				logger.warn("Can't find port " + portName + " in "
-						+ processorName);
-			} else if (vars.size() > 1) {
-				logger.warn("Multiple matches for port " + portName + " in "
-						+ processorName + ", got:" + vars);
-			} else
-				return vars.get(0);
-		} catch (SQLException e) {
-			logger.error(
-					"Problem getting ports for processor: " + processorName
-							+ " worflow: "
-							+ provenanceProcessor.getWorkflowId(), e);
-		}
-		return null;
-	}
-
-
-	/**
-	 * fills in the VBs for the global inputs -- this removes the need for explicit events
-	 * that account for these value bindings...
-	 */
-	public void patchTopLevelnputs() {
-
-		// for each input I to topLevelDataflow:
-		// pick first outgoing datalink with sink P:X
-		// copy value X to I -- this can be a collection, so copy everything
-
-		// get all global input vars
-
-		//		logger.info("\n\n BACKPATCHING GLOBAL INPUTS with dataflowDepth = "+dataflowDepth+"*******\n");
-
-		List<Port> inputs=null;
-		try {
-			inputs = getPq().getInputPorts(topLevelDataflowName, topLevelDataflowID);
-
-			for (Port input:inputs)  {
-
-				//				logger.info("global input: "+input.getVName());
-
-				Map<String,String> queryConstraints = new HashMap<String,String>();
-
-//				queryConstraints.put("sourcePortName", input.getVName());
-//				queryConstraints.put("sourceProcessorName", input.getPName());
-				queryConstraints.put("sourcePortId", input.getIdentifier());
-				queryConstraints.put("workflowId", input.getWorkflowId());
-				List<DataLink> outgoingDataLinks = getPq().getDataLinks(queryConstraints);
-
-				// any datalink will do, use the first
-				String targetPname = outgoingDataLinks.get(0).getDestinationProcessorName();
-				String targetVname = outgoingDataLinks.get(0).getDestinationPortName();
-
-//				logger.info("copying values from ["+targetPname+":"+targetVname+"] for instance ID: ["+workflowRunId+"]");
-
-				queryConstraints.clear();
-				queryConstraints.put("V.portName", targetVname);
-				queryConstraints.put("V.processorName", targetPname);
-				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
-				queryConstraints.put("V.workflowId", topLevelDataflowID);
-
-				for (PortBinding vb : getPq().getPortBindings(queryConstraints)) {
-					PortBinding inputPortBinding = new PortBinding(vb);
-
-					// insert PortBinding back into VB with the global input portName
-					inputPortBinding.setProcessorName(input.getProcessorName());
-					inputPortBinding.setPortName(input.getPortName());
-					try {
-						getPw().addPortBinding(inputPortBinding);
-					} catch (SQLException ex) {
-						logger.info("Already logged port binding", ex);
-					}
-				}
-			}
-		} catch (SQLException e) {
-			logger.warn("Patch top level inputs problem for provenance", e);
-		} catch (IndexOutOfBoundsException e) {
-			logger.error("Could not patch top level", e);
-		}
-	}
-
-	public void reconcileTopLevelOutputs() {
-		reconcileLocalOutputs(topLevelDataflowID);
-	}
-
-	// PM added 23/4/09
-	/**
-	 * reconcile the top level outputs with the results from its immediate precedessors in the graph.<br/>
-	 * various cases have to be considered: predecessors may include records that are not in the output,
-	 * while the output may include nested list structures that are not in the precedessors. This method accounts
-	 * for a 2-way reconciliation that considers all possible cases.<br/>
-	 * at the end, outputs and their predecessors contain the same data.<p/>
-	 * NOTE: if we assume that data values (URIs) are <em>always</em> unique then this is greatly simplified by just
-	 * comparing two sets of value records by their URIs and reconciling them. But this is not the way it is done here
-	 */
-	public void reconcileLocalOutputs(String dataflowID) {
-		/*
-	for each output O
-
-	for each variable V in predecessors(O)
-
-	fetch all VB records for O into list OValues
-	fetch all VB records for V  into list Yalues
-
-	compare OValues and VValues:
-	it SHOULD be the case that OValues is a subset of YValues. Under this assumption:
-
-	for each vb in YValues:
-	- if there is a matching o in OValues then (vb may be missing collection information)
-	    copy o to vb
-	  else
-	    if vb has no collection info && there is a matching tree node tn  in OTree (use iteration index for the match) then
-	       set vb to be in collection tb
-	       copy vb to o
-
-     finally copy all Collection records for O in OTree -- catch duplicate errors
-		 */
-
-		Map<String, String> queryConstraints = new HashMap<>();
-
-		try {
-			// for each output O
-			for (Port output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID))  {
-				// collect all VBs for O
-//				String oPName = output.getPName();
-//				String oVName = output.getVName();
-//				queryConstraints.put("V.portName", oVName);
-//				queryConstraints.put("V.processorName", oPName);
-//				queryConstraints.put("VB.workflowRunId", workflowRunId);
-//				queryConstraints.put("V.workflowId", topLevelDataflowID);
-
-//				List<PortBinding> OValues = pq.getPortBindings(queryConstraints);
-
-				// find all records for the immediate precedessor Y of O
-				queryConstraints.clear();
-//				queryConstraints.put("destinationPortName", output.getVName());
-//				queryConstraints.put("destinationProcessorName", output.getPName());
-				queryConstraints.put("destinationPortId", output.getIdentifier());
-				queryConstraints.put("workflowId", output.getWorkflowId());
-				List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
-
-				// there can be only one -- but check that there is one!
-				if (incomingDataLinks.isEmpty())
-					continue;
-
-				String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
-				String sourceVname = incomingDataLinks.get(0).getSourcePortName();
-
-				queryConstraints.clear();
-				queryConstraints.put("V.portName", sourceVname);
-				queryConstraints.put("V.processorName", sourcePname);
-				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
-				queryConstraints.put("V.workflowId", topLevelDataflowID);
-
-				List<PortBinding> YValues = pq.getPortBindings(queryConstraints);
-
-				// for each YValue look for a match in OValues
-				// (assume the YValues values are a superset of OValues)!)
-
-				for (PortBinding yValue:YValues) {
-					// look for a matching record in PortBinding for output O
-					queryConstraints.clear();
-					queryConstraints.put("V.portName", output.getPortName());
-					queryConstraints.put("V.processorName", output.getProcessorName());
-					queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
-					queryConstraints.put("V.workflowid", topLevelDataflowID);
-					queryConstraints.put("VB.iteration", yValue.getIteration());
-					if (yValue.getCollIDRef()!= null) {
-						queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
-						queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl()));
-					}
-					List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints);
-
-					// result at most size 1
-					if (!matchingOValues.isEmpty()) {
-						PortBinding oValue = matchingOValues.get(0);
-
-						// copy collection info from oValue to yValue
-						yValue.setCollIDRef(oValue.getCollIDRef());
-						yValue.setPositionInColl(oValue.getPositionInColl());
-
-						pw.updatePortBinding(yValue);
-					} else {
-						// copy the yValue to O
-						// insert PortBinding back into VB with the global output portName
-						yValue.setProcessorName(output.getProcessorName());
-						yValue.setPortName(output.getPortName());
-						pw.addPortBinding(yValue);
-					}
-
-				} // for each yValue in YValues
-
-				// copy all Collection records for O to Y
-
-				// get all collections refs for O
-				queryConstraints.clear();
-				queryConstraints.put("workflowRunId", getWorkflowRunId());
-				queryConstraints.put("processorNameRef", output.getProcessorName());
-				queryConstraints.put("portName", output.getPortName());
-
-				List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);
-
-				// insert back as collection refs for Y -- catch duplicates
-				for (NestedListNode nln:oCollections) {
-					nln.setProcessorName(sourcePname);
-					nln.setProcessorName(sourceVname);
-
-					getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
-				}
-
-			} // for each output var
-
-		} catch (SQLException e) {
-			logger.warn("Problem reconciling top level outputs", e);
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	private void processOutput(OutputDataProvenanceItem provenanceItem,
-			ProcessorBinding procBinding, String currentWorkflowID) {
-		Element dataItemAsXML = getDataItemAsXML(provenanceItem);
-		List<Element> outputPorts = dataItemAsXML.getChildren("port");
-		for (Element outputport : outputPorts) {
-			String portName = outputport.getAttributeValue("name");
-
-			// value type may vary
-			List<Element> valueElements = outputport.getChildren();
-			if (valueElements != null && !valueElements.isEmpty()) {
-				Element valueEl = valueElements.get(0); // only really 1 child
-
-				processPortBinding(valueEl, procBinding.getProcessorName(),
-						portName, procBinding.getIterationVector(),
-						getWorkflowRunId(), currentWorkflowID);
-			}
-		}
-	}
-
-	/**
-	 * this method reconciles values in varBindings across an datalink: Firstly,
-	 * if vb's value is within a collection, _and_ it is copied from a value
-	 * generated during a previous iteration, then this method propagates the
-	 * list reference to that iteration value, which wouldn't have it.
-	 * Conversely, if vb is going to be input to an iteration, then it's lost
-	 * its containing list node, and we put it back in by looking at the
-	 * corresponding predecessor
-	 * 
-	 * @param vb
-	 * @throws SQLException
-	 */
-	private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException {
-		logger.debug("backpatchIterationResults: start");
-		for (PortBinding vb : newBindings) {
-			logger.debug("backpatchIterationResults: processing vb "
-					+ vb.getProcessorName() + "/" + vb.getPortName() + "="
-					+ vb.getValue());
-
-			if (vb.getCollIDRef()!= null) // this is a member of a collection
-				logger.debug("...which is inside a collection ");
-
-			// look for its antecedent
-			Map<String,String> queryConstraints = new HashMap<>();
-			queryConstraints.put("destinationPortName", vb.getPortName());
-			queryConstraints.put("destinationProcessorName", vb.getProcessorName());
-			queryConstraints.put("workflowId", pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0));  // CHECK picking first element in list...
-			List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
-
-			// there can be only one -- but check that there is one!
-			if (incomingDataLinks.isEmpty())
-				return;
-
-			String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
-			String sourceVname = incomingDataLinks.get(0).getSourcePortName();
-
-			logger.debug("antecedent: "+sourcePname+":"+sourceVname);
-
-			// get the varbindings for this port and select the one with the same iteration vector as its successor
-			queryConstraints.clear();
-			queryConstraints.put("VB.portName", sourceVname);
-			queryConstraints.put("V.processorName", sourcePname);
-			queryConstraints.put("VB.value", vb.getValue());
-			queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId());
-
-			// reconcile
-			for (PortBinding b : pq.getPortBindings(queryConstraints)) {
-				logger.debug("backpatching " + sourceVname + " " + sourcePname);
-
-				if (vb.getCollIDRef() != null && b.getCollIDRef() == null) {
-					logger.debug("successor " + vb.getPortName()
-							+ " is in collection " + vb.getCollIDRef()
-							+ " but pred " + b.getPortName() + " is not");
-					logger.debug("putting " + b.getPortName()
-							+ " in collection " + vb.getCollIDRef()
-							+ " at pos " + vb.getPositionInColl());
-					b.setCollIDRef(vb.getCollIDRef());
-					b.setPositionInColl(vb.getPositionInColl());
-					getPw().updatePortBinding(b);
-
-				} else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) {
-					logger.debug("successor " + vb.getPortName()
-							+ " is NOT in collection but pred "
-							+ b.getPortName() + " IS");
-					logger.debug("putting " + vb.getPortName()
-							+ " in collection " + b.getCollIDRef() + " at pos "
-							+ b.getPositionInColl());
-					vb.setCollIDRef(b.getCollIDRef());
-					vb.setPositionInColl(b.getPositionInColl());
-					getPw().updatePortBinding(vb);
-				}
-			}
-		}
-	}
-
-
-	/**
-	 * create one new PortBinding record for each input port binding
-	 * @param currentWorkflowID
-	 */
-	@SuppressWarnings("unchecked")
-	private void processInput(InputDataProvenanceItem provenanceItem,
-			ProcessorBinding procBinding, String currentWorkflowID) {
-		Element dataItemAsXML = getDataItemAsXML(provenanceItem);
-		int order = 0;
-		for (Element inputport : (List<Element>) dataItemAsXML.getChildren("port")) {
-			String portName = inputport.getAttributeValue("name");
-
-			try {
-				// add process order sequence to Port for this portName
-				Map<String, String> queryConstraints = new HashMap<>();
-				queryConstraints.put("V.workflowId", currentWorkflowID);
-				queryConstraints.put("processorName", procBinding.getProcessorName());
-				queryConstraints.put("portName", portName);
-				queryConstraints.put("isInputPort", "1");
-
-				Port v = getPq().getPorts(queryConstraints).get(0);
-				v.setIterationStrategyOrder(order++);
-				getPw().updatePort(v);
-			} catch (IndexOutOfBoundsException e) {
-				logger.error("Could not process input " + portName, e);
-			} catch (SQLException e1) {
-				logger.error("Could not process input " + portName, e1);
-			}
-
-			// value type may vary
-			List<Element> valueElements = inputport.getChildren(); // hopefully
-			// in the right order...
-			if (valueElements != null && valueElements.size() > 0) {
-				Element valueEl = valueElements.get(0); // expect only 1 child
-				//				processVarBinding(valueEl, processor, portName, iterationVector,
-				//				dataflow);
-
-				List<PortBinding> newBindings = processPortBinding(valueEl,
-						procBinding.getProcessorName(), portName,
-						procBinding.getIterationVector(), getWorkflowRunId(),
-						currentWorkflowID);
-				// this is a list whenever valueEl is of type list: in this case processVarBinding recursively
-				// processes all values within the collection, and generates one PortBinding record for each of them
-
-				allInputVarBindings.addAll(newBindings);
-
-				//				// if the new binding involves list values, then check to see if they need to be propagated back to
-//				// results of iterations
-
-				// Backpatching disabled as it is very inefficient and not needed
-				// for current Taverna usage
-
-				try {
-					if (backpatching)
-						backpatchIterationResults(newBindings);
-				} catch (SQLException e) {
-					logger.warn("Problem with back patching iteration results", e);
-				}
-			} else {
-				if (valueElements != null)
-					logger.debug("port name " + portName + "  "
-							+ valueElements.size());
-				else
-					logger.debug("valueElements is null for port name "
-							+ portName);
-			}
-		}
-	}
-
-	/**
-	 * capture the default case where the value is not a list
-	 *
-	 * @param valueEl
-	 * @param processorId
-	 * @param portName
-	 * @param iterationId
-	 * @param workflowRunId
-	 * @param currentWorkflowID
-	 */
-	private List<PortBinding> processPortBinding(Element valueEl,
-			String processorId, String portName, String iterationId,
-			String workflowRunId, String currentWorkflowID) {
-		// uses the defaults:
-		// collIdRef = null
-		// parentcollectionRef = null
-		// positionInCollection = 1
-		return processPortBinding(valueEl, processorId, portName, null, 1, null,
-				iterationId, workflowRunId, null, currentWorkflowID);
-	}
-
-	/**
-	 * general case where value can be a list
-	 * @param valueEl
-	 * @param processorId
-	 * @param portName
-	 * @param collIdRef
-	 * @param positionInCollection
-	 * @param parentCollectionRef
-	 * @param iterationId
-	 * @param workflowRunId
-	 * @param currentWorkflowID
-	 */
-	@SuppressWarnings("unchecked")
-	private List<PortBinding> processPortBinding(Element valueEl,
-			String processorId, String portName, String collIdRef,
-			int positionInCollection, String parentCollectionRef,
-			String iterationId, String workflowRunId, String itVector,
-			String currentWorkflowID) {
-		List<PortBinding> newBindings = new ArrayList<>();
-
-		String valueType = valueEl.getName();
-		//		logger.info("value element for " + processorId + ": "
-		//		+ valueType);
-
-		String iterationVector = (itVector == null ? extractIterationVector(iterationId)
-				: itVector);
-
-		PortBinding vb = new PortBinding();
-
-		vb.setWorkflowId(currentWorkflowID);
-		vb.setWorkflowRunId(workflowRunId);
-		vb.setProcessorName(processorId);
-		vb.setValueType(valueType);
-		vb.setPortName(portName);
-		vb.setCollIDRef(collIdRef);
-		vb.setPositionInColl(positionInCollection);
-
-		newBindings.add(vb);
-
-		if (valueType.equals("literal")) {
-			try {
-				vb.setIteration(iterationVector);
-				vb.setValue(valueEl.getAttributeValue("id"));
-				logger.debug("new input VB with workflowId="+currentWorkflowID+" processorId="+processorId+
-						" valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+
-						" position="+positionInCollection+" itvector="+iterationVector+
-						" value="+vb.getValue());
-				getPw().addPortBinding(vb);
-			} catch (SQLException e) {
-				logger.warn("Process Port Binding problem with provenance", e);
-			}
-
-		} else if (valueType.equals("referenceSet")) {
-			vb.setIteration(iterationVector);
-			vb.setValue(valueEl.getAttributeValue("id"));
-			vb.setReference(valueEl.getChildText("reference"));
-
-			logger.debug("new input VB with workflowId=" + currentWorkflowID
-					+ " processorId=" + processorId + " valueType=" + valueType
-					+ " portName=" + portName + " collIdRef=" + collIdRef
-					+ " position=" + positionInCollection + " itvector="
-					+ iterationVector + " value=" + vb.getValue());
-
-			try {
-				getPw().addPortBinding(vb);
-			} catch (SQLException e) {
-				logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
-				// try to update the existing record instead using the current collection info
-
-				getPw().updatePortBinding(vb);
-			}
-
-		} else if (valueType.equals("list")) {
-			logger.debug("input of type list");
-
-			// add entries to the Collection and to the PortBinding tables
-			// list id --> Collection.collId
-
-			String collId = valueEl.getAttributeValue("id");
-			try {
-				parentCollectionRef = getPw().addCollection(processorId, collId,
-						parentCollectionRef, iterationVector, portName,
-						workflowRunId);
-
-				// iterate over each list element
-				List<Element> listElements = valueEl.getChildren();
-
-				positionInCollection = 1;  // also use this as a suffix to extend the iteration vector
-
-				// extend iteration vector to account for additional levels within the list
-
-				String originalIterationVector = iterationVector;
-
-				// children can be any base type, including list itself -- so
-				// use recursion
-				for (Element el : listElements) {
-					if (originalIterationVector.length() > 2) // vector is not empty
-						iterationVector = originalIterationVector.substring(0,
-								originalIterationVector.length()-1) + ","+
-								Integer.toString(positionInCollection-1) + "]";
-					else
-						iterationVector = "["+ (positionInCollection-1) + "]";
-
-					newBindings.addAll(processPortBinding(el, processorId,
-							portName, collId, positionInCollection,
-							parentCollectionRef, iterationId, workflowRunId,
-							iterationVector, currentWorkflowID));
-
-					positionInCollection++;
-				}
-			} catch (SQLException e) {
-				logger.warn("Problem processing var binding", e);
-			}
-		} else if (valueType.equals("error")) {
-			vb.setIteration(iterationVector);
-			vb.setValue(valueEl.getAttributeValue("id"));
-			vb.setReference(valueEl.getChildText("reference"));
-			try {
-				getPw().addPortBinding(vb);
-			} catch (SQLException e) {
-				logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
-				// try to update the existing record instead using the current collection info
-
-				getPw().updatePortBinding(vb);
-			}
-		} else {
-			logger.warn("unrecognized value type element for "
-					+ processorId + ": " + valueType);
-		}
-
-		return newBindings;
-	}
-
-
-	/**
-	 * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...]
-	 * <p/>
-	 * now returns the vector itself -- this is still experimental
-	 *
-	 * @param iteration
-	 * @return
-	 */
-	@Deprecated
-	String extractIterationVector(String iteration) {
-		return iteration;
-	}
-
-	/**
-	 * silly class to hold pairs of strings. any better way??
-	 * @author paolo
-	 *
-	 */
-	class Pair {
-		String v1, v2;
-
-		public Pair(String current, String workflowId) {
-			v1=current; v2=workflowId;
-		}
-
-		/**
-		 * @return the v1
-		 */
-		public String getV1() {
-			return v1;
-		}
-
-		/**
-		 * @param v1 the v1 to set
-		 */
-		public void setV1(String v1) {
-			this.v1 = v1;
-		}
-
-		/**
-		 * @return the v2
-		 */
-		public String getV2() {
-			return v2;
-		}
-
-		/**
-		 * @param v2 the v2 to set
-		 */
-		public void setV2(String v2) {
-			this.v2 = v2;
-		}
-	}
-
-	@SuppressWarnings("deprecation")
-	public List<Pair> toposort(String dataflowName, String workflowRunId) throws SQLException  {
-
-//		String workflowId = pq.getworkflowIdForDataflow(dataflowName, workflowRunId);
-		String workflowId = pq.getWorkflowIdForExternalName(dataflowName);
-
-		// fetch processors along with the count of their predecessors
-		Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(workflowRunId);
-		Map<String, List<String>> successorsOf = new HashMap<String, List<String>>();
-//		List<String> procList = pq.getContainedProcessors(dataflowName, workflowRunId);
-		List<String> procList = pq.getContainedProcessors(dataflowName);
-
-		for (String s:procList) {
-			List<String> successors = getPq().getSuccProcessors(s, workflowId, workflowRunId);
-			successorsOf.put(s, successors);
-		}
-
-		List<Pair>  sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, workflowId, workflowRunId);
-
-		for (int i=0; i< sorted.size(); i++) {
-			String procName = sorted.get(i).getV1();
-
-			if (pq.isDataflow(procName) && !procName.equals(dataflowName)) {  // handle weirdness: a dataflow is contained within itself..
-				// recurse on procName
-				List<Pair> sortedSublist = toposort(procName, workflowRunId);
-
-				// replace procName with sortedSublist in sorted
-				sorted.remove(i);
-				sorted.addAll(i, sortedSublist);
-			}
-		}
-		return sorted;
-	}
-
-
-
-	/**
-	 * @param procList
-	 * @param predecessorsCount
-	 * @param successorsOf
-	 * @param workflowRunId
-	 * @return
-	 * @throws SQLException
-	 */
-	public List<Pair> tsort(List<String> procList, String dataflowName,
-			Map<String, Integer> predecessorsCount,
-			Map<String, List<String>> successorsOf, String workflowId,
-			String workflowRunId) throws SQLException {
-		List<Pair> l = new ArrayList<>();		// holds sorted elements
-		List<String> q = new ArrayList<>(); 	// temp queue
-
-		// init queue with procList processors that have no predecessors
-		for (String proc:procList)
-			if (predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0 &&
-					!proc.equals(dataflowName))
-				q.add(proc);
-
-		while (!q.isEmpty()) {
-			String current = q.remove(0);
-			l.add(new Pair(current, workflowId));
-
-			List<String> successors = successorsOf.get(current);
-
-			if (successors == null)
-				continue;
-
-			// reduce the number of predecessors to each of the successors by one
-			// NB we must traverse an additional datalink through a nested workflow input if the successor is a dataflow!!
-			for (String succ : successors) {
-				// decrease edge count for each successor processor
-				predecessorsCount.put(succ, predecessorsCount.get(succ) - 1);
-
-				if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName))
-					q.add(succ);
-			}
-		} // end loop on q
-		return l;
-	}
-
-	@SuppressWarnings("deprecation")
-	public void propagateANL(String workflowRunId) throws SQLException {
-		String top = pq.getTopLevelDataflowName(workflowRunId);
-
-		// //////////////////////
-		// PHASE I: toposort the processors in the whole graph
-		// //////////////////////
-		List<Pair> sorted = toposort(top, workflowRunId);
-
-		List<String> sortedProcessors = new ArrayList<>();
-
-		for (Pair p : sorted)
-			sortedProcessors.add(p.getV1());
-
-		logger.debug("final sorted list of processors");
-		for (Pair p : sorted)
-			logger.debug(p.getV1() + " in workflowId " + p.getV2());
-
-		// //////////////////////
-		// PHASE II: traverse and set anl on each port
-		// //////////////////////
-
-		//		// sorted processor names in L at this point
-		//		// process them in order
-		for (Pair pnameInContext : sorted) {
-			//			// process pname's inputs -- set ANL to be the DNL if not set in prior steps
-			String pname     = pnameInContext.getV1();
-			String workflowId = pnameInContext.getV2();
-
-			List<Port> inputs = getPq().getInputPorts(pname, workflowId); // null -> do not use instance (??) CHECK
-
-			int totalANL = 0;
-			for (Port iv : inputs) {
-
-				if (! iv.isResolvedDepthSet()) {
-					iv.setResolvedDepth(iv.getDepth());
-					getPw().updatePort(iv);
-				}
-
-				int delta_nl = iv.getResolvedDepth() - iv.getDepth();
-
-				// if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case
-				if (delta_nl < 0 ) delta_nl = 0;// CHECK iv.getTypedepth();
-
-				totalANL += delta_nl;
-
-				// this should take care of the special case of the top level dataflow with inputs that have successors in the graph
-				// propagate this through all the links from this var
-//				List<Port> successors = getPq().getSuccVars(pname, iv.getVName(), workflowRunId);
-
-//				for (Port v : successors) {
-//				v.setresolvedDepth(iv.getresolvedDepth());
-//				getPw().updateVar(v);
-//				}
-			}
-
-			// process pname's outputs -- set ANL based on the sum formula (see
-			// paper)
-			for (Port ov : getPq().getOutputPorts(pname, workflowId)) {
-
-				ov.setResolvedDepth(ov.getDepth() + totalANL);
-
-				logger.debug("anl for "+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL));
-				getPw().updatePort(ov);
-
-				// propagate this through all the links from this var
-				for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) {
-					List<Port> toBeProcessed = new ArrayList<>();
-					toBeProcessed.add(v);
-
-					if (v.getProcessorId() == null && v.isInputPort()) {  // this is the input to a nested workflow
-//						String tempWorkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
-						String tempWorkflowId = pq
-								.getWorkflowIdForExternalName(v
-										.getProcessorName());
-						List<Port> realSuccessors = getPq().getSuccPorts(
-								v.getProcessorName(), v.getPortName(),
-								tempWorkflowId);
-
-						toBeProcessed.remove(0);
-						toBeProcessed.addAll(realSuccessors);
-
-					} else if (v.getProcessorId() == null && !v.isInputPort()) {  // this is the output to a nested workflow
-//						String tempworkflowId = pq.getworkflowIdForDataflow(v.getPName(), workflowRunId);
-						List<Port> realSuccessors = getPq().getSuccPorts(
-								v.getProcessorName(), v.getPortName(), null);
-
-						toBeProcessed.remove(0);
-						toBeProcessed.addAll(realSuccessors);
-					}
-
-					for (Port v1 : toBeProcessed) {
-						v1.setResolvedDepth(ov.getResolvedDepth());
-						logger.debug("anl for " + v1.getProcessorName() + ":"
-								+ v1.getPortName() + " = "
-								+ ov.getResolvedDepth());
-						getPw().updatePort(v1);
-					}
-				}
-			}
-		}
-	}
-
-	public void setPw(ProvenanceWriter pw) {
-		this.pw = pw;
-	}
-
-	public ProvenanceWriter getPw() {
-		return pw;
-	}
-
-	public void setPq(ProvenanceQuery pq) {
-		this.pq = pq;
-	}
-
-	public ProvenanceQuery getPq() {
-		return pq;
-	}
-
-	public void setWorkflowRunId(String workflowRunId) {
-		this.workflowRunId = workflowRunId;
-	}
-
-	public String getWorkflowRunId() {
-		return workflowRunId;
-	}
-
-	public void setWfdp(WorkflowDataProcessor wfdp) {
-		this.wfdp = wfdp;
-	}
-
-	public WorkflowDataProcessor getWfdp() {
-		return wfdp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
deleted file mode 100644
index 975a83a..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageQueryResultRecord.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-/**
- * 
- * This Java bean holds a single provenance record, i.e., the finest element of
- * a provenance graph that is stored in the provenance DB. Essentially this
- * represents one data element (value) flowing through a port (vname) of a
- * processor (pname), in the context of one run (wfInstance) of a workflow
- * (wfname). The record may include an <b>iteration</b> vector, used when the
- * same processor receives multiple values on the same port, as part of
- * iterative processing. When the value belongs to a collection (a nested list),
- * the <b>collIdRef</b> field contains a reference to that collection.
- * 
- * @author Paolo Missier
- * 
- */
-public class LineageQueryResultRecord {
-	private String workflowId;
-	private String processorName;
-	private String portName;
-	private String workflowRunId;
-	private String iteration;
-	private String value;     // atomic or XML-formatted collection -- this is actually a reference to the value...
-	private String collIdRef;
-	private String parentCollIDRef;
-	private String resolvedValue;
-	private String type;  // one of referenceSet, referenceSetCollection
-	boolean printResolvedValue;
-	boolean isInput; 
-	boolean isCollection;
-
-	@Override
-	public String toString() {
-		if (isCollection) {
-			return "COLLECTION: proc " + getProcessorName() + " var "
-					+ getPortName() + " " + " iteration: " + getIteration()
-					+ " value: " + getValue() + " collection id: "
-					+ getCollectionT2Reference() + " parent collection: "
-					+ getParentCollIDRef();
-		} else if (printResolvedValue)
-			return "workflow " + getworkflowId() + " proc "
-					+ getProcessorName() + " var " + getPortName() + " "
-					+ " iteration: " + getIteration() + " value: " + getValue()
-					+ " collection id: " + getCollectionT2Reference()
-					+ " resolvedValue: " + getResolvedValue();
-		else
-			return "workflow " + getworkflowId() + " proc "
-					+ getProcessorName() + " var " + getPortName() + " "
-					+ " iteration: " + getIteration() + " collection id: "
-					+ getCollectionT2Reference() + " value: " + getValue();
-	}
-
-	/**
-	 * @return the pname
-	 */
-	public String getProcessorName() {
-		return processorName;
-	}
-	/**
-	 * @param pname the pname to set
-	 */
-	public void setProcessorName(String pname) {
-		this.processorName = pname;
-	}
-	/**
-	 * @return the vname
-	 */
-	public String getPortName() {
-		return portName;
-	}
-	/**
-	 * @param vname the vname to set
-	 */
-	public void setPortName(String vname) {
-		this.portName = vname;
-	}
-	/**
-	 * @return the workflowRun
-	 */
-	public String getWorkflowRunId() {
-		return workflowRunId;
-	}
-	/**
-	 * @param workflowRun the workflowRun to set
-	 */
-	public void setWorkflowRunId(String workflowRun) {
-		this.workflowRunId = workflowRun;
-	}
-	/**
-	 * @return the value
-	 */
-	public String getValue() {
-		return value;
-	}
-	/**
-	 * @param value the value to set
-	 */
-	public void setValue(String value) {
-		this.value = value;
-	}
-	/**
-	 * @return the type
-	 */
-	public String getType() {
-		return type;
-	}
-	/**
-	 * @param type the type to set
-	 */
-	public void setType(String type) {
-		this.type = type;
-	}
-	/**
-	 * @return the iteration
-	 */
-	public String getIteration() {
-		return iteration;
-	}
-	/**
-	 * @param iteration the iteration to set
-	 */
-	public void setIteration(String iteration) {
-		this.iteration = iteration;
-	}
-	/**
-	 * @return the resolvedValue
-	 */
-	public String getResolvedValue() {
-		return resolvedValue;
-	}
-	/**
-	 * @param resolvedValue the resolvedValue to set
-	 */
-	public void setResolvedValue(String resolvedValue) {
-		this.resolvedValue = resolvedValue;
-	}
-
-
-	public void setPrintResolvedValue(boolean printResolvedValue) {
-		this.printResolvedValue = printResolvedValue;
-	}
-
-
-	/**
-	 * @return the isInput
-	 */
-	public boolean isInputPort() {
-		return isInput;
-	}
-
-
-	/**
-	 * @param isInput the isInput to set
-	 */
-	public void setIsInputPort(boolean isInput) {
-		this.isInput = isInput;
-	}
-
-
-	/**
-	 * @return the collIdRef
-	 */
-	public String getCollectionT2Reference() {
-		return collIdRef;
-	}
-
-
-	/**
-	 * @param collIdRef the collIdRef to set
-	 */
-	public void setCollectionT2Reference(String collIdRef) {
-		this.collIdRef = collIdRef;
-	}
-
-
-	/**
-	 * @return the isCollection
-	 */
-	public boolean isCollection() {
-		return isCollection;
-	}
-
-
-	/**
-	 * @param isCollection the isCollection to set
-	 */
-	public void setCollection(boolean isCollection) {
-		this.isCollection = isCollection;
-	}
-
-
-	/**
-	 * @return the parentCollIDRef
-	 */
-	public String getParentCollIDRef() {
-		return parentCollIDRef;
-	}
-
-
-	/**
-	 * @param parentCollIDRef the parentCollIDRef to set
-	 */
-	public void setParentCollIDRef(String parentCollIDRef) {
-		this.parentCollIDRef = parentCollIDRef;
-	}
-
-
-	/**
-	 * @return the workflowId
-	 */
-	public String getworkflowId() {
-		return workflowId;
-	}
-
-
-	/**
-	 * @param workflowId the workflowId to set
-	 */
-	public void setWorkflowId(String workflowId) {
-		this.workflowId = workflowId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
deleted file mode 100644
index 58d7c3d..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/LineageSQLQuery.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-/**
- * encapsulates an SQL query along with directives on how to interpret the
- * results, i.e., which elements of the select clause are to be considered
- * relevant. For instance when the query includes a join with Collection, the
- * intent is that lineage should return the collection itself as opposed to any
- * of its elements.
- * 
- * @author paolo
- * 
- */
-public class LineageSQLQuery {
-	private String vbQuery = null;
-	private String collQuery = null;
-
-	/** =0 => use var values, >0 => use enclosing collection */
-	private int depth = 0;
-
-	/**
-	 * @return the depth
-	 */
-	public int getdepth() {
-		return depth;
-	}
-
-	/**
-	 * @param depth
-	 *            the depth to set
-	 */
-	public void setdepth(int depth) {
-		this.depth = depth;
-	}
-
-	/**
-	 * @return the collQuery
-	 */
-	public String getCollQuery() {
-		return collQuery;
-	}
-
-	/**
-	 * @param collQuery the collQuery to set
-	 */
-	public void setCollQuery(String collQuery) {
-		this.collQuery = collQuery;
-	}
-
-	/**
-	 * @return the vbQuery
-	 */
-	public String getVbQuery() {
-		return vbQuery;
-	}
-
-	/**
-	 * @param vbQuery the vbQuery to set
-	 */
-	public void setVbQuery(String vbQuery) {
-		this.vbQuery = vbQuery;
-	}	
-}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
deleted file mode 100644
index 621e351..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/Provenance.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester   
- * 
- *  Modifications to the initial code base are copyright of their
- *  respective authors, or their employers as appropriate.
- * 
- *  This program is free software; you can redistribute it and/or
- *  modify it under the terms of the GNU Lesser General Public License
- *  as published by the Free Software Foundation; either version 2.1 of
- *  the License, or (at your option) any later version.
- *    
- *  This program is distributed in the hope that it will be useful, but
- *  WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- *  Lesser General Public License for more details.
- *    
- *  You should have received a copy of the GNU Lesser General Public
- *  License along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import net.sf.taverna.t2.provenance.connector.AbstractProvenanceConnector;
-import net.sf.taverna.t2.provenance.item.ProvenanceItem;
-import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
-import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
-import net.sf.taverna.t2.workflowmodel.Dataflow;
-
-import org.apache.log4j.Logger;
-
-/**
- * Implemented by the database class that a {@link AbstractProvenanceConnector}
- * implementation uses for storage purposes
- * 
- * @author Paolo Missier
- * @author Ian Dunlop
- * 
- */
-//FIXME is this class really needed. Can't we just push the
-//acceptRawProvanceEvent up into the ProvenanceConnector?
-public class Provenance {
-	private static Logger logger = Logger.getLogger(Provenance.class);
-
-	protected ProvenanceQuery pq;
-	protected ProvenanceWriter pw;
-	protected EventProcessor ep;
-
-	private String saveEvents;
-	
-	private volatile boolean firstWorkflowStructure = true;
-
-	public boolean isFirstWorkflowStructure() {
-		return firstWorkflowStructure;
-	}
-
-	public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
-		this.firstWorkflowStructure = firstWorkflowStructure;
-	}
-
-	private List<String> workflowIDStack = Collections.synchronizedList(new ArrayList<String>());
-	
-	private Map<String, String> workflowIDMap = new ConcurrentHashMap<String, String>();
-
-	public Provenance() {	}
-
-	public Provenance(EventProcessor eventProcessor) {
-		this.ep = eventProcessor;
-		this.pq = ep.getPq();
-		this.pw = ep.getPw();		
-	}
-
-	public void clearDB() throws SQLException {
-		getPw().clearDBStatic();
-		getPw().clearDBDynamic();
-	}
-	
-
-	/**
-	 * @return the saveEvents
-	 */
-	public String getSaveEvents() {
-		return saveEvents;
-	}
-
-	/**
-	 * @param saveEvents
-	 *            the saveEvents to set
-	 */
-	public void setSaveEvents(String saveEvents) {
-		this.saveEvents = saveEvents;
-	}
-
-	// FIXME I think the provenance query and writer should both come from the
-	// EventProcessor
-	// seems silly setting the ep, pq and pw separately.
-	public void setPq(ProvenanceQuery pq) {
-		this.pq = pq;
-	}
-
-	public ProvenanceQuery getPq() {
-		return pq;
-	}
-
-	public void setPw(ProvenanceWriter pw) {
-		this.pw = pw;
-	}
-
-	public ProvenanceWriter getPw() {
-		return pw;
-	}
-
-	public void setEp(EventProcessor ep) {
-		this.ep = ep;
-	}
-
-	public EventProcessor getEp() {
-		return ep;
-	}
-
-	/**
-	 * maps each incoming event to an insert query into the provenance store
-	 * 
-	 * @param eventType
-	 * @param content
-	 * @throws SQLException
-	 * @throws IOException
-	 */
-	public void acceptRawProvenanceEvent(SharedVocabulary eventType,
-			ProvenanceItem provenanceItem) throws SQLException, IOException {
-		processEvent(provenanceItem, eventType);
-	}
-
-	/**
-	 * parse d and generate SQL insert calls into the provenance DB
-	 * 
-	 * @param d
-	 *            DOM for the event
-	 * @param eventType
-	 *            see {@link SharedVocabulary}
-	 * @throws SQLException
-	 * @throws IOException
-	 */
-	protected void processEvent(ProvenanceItem provenanceItem,
-			SharedVocabulary eventType) throws SQLException, IOException {
-		if (eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE)) {
-			// process the workflow structure
-			//workflowStartedMap.put()
-			WorkflowProvenanceItem workflowProvenanceItem = (WorkflowProvenanceItem) provenanceItem;
-			
-			getEp().getWfdp().workflowStarted.put(workflowProvenanceItem.getIdentifier(), workflowProvenanceItem.getInvocationStarted());
-			if (isFirstWorkflowStructure()) {
-				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
-				String instanceId = provenanceItem.getIdentifier();
-				
-				workflowIDMap.put(instanceId, dataflowId);
-				setFirstWorkflowStructure(false);
-				String processWorkflowStructure = getEp().processWorkflowStructure(provenanceItem);
-				synchronized(workflowIDStack) {
-					workflowIDStack.add(0,processWorkflowStructure);
-				}
-				
-				getEp().propagateANL(provenanceItem.getIdentifier());
-			} else {
-				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
-				String instanceId = provenanceItem.getIdentifier();
-				
-				workflowIDMap.put(instanceId, dataflowId);
-
-				Dataflow df = workflowProvenanceItem.getDataflow();
-				synchronized(workflowIDStack) {
-					workflowIDStack.add(0,df.getIdentifier());
-				}
-			}
-		} else if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
-//			String currentWorkflowID = workflowIDStack.get(0);
-//			workflowIDStack.remove(0);
-			String currentWorkflowID = provenanceItem.getParentId();
-			
-			getEp().processProcessEvent(provenanceItem, currentWorkflowID);
-			
-		} else {  // all other event types (iteration etc.)
-			logger.debug("processEvent of type "+provenanceItem.getEventType()+" for item of type "+provenanceItem.getClass().getName());
-			String currentWorkflowID = provenanceItem.getWorkflowId();
-//			String currentWorkflowID = workflowIDMap.get(provenanceItem.getParentId());
-
-			getEp().processProcessEvent(provenanceItem, currentWorkflowID);
-		
-//			getEp().processProcessEvent(provenanceItem, workflowIDStack.get(0));
-		}
-	}
-}