Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/30 15:47:17 UTC

[05/12] incubator-taverna-engine git commit: some provenance refactoring

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java
new file mode 100644
index 0000000..c77de0a
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java
@@ -0,0 +1,1547 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import static java.util.Collections.synchronizedList;
+import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
+import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML;
+import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.iterationToString;
+import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.parentProcess;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE;
+import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE;
+
+import java.beans.ExceptionListener;
+import java.beans.XMLEncoder;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.sql.Blob;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.sql.rowset.serial.SerialBlob;
+
+import net.sf.taverna.t2.provenance.item.DataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.DataflowRunComplete;
+import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem;
+import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
+import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import org.apache.taverna.provenance.lineageservice.utils.DataBinding;
+import org.apache.taverna.provenance.lineageservice.utils.DataLink;
+import org.apache.taverna.provenance.lineageservice.utils.NestedListNode;
+import org.apache.taverna.provenance.lineageservice.utils.Port;
+import org.apache.taverna.provenance.lineageservice.utils.PortBinding;
+import org.apache.taverna.provenance.lineageservice.utils.ProcessorBinding;
+import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment;
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils;
+import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
+import org.apache.taverna.reference.T2Reference;
+import org.apache.taverna.workflowmodel.Dataflow;
+import org.apache.taverna.workflowmodel.DataflowInputPort;
+import org.apache.taverna.workflowmodel.DataflowOutputPort;
+import org.apache.taverna.workflowmodel.Datalink;
+import org.apache.taverna.workflowmodel.InputPort;
+import org.apache.taverna.workflowmodel.MergeInputPort;
+import org.apache.taverna.workflowmodel.MergeOutputPort;
+import org.apache.taverna.workflowmodel.OutputPort;
+import org.apache.taverna.workflowmodel.Processor;
+import org.apache.taverna.workflowmodel.ProcessorInputPort;
+import org.apache.taverna.workflowmodel.ProcessorOutputPort;
+import org.apache.taverna.workflowmodel.processor.activity.Activity;
+import org.apache.taverna.workflowmodel.processor.activity.NestedDataflow;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.Namespace;
+import org.jdom.input.SAXBuilder;
+import org.jdom.output.XMLOutputter;
+
+import org.apache.taverna.scufl2.api.io.WorkflowBundleIO;
+
+/**
+ * @author Paolo Missier
+ */
+public class EventProcessor {
+	/**
+	 * A map of UUIDs of the originating processor to the ProcBinding object
+	 * that contains its parameters
+	 */
+	private Map<String, ProcessorBinding> procBindingMap = new ConcurrentHashMap<>();
+
+	/** A map of child ids to their parents in the hierarchy of events:
+	 *  workflow -> process -> processor -> activity -> iteration
+	 */
+	private Map<String, String> parentChildMap = new ConcurrentHashMap<>();
+
+	private static Logger logger = Logger.getLogger(EventProcessor.class);
+
+	private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
+	private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";
+
+	private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first
+	private volatile String workflowRunId = null; // unique run ID. set when we see the first event of type "process"
+
+	String topLevelDataflowName = null;
+	String topLevelDataflowID   = null;
+
+	Map<String, String> wfNestingMap = new ConcurrentHashMap<>();
+
+	// all input bindings are accumulated here so they can be "backpatched" (see backpatching() )
+	List<PortBinding> allInputVarBindings = synchronizedList(new ArrayList<PortBinding>());
+
+	// dedicated class for processing WorkflowData events which carry workflow output info
+	private WorkflowDataProcessor  wfdp;
+	private ProvenanceWriter pw = null;
+	private ProvenanceQuery  pq = null;
+
+	private HashMap<String, Port> mapping;
+
+	private Map<String, ProcessorEnactment> processorEnactmentMap = new ConcurrentHashMap<>();
+
+	private Map<String, ProvenanceProcessor> processorMapById = new ConcurrentHashMap<>();
+
+	private WorkflowBundleIO io;
+
+	// Backpatching temporarily disabled
+	private static final boolean backpatching = false;
+
+	public EventProcessor(WorkflowBundleIO io) {
+		this.io = io;
+	}
+
+	/**
+	 * @param pw the provenance writer
+	 * @param pq the provenance query support
+	 * @param wfdp dedicated processor for WorkflowData events
+	 * @param io reader/writer for workflow bundles
+	 * @throws SQLException
+	 * @throws ClassNotFoundException
+	 * @throws IllegalAccessException
+	 * @throws InstantiationException
+	 */
+	public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq,
+			WorkflowDataProcessor wfdp, WorkflowBundleIO io)
+			throws InstantiationException, IllegalAccessException,
+			ClassNotFoundException, SQLException {
+		this.pw = pw;
+		this.pq = pq;
+		this.wfdp = wfdp;
+		this.io = io;
+
+		//logger.setLevel((Level) Level.INFO);
+	}
+
+	/**
+	 * Populates the static portion of the DB using the T2 deserializer. The
+	 * static structure may already be in the DB -- this is detected as a
+	 * duplicate top-level workflow ID, in which case processing is skipped
+	 * altogether.
+	 *
+	 * @param provenanceItem
+	 *            a workflow provenance item carrying the serialized dataflow
+	 * @return the workflowRunId for this workflow structure
+	 */
+	public String processWorkflowStructure(ProvenanceItem provenanceItem) {
+		/*
+		 * this flag is set to prevent processing of separate
+		 * workflowProvenanceItems that describe nested workflows. the
+		 * processing of all nested workflows is done as part of the very first
+		 * workflowProvenanceItem that we receive, which is self-consistent. so
+		 * we ignore all others
+		 */
+		if (workflowStructureDone)
+			return null;
+		WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) provenanceItem;
+		setWorkflowRunId(wpi.getIdentifier());
+		workflowStructureDone = true;
+		return processWorkflowStructure(wpi.getDataflow());
+	}
+
+	public String processWorkflowStructure(Dataflow df) {
+		topLevelDataflowName = df.getLocalName();
+		topLevelDataflowID   = df.getIdentifier();
+
+		// check whether we already have this WF in the DB
+		List<String> workflowIds = null;
+		try {
+			workflowIds = pq.getAllworkflowIds();
+		} catch (SQLException e) {
+			logger.warn("Problem processing workflow structure", e);
+		}
+
+		if (workflowIds == null || !workflowIds.contains(topLevelDataflowID)) {
+			// not already in the DB
+			logger.info("new workflow structure with ID " + topLevelDataflowID);
+			ProvenanceProcessor provProc = new ProvenanceProcessor();
+			provProc.setIdentifier(UUID.randomUUID().toString());
+			provProc.setProcessorName(topLevelDataflowName);
+			provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY);
+			provProc.setWorkflowId(topLevelDataflowID);
+			provProc.setTopLevelProcessor(true);
+			// record the top level dataflow as a processor in the DB
+			try {
+				pw.addProcessor(provProc);
+				// pw.addProcessor(topLevelDataflowName, DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true);  // true -> is top level
+			} catch (SQLException e) {
+				logger.warn("Can't add processor " + topLevelDataflowID, e);
+			}
+		}
+
+		return processDataflowStructure(df, topLevelDataflowID, df.getLocalName());  // top level: use the dataflow's own local name as its external name
+	}
+
+	private Blob serialize(Dataflow df) {
+		// FIXME no dataflow serializer is wired in yet, so there is nothing
+		// to serialize here; kept as a placeholder until one is available
+		Element serializedDataflow = null; // was: xmlSerializer.serializeDataflow(df)
+		if (serializedDataflow == null)
+			return null;
+		try {
+			XMLOutputter outputter = new XMLOutputter();
+			StringWriter stringWriter = new StringWriter();
+			outputter.output(serializedDataflow, stringWriter);
+			String dataflowString = stringWriter.toString();
+			return new SerialBlob(dataflowString.getBytes("UTF-8"));
+		} catch (IOException | SQLException e) {
+			logger.error("Could not serialise dataflow", e);
+			return null;
+		}
+	}
+
+	/**
+	 * note: this method can be called as part of a recursion on sub-workflows
+	 * 
+	 * @param df
+	 * @param dataflowID
+	 *            the UUID for the entire dataflow (may be a sub-dataflow)
+	 * @param localName
+	 *            the external name of the dataflow. Null if this is top level,
+	 *            not null if a sub-dataflow
+	 * @return the workflowRunId for this workflow structure
+	 */
+	private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
+		String localWorkflowRunID = getWorkflowRunId();
+
+		//dataflowDepth++;
+
+		try {
+			// check whether we already have this WF in the DB
+			boolean alreadyInDb;
+			try {
+				List<String> workflowIds = pq.getAllworkflowIds();
+				alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID);
+			} catch (SQLException e) {
+				logger.warn("Problem processing dataflow structure for " + dataflowID, e);
+				alreadyInDb = false;
+			}
+
+			// add workflow ID -- this is NOT THE SAME AS the workflowRunId
+
+			/*
+			 * this could be a nested workflow -- in this case, override its
+			 * workflowRunId with that of its parent
+			 */
+			if (!alreadyInDb) {
+				String parentDataflow = wfNestingMap.get(dataflowID);
+				Blob blob = serialize(df);
+				if (parentDataflow == null) {
+					// this is a top level dataflow description
+					pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent
+
+				} else {
+					// we are processing a nested workflow structure
+					logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);
+
+					pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent
+
+					// override workflowRunId to point to top level -- UNCOMMENTED PM 9/09  CHECK
+					localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId();
+				}
+			}
+			// Log the run itself
+			pw.addWorkflowRun(dataflowID, localWorkflowRunID);
+
+			// add processors along with their variables
+			List<Port> vars = new ArrayList<Port>();
+			for (Processor p : df.getProcessors()) {
+				String pName = p.getLocalName();
+
+				//CHECK get type of first activity and set this as the type of the processor itself
+				List<? extends Activity<?>> activities = p.getActivityList();
+
+				if (! alreadyInDb) {
+					ProvenanceProcessor provProc;
+					String pType = null;
+					if (activities != null && !activities.isEmpty())
+						pType = activities.get(0).getClass().getCanonicalName();
+					provProc = new ProvenanceProcessor();
+					provProc.setIdentifier(UUID.randomUUID().toString());
+					provProc.setProcessorName(pName);
+					provProc.setFirstActivityClassName(pType);
+					provProc.setWorkflowId(dataflowID);
+					provProc.setTopLevelProcessor(false);
+
+					pw.addProcessor(provProc);
+
+					//pw.addProcessor(pName, pType, dataflowID, false);  // false: not a top level processor
+
+					/*
+					 * add all input ports for this processor as input variables
+					 */
+					for (ProcessorInputPort ip : p.getInputPorts()) {
+						Port inputVar = new Port();
+						inputVar.setIdentifier(UUID.randomUUID().toString());
+						inputVar.setProcessorId(provProc.getIdentifier());
+						inputVar.setProcessorName(pName);
+						inputVar.setWorkflowId(dataflowID);
+						inputVar.setPortName(ip.getName());
+						inputVar.setDepth(ip.getDepth());
+						inputVar.setInputPort(true);
+					 	vars.add(inputVar);
+					}
+
+					/*
+					 * add all output ports for this processor as output
+					 * variables
+					 */
+					for (ProcessorOutputPort op : p.getOutputPorts()) {
+						Port outputVar = new Port();
+						outputVar.setIdentifier(UUID.randomUUID().toString());
+						outputVar.setProcessorName(pName);
+						outputVar.setProcessorId(provProc.getIdentifier());
+						outputVar.setWorkflowId(dataflowID);
+						outputVar.setPortName(op.getName());
+						outputVar.setDepth(op.getDepth());
+						outputVar.setInputPort(false);
+
+						vars.add(outputVar);
+					}
+				}
+
+				/*
+				 * check for nested structures: if the activity is
+				 * DataflowActivity then this processor is a nested workflow;
+				 * make an entry into wfNesting map with its ID and recurse on
+				 * the nested workflow
+				 */
+
+				if (activities != null)
+					for (Activity<?> a : activities) {
+						if (!(a instanceof NestedDataflow))
+							continue;
+
+						Dataflow nested = ((NestedDataflow) a)
+								.getNestedDataflow();
+						wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent
+
+						// RECURSIVE CALL
+						processDataflowStructure(nested,
+								nested.getIdentifier(), p.getLocalName());
+					}
+			} // end for each processor
+
+			// add inputs to entire dataflow
+			String pName = INPUT_CONTAINER_PROCESSOR;  // overridden -- see below
+
+			/*
+			 * check whether we are processing a nested workflow. in this case
+			 * the input vars are not assigned to the INPUT processor but to the
+			 * containing dataflow
+			 */
+			if (! alreadyInDb) {
+				if (externalName != null) // override the default if we are nested or an external name is provided
+					pName = externalName;
+
+				for (DataflowInputPort ip : df.getInputPorts()) {
+					Port inputVar = new Port();
+					inputVar.setIdentifier(UUID.randomUUID().toString());
+					inputVar.setProcessorId(null); // meaning workflow port
+					inputVar.setProcessorName(pName);
+					inputVar.setWorkflowId(dataflowID);
+					inputVar.setPortName(ip.getName());
+					inputVar.setDepth(ip.getDepth());
+					inputVar.setInputPort(true);  // CHECK PM modified 11/08 -- input vars are actually outputs of input processors...
+
+					vars.add(inputVar);
+				}
+
+				// add outputs of entire dataflow
+				pName = OUTPUT_CONTAINER_PROCESSOR;  // overridden -- see below
+
+				/*
+				 * check whether we are processing a nested workflow. in this
+				 * case the output vars are not assigned to the OUTPUT processor
+				 * but to the containing dataflow
+				 */
+				if (externalName != null) // we are nested
+					pName = externalName;
+
+				for (DataflowOutputPort op : df.getOutputPorts()) {
+					Port outputVar = new Port();
+					outputVar.setIdentifier(UUID.randomUUID().toString());
+					outputVar.setProcessorId(null); // meaning workflow port
+					outputVar.setProcessorName(pName);
+					outputVar.setWorkflowId(dataflowID);
+					outputVar.setPortName(op.getName());
+					outputVar.setDepth(op.getDepth());
+					outputVar.setInputPort(false);  // CHECK PM modified 11/08 -- output vars are actually outputs of output processors...
+					vars.add(outputVar);
+				}
+
+				pw.addPorts(vars, dataflowID);
+				makePortMapping(vars);
+
+				/*
+				 * add datalink records using the dataflow links retrieving the
+				 * processor names requires navigating from links to source/sink
+				 * and from there to the processors
+				 */
+				for (Datalink l : df.getLinks()) {
+					// TODO cover the case of datalinks from an input and to an output to the entire dataflow
+
+					Port sourcePort = null;
+					Port destinationPort = null;
+
+					OutputPort source = l.getSource();
+					if (source instanceof ProcessorOutputPort) {
+						String sourcePname = ((ProcessorOutputPort) source)
+								.getProcessor().getLocalName();
+						sourcePort = lookupPort(sourcePname, source.getName(), false);
+					} else if (source instanceof MergeOutputPort) {
+						// TODO: Handle merge output ports
+					} else
+						// Assume it is internal port from DataflowInputPort
+						sourcePort = lookupPort(externalName, source.getName(), true);
+
+					InputPort sink = l.getSink();
+					if (sink instanceof ProcessorInputPort) {
+						String sinkPname = ((ProcessorInputPort) sink)
+								.getProcessor().getLocalName();
+						destinationPort = lookupPort(sinkPname, sink.getName(), true);
+					} else if (sink instanceof MergeInputPort) {
+						// TODO: Handle merge input ports
+					} else
+						// Assume it is internal port from DataflowOutputPort
+						destinationPort = lookupPort(externalName, sink.getName(), false);
+
+					if (sourcePort != null && destinationPort != null)
+						pw.addDataLink(sourcePort, destinationPort, dataflowID);
+					else
+						logger.info("Can't record datalink " + l);
+				}
+			}
+		} catch (Exception e) {
+			logger.error("Problem processing provenance for dataflow", e);
+		}
+
+		return dataflowID;
+	}
+
+	private void makePortMapping(List<Port> ports) {
+		mapping = new HashMap<>();
+		for (Port port: ports) {
+			String key = port.getProcessorName()
+					+ (port.isInputPort() ? "/i:" : "/o:") + port.getPortName();
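+			// e.g. input port "in" of processor "P" is keyed as "P/i:in",
+			// its output port "out" as "P/o:out"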
+			mapping.put(key, port);
+		}
+	}
+
+	private Port lookupPort(String processorName, String portName, boolean isInputPort) {
+		String key = processorName + (isInputPort ? "/i:" : "/o:") + portName;
+		return mapping.get(key);
+	}
+
+	/**
+	 * processes an elementary process execution event from T2. Collects info
+	 * from events as they happen and sends them to the writer for processing
+	 * when the iteration event is received. Uses the map of procBindings to
+	 * process event id and the map of child ids to parent ids to ensure that
+	 * the correct proc binding is used
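+	 * (events nest as workflow -> process -> processor -> activity ->
+	 * iteration, mirroring parentChildMap)
+	 *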
+	 * @param provenanceItem
+	 * @param currentWorkflowID
+	 */
+	public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
+		switch (provenanceItem.getEventType()) {
+		case PROCESS_EVENT_TYPE: {
+			String parentId = provenanceItem.getParentId();  // this is the workflowID
+			String identifier = provenanceItem.getIdentifier();  // use this as workflowRunId if this is the top-level process
+			
+			parentChildMap.put(identifier, parentId);
+			ProcessorBinding pb = new ProcessorBinding();
+			pb.setWorkflowRunId(getWorkflowRunId());
+			pb.setWorkflowId(currentWorkflowID);
+			procBindingMap.put(identifier, pb);
+			return;
+		}
+		case PROCESSOR_EVENT_TYPE: {
+			String identifier = provenanceItem.getIdentifier();
+			String parentId = provenanceItem.getParentId();
+			String processID = provenanceItem.getProcessId(); // this is the external process ID
+
+			// processID has the composite form facade0:dataflowname:pname --
+			// extract pname, its last component
+			String[] processName = processID.split(":");
+			procBindingMap.get(parentId).setProcessorName(
+					processName[processName.length - 1]);
+
+			parentChildMap.put(identifier, parentId);
+			return;
+		}
+		case ACTIVITY_EVENT_TYPE: {
+			String identifier = provenanceItem.getIdentifier();
+			String parentId = provenanceItem.getParentId();
+			procBindingMap.get(parentChildMap.get(parentId))
+					.setFirstActivityClassName(identifier);
+			parentChildMap.put(identifier, parentId);
+			return;
+		}
+		case ITERATION_EVENT_TYPE: {
+			IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem)provenanceItem;
+			if (iterationProvenanceItem.getParentIterationItem() != null)
+				// Skipping pipelined outputs, we'll process the parent output later instead
+				return;
+
+			// traverse up to root to retrieve ProcBinding that was created when we saw the process event
+			String activityID = provenanceItem.getParentId();
+			String processorID = parentChildMap.get(activityID);
+			String processID = parentChildMap.get(processorID);
+			String iterationID = provenanceItem.getIdentifier();
+			parentChildMap.put(iterationID, activityID);
+
+			ProcessorEnactment processorEnactment = processorEnactmentMap
+					.get(iterationID);
+			if (processorEnactment == null)
+				processorEnactment = new ProcessorEnactment();
+
+			ProcessorBinding procBinding = procBindingMap.get(processID);
+
+			String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
+					.getIteration()));
+			procBinding.setIterationVector(itVector);
+
+			processorEnactment.setEnactmentStarted(iterationProvenanceItem
+					.getEnactmentStarted());
+			processorEnactment.setEnactmentEnded(iterationProvenanceItem
+					.getEnactmentEnded());
+			processorEnactment.setWorkflowRunId(workflowRunId);
+			processorEnactment.setIteration(itVector);
+
+			String processId = iterationProvenanceItem.getProcessId();
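+			// walk up the colon-separated process identifier to find the
+			// identifier of the enclosing (parent) enactment, if any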
+			String parentProcessId = parentProcess(processId, 3);
+			if (parentProcessId != null) {
+				ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
+						.get(parentProcessId);
+				if (parentProcEnact != null)
+					processorEnactment
+							.setParentProcessorEnactmentId(parentProcEnact
+									.getProcessEnactmentId());
+			}
+			processorEnactment.setProcessEnactmentId(iterationProvenanceItem
+					.getIdentifier());
+			processorEnactment.setProcessIdentifier(processId);
+
+			ProvenanceProcessor provenanceProcessor;
+			if (processorEnactment.getProcessorId() == null) {
+				provenanceProcessor = pq.getProvenanceProcessorByName(
+						currentWorkflowID, procBinding.getProcessorName());
+				if (provenanceProcessor == null)
+					// already logged warning
+					return;
+				processorMapById.put(provenanceProcessor.getIdentifier(),
+						provenanceProcessor);
+				processorEnactment.setProcessorId(provenanceProcessor
+						.getIdentifier());
+			} else {
+				provenanceProcessor = processorMapById.get(processorEnactment
+						.getProcessorId());
+				if (provenanceProcessor == null) {
+					provenanceProcessor = pq
+							.getProvenanceProcessorById(processorEnactment
+									.getProcessorId());
+					processorMapById.put(provenanceProcessor.getIdentifier(),
+							provenanceProcessor);
+				}
+			}
+
+			InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
+			OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();
+
+			if (inputDataEl != null
+					&& processorEnactment.getInitialInputsDataBindingId() == null) {
+				processorEnactment
+						.setInitialInputsDataBindingId(processDataBindings(
+								inputDataEl, provenanceProcessor));
+				processInput(inputDataEl, procBinding, currentWorkflowID);
+			}
+
+			if (outputDataEl != null
+					&& processorEnactment.getFinalOutputsDataBindingId() == null) {
+				processorEnactment
+						.setFinalOutputsDataBindingId(processDataBindings(
+								outputDataEl, provenanceProcessor));
+				processOutput(outputDataEl, procBinding, currentWorkflowID);
+			}
+
+			try {
+				if (processorEnactmentMap.containsKey(iterationID)) {
+					getPw().updateProcessorEnactment(processorEnactment);
+				} else {
+					getPw().addProcessorEnactment(processorEnactment);
+					processorEnactmentMap.put(iterationID, processorEnactment);
+				}
+			} catch (SQLException e) {
+				logger.warn("Could not store processor enactment", e);
+			}
+			return;
+		}
+		case END_WORKFLOW_EVENT_TYPE: {
+			DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
+			// use this event to do housekeeping on the input/output varbindings
+
+			// process the input and output values accumulated by WorkflowDataProcessor
+			getWfdp().processTrees(completeEvent, getWorkflowRunId());
+
+			reconcileLocalOutputs(provenanceItem.getWorkflowId());
+
+			if (! provenanceItem.getProcessId().contains(":")) {
+				// Top-level workflow finished
+				// No longer needed, done by processTrees()
+//				patchTopLevelInputs();
+
+				workflowStructureDone = false; // CHECK reset for next run...
+//				reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs
+				getPw().closeCurrentModel();  // only real impl is for RDF
+			}
+			return;
+		}
+		case WORKFLOW_DATA_EVENT_TYPE: {
+			// give this event to a WorkflowDataProcessor object for pre-processing
+			// TODO may generate an exception when the data is an error CHECK
+			getWfdp().addWorkflowDataItem(provenanceItem);
+			// FIXME not sure -- needs to be stored somehow
+			return;
+		}
+		case INVOCATION_STARTED_EVENT_TYPE: {
+			InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
+			ProcessorEnactment processorEnactment = processorEnactmentMap
+					.get(startedItem.getParentId());
+			if (processorEnactment == null) {
+				logger.error("Could not find ProcessorEnactment for invocation "
+						+ startedItem);
+				return;
+			}
+			getWfdp().invocationProcessToProcessEnactment.put(
+					startedItem.getInvocationProcessId(), processorEnactment);
+			return;
+		}
+		case ERROR_EVENT_TYPE:
+			//TODO process the error
+			return;
+		default:
+			// TODO broken, should we throw something here?
+			return;
+		}
+	}
+
+	private String processDataBindings(
+			DataProvenanceItem provenanceItem, ProvenanceProcessor provenanceProcessor) {
+		// TODO: Cache known provenanceItems and avoid registering again
+		String dataBindingId = UUID.randomUUID().toString();
+		boolean isInput = provenanceItem instanceof InputDataProvenanceItem;
+
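+		// one DataBinding record is written per bound port; all records from
+		// this invocation share the same freshly minted dataBindingId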
+		for (Entry<String, T2Reference> entry : provenanceItem.getDataMap().entrySet()) {
+			DataBinding dataBinding = new DataBinding();
+			dataBinding.setDataBindingId(dataBindingId);
+			Port port = findPort(provenanceProcessor, entry.getKey(), isInput); // findPort
+			if (port == null) {
+				logger.warn("Could not find port for " + entry.getKey());
+				continue;
+			}
+			dataBinding.setPort(port);
+			dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString());
+			dataBinding.setWorkflowRunId(workflowRunId);
+			try {
+				getPw().addDataBinding(dataBinding);
+			} catch (SQLException e) {
+				logger.warn("Could not register data binding for " + port, e);
+			}
+		}
+		return dataBindingId;
+	}
+
+	private Port findPort(ProvenanceProcessor provenanceProcessor,
+			String portName, boolean isInput) {
+		// TODO: Query pr dataflow and cache
+		Map<String, String> queryConstraints = new HashMap<>();
+		queryConstraints.put("V.workflowId",
+				provenanceProcessor.getWorkflowId());
+		String processorName = provenanceProcessor.getProcessorName();
+		queryConstraints.put("processorName", processorName);
+		queryConstraints.put("portName", portName);
+		queryConstraints.put("isInputPort", isInput ? "1" : "0");
+		try {
+			List<Port> vars = pq.getPorts(queryConstraints);
+			if (vars.isEmpty()) {
+				logger.warn("Can't find port " + portName + " in "
+						+ processorName);
+			} else if (vars.size() > 1) {
+				logger.warn("Multiple matches for port " + portName + " in "
+						+ processorName + ", got:" + vars);
+			} else
+				return vars.get(0);
+		} catch (SQLException e) {
+			logger.error(
+					"Problem getting ports for processor: " + processorName
+							+ " worflow: "
+							+ provenanceProcessor.getWorkflowId(), e);
+		}
+		return null;
+	}
+
+
+	/**
+	 * fills in the port bindings for the global inputs -- this removes the
+	 * need for explicit events that account for these value bindings
+	 */
+	public void patchTopLevelInputs() {
+
+		// for each input I to topLevelDataflow:
+		// pick first outgoing datalink with sink P:X
+		// copy value X to I -- this can be a collection, so copy everything
+
+		// get all global input vars
+
+		//		logger.info("\n\n BACKPATCHING GLOBAL INPUTS with dataflowDepth = "+dataflowDepth+"*******\n");
+
+		List<Port> inputs=null;
+		try {
+			inputs = getPq().getInputPorts(topLevelDataflowName, topLevelDataflowID);
+
+			for (Port input:inputs)  {
+
+				//				logger.info("global input: "+input.getVName());
+
+				Map<String,String> queryConstraints = new HashMap<String,String>();
+
+//				queryConstraints.put("sourcePortName", input.getVName());
+//				queryConstraints.put("sourceProcessorName", input.getPName());
+				queryConstraints.put("sourcePortId", input.getIdentifier());
+				queryConstraints.put("workflowId", input.getWorkflowId());
+				List<DataLink> outgoingDataLinks = getPq().getDataLinks(queryConstraints);
+
+				// any datalink will do, use the first
+				String targetPname = outgoingDataLinks.get(0).getDestinationProcessorName();
+				String targetVname = outgoingDataLinks.get(0).getDestinationPortName();
+
+//				logger.info("copying values from ["+targetPname+":"+targetVname+"] for instance ID: ["+workflowRunId+"]");
+
+				queryConstraints.clear();
+				queryConstraints.put("V.portName", targetVname);
+				queryConstraints.put("V.processorName", targetPname);
+				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
+				queryConstraints.put("V.workflowId", topLevelDataflowID);
+
+				for (PortBinding vb : getPq().getPortBindings(queryConstraints)) {
+					PortBinding inputPortBinding = new PortBinding(vb);
+
+					// insert PortBinding back into VB with the global input portName
+					inputPortBinding.setProcessorName(input.getProcessorName());
+					inputPortBinding.setPortName(input.getPortName());
+					try {
+						getPw().addPortBinding(inputPortBinding);
+					} catch (SQLException ex) {
+						logger.info("Already logged port binding", ex);
+					}
+				}
+			}
+		} catch (SQLException e) {
+			logger.warn("Patch top level inputs problem for provenance", e);
+		} catch (IndexOutOfBoundsException e) {
+			logger.error("Could not patch top level", e);
+		}
+	}
+
+	public void reconcileTopLevelOutputs() {
+		reconcileLocalOutputs(topLevelDataflowID);
+	}
+
+	// PM added 23/4/09
+	/**
+	 * reconcile the top level outputs with the results from their immediate predecessors in the graph.<br/>
+	 * various cases have to be considered: predecessors may include records that are not in the output,
+	 * while the output may include nested list structures that are not in the predecessors. This method performs
+	 * a 2-way reconciliation that considers all possible cases.<br/>
+	 * at the end, outputs and their predecessors contain the same data.<p/>
+	 * NOTE: if we assumed that data values (URIs) were <em>always</em> unique, this would be greatly simplified: just
+	 * compare the two sets of value records by their URIs and reconcile them. But that is not the way it is done here.
+	 */
+	public void reconcileLocalOutputs(String dataflowID) {
+		/*
+	for each output O
+
+	for each variable V in predecessors(O)
+
+	fetch all VB records for O into list OValues
+	fetch all VB records for V into list YValues
+
+	compare OValues and YValues:
+	it SHOULD be the case that OValues is a subset of YValues. Under this assumption:
+
+	for each vb in YValues:
+	- if there is a matching o in OValues then (vb may be missing collection information)
+	    copy o to vb
+	  else
+	    if vb has no collection info && there is a matching tree node tn in OTree (use iteration index for the match) then
+	       set vb to be in collection tn
+	       copy vb to o
+
+     finally copy all Collection records for O in OTree -- catch duplicate errors
+		 */
+
+		Map<String, String> queryConstraints = new HashMap<>();
+
+		try {
+			// for each output O
+			for (Port output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID))  {
+				// collect all VBs for O
+//				String oPName = output.getPName();
+//				String oVName = output.getVName();
+//				queryConstraints.put("V.portName", oVName);
+//				queryConstraints.put("V.processorName", oPName);
+//				queryConstraints.put("VB.workflowRunId", workflowRunId);
+//				queryConstraints.put("V.workflowId", topLevelDataflowID);
+
+//				List<PortBinding> OValues = pq.getPortBindings(queryConstraints);
+
+				// find all records for the immediate predecessor Y of O
+				queryConstraints.clear();
+//				queryConstraints.put("destinationPortName", output.getVName());
+//				queryConstraints.put("destinationProcessorName", output.getPName());
+				queryConstraints.put("destinationPortId", output.getIdentifier());
+				queryConstraints.put("workflowId", output.getWorkflowId());
+				List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
+
+				// there can be only one -- but check that there is one!
+				if (incomingDataLinks.isEmpty())
+					continue;
+
+				String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
+				String sourceVname = incomingDataLinks.get(0).getSourcePortName();
+
+				queryConstraints.clear();
+				queryConstraints.put("V.portName", sourceVname);
+				queryConstraints.put("V.processorName", sourcePname);
+				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
+				queryConstraints.put("V.workflowId", topLevelDataflowID);
+
+				List<PortBinding> YValues = pq.getPortBindings(queryConstraints);
+
+				// for each YValue look for a match in OValues
+				// (assume the YValues are a superset of OValues!)
+
+				for (PortBinding yValue:YValues) {
+					// look for a matching record in PortBinding for output O
+					queryConstraints.clear();
+					queryConstraints.put("V.portName", output.getPortName());
+					queryConstraints.put("V.processorName", output.getProcessorName());
+					queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
+					queryConstraints.put("V.workflowid", topLevelDataflowID);
+					queryConstraints.put("VB.iteration", yValue.getIteration());
+					if (yValue.getCollIDRef()!= null) {
+						queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
+						queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl()));
+					}
+					List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints);
+
+					// result at most size 1
+					if (!matchingOValues.isEmpty()) {
+						PortBinding oValue = matchingOValues.get(0);
+
+						// copy collection info from oValue to yValue
+						yValue.setCollIDRef(oValue.getCollIDRef());
+						yValue.setPositionInColl(oValue.getPositionInColl());
+
+						pw.updatePortBinding(yValue);
+					} else {
+						// copy the yValue to O
+						// insert PortBinding back into VB with the global output portName
+						yValue.setProcessorName(output.getProcessorName());
+						yValue.setPortName(output.getPortName());
+						pw.addPortBinding(yValue);
+					}
+
+				} // for each yValue in YValues
+
+				// copy all Collection records for O to Y
+
+				// get all collections refs for O
+				queryConstraints.clear();
+				queryConstraints.put("workflowRunId", getWorkflowRunId());
+				queryConstraints.put("processorNameRef", output.getProcessorName());
+				queryConstraints.put("portName", output.getPortName());
+
+				List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);
+
+				// insert back as collection refs for Y -- catch duplicates
+				for (NestedListNode nln:oCollections) {
+					nln.setProcessorName(sourcePname);
+					nln.setPortName(sourceVname);
+
+					getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
+				}
+
+			} // for each output var
+
+		} catch (SQLException e) {
+			logger.warn("Problem reconciling top level outputs", e);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void processOutput(OutputDataProvenanceItem provenanceItem,
+			ProcessorBinding procBinding, String currentWorkflowID) {
+		Element dataItemAsXML = getDataItemAsXML(provenanceItem);
+		List<Element> outputPorts = dataItemAsXML.getChildren("port");
+		for (Element outputport : outputPorts) {
+			String portName = outputport.getAttributeValue("name");
+
+			// value type may vary
+			List<Element> valueElements = outputport.getChildren();
+			if (valueElements != null && !valueElements.isEmpty()) {
+				Element valueEl = valueElements.get(0); // only really 1 child
+
+				processPortBinding(valueEl, procBinding.getProcessorName(),
+						portName, procBinding.getIterationVector(),
+						getWorkflowRunId(), currentWorkflowID);
+			}
+		}
+	}
+
+	/**
+	 * this method reconciles values in port bindings across a datalink: firstly,
+	 * if a binding's value is within a collection <em>and</em> it is copied from
+	 * a value generated during a previous iteration, then this method propagates
+	 * the list reference to that iteration value, which wouldn't have it.
+	 * Conversely, if a binding is going to be input to an iteration, it has lost
+	 * its containing list node, and we put it back in by looking at the
+	 * corresponding predecessor
+	 *
+	 * @param newBindings
+	 * @throws SQLException
+	 */
+	private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException {
+		logger.debug("backpatchIterationResults: start");
+		for (PortBinding vb : newBindings) {
+			logger.debug("backpatchIterationResults: processing vb "
+					+ vb.getProcessorName() + "/" + vb.getPortName() + "="
+					+ vb.getValue());
+
+			if (vb.getCollIDRef()!= null) // this is a member of a collection
+				logger.debug("...which is inside a collection ");
+
+			// look for its antecedent
+			Map<String,String> queryConstraints = new HashMap<>();
+			queryConstraints.put("destinationPortName", vb.getPortName());
+			queryConstraints.put("destinationProcessorName", vb.getProcessorName());
+			queryConstraints.put("workflowId", pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0));  // CHECK picking first element in list...
+			List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);
+
+			// there can be only one -- but check that there is one!
+			if (incomingDataLinks.isEmpty())
+				return;
+
+			String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
+			String sourceVname = incomingDataLinks.get(0).getSourcePortName();
+
+			logger.debug("antecedent: "+sourcePname+":"+sourceVname);
+
+			// get the varbindings for this port and select the one with the same iteration vector as its successor
+			queryConstraints.clear();
+			queryConstraints.put("VB.portName", sourceVname);
+			queryConstraints.put("V.processorName", sourcePname);
+			queryConstraints.put("VB.value", vb.getValue());
+			queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId());
+
+			// reconcile
+			for (PortBinding b : pq.getPortBindings(queryConstraints)) {
+				logger.debug("backpatching " + sourceVname + " " + sourcePname);
+
+				if (vb.getCollIDRef() != null && b.getCollIDRef() == null) {
+					logger.debug("successor " + vb.getPortName()
+							+ " is in collection " + vb.getCollIDRef()
+							+ " but pred " + b.getPortName() + " is not");
+					logger.debug("putting " + b.getPortName()
+							+ " in collection " + vb.getCollIDRef()
+							+ " at pos " + vb.getPositionInColl());
+					b.setCollIDRef(vb.getCollIDRef());
+					b.setPositionInColl(vb.getPositionInColl());
+					getPw().updatePortBinding(b);
+
+				} else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) {
+					logger.debug("successor " + vb.getPortName()
+							+ " is NOT in collection but pred "
+							+ b.getPortName() + " IS");
+					logger.debug("putting " + vb.getPortName()
+							+ " in collection " + b.getCollIDRef() + " at pos "
+							+ b.getPositionInColl());
+					vb.setCollIDRef(b.getCollIDRef());
+					vb.setPositionInColl(b.getPositionInColl());
+					getPw().updatePortBinding(vb);
+				}
+			}
+		}
+	}
+
+
+	/**
+	 * create one new PortBinding record for each input port binding
+	 * @param currentWorkflowID
+	 */
+	@SuppressWarnings("unchecked")
+	private void processInput(InputDataProvenanceItem provenanceItem,
+			ProcessorBinding procBinding, String currentWorkflowID) {
+		Element dataItemAsXML = getDataItemAsXML(provenanceItem);
+		int order = 0;
+		for (Element inputport : (List<Element>) dataItemAsXML.getChildren("port")) {
+			String portName = inputport.getAttributeValue("name");
+
+			try {
+				// add process order sequence to Port for this portName
+				Map<String, String> queryConstraints = new HashMap<>();
+				queryConstraints.put("V.workflowId", currentWorkflowID);
+				queryConstraints.put("processorName", procBinding.getProcessorName());
+				queryConstraints.put("portName", portName);
+				queryConstraints.put("isInputPort", "1");
+
+				Port v = getPq().getPorts(queryConstraints).get(0);
+				v.setIterationStrategyOrder(order++);
+				getPw().updatePort(v);
+			} catch (IndexOutOfBoundsException e) {
+				logger.error("Could not process input " + portName, e);
+			} catch (SQLException e1) {
+				logger.error("Could not process input " + portName, e1);
+			}
+
+			// value type may vary
+			List<Element> valueElements = inputport.getChildren(); // hopefully in the right order...
+			if (valueElements != null && !valueElements.isEmpty()) {
+				Element valueEl = valueElements.get(0); // expect only 1 child
+				//				processVarBinding(valueEl, processor, portName, iterationVector,
+				//				dataflow);
+
+				List<PortBinding> newBindings = processPortBinding(valueEl,
+						procBinding.getProcessorName(), portName,
+						procBinding.getIterationVector(), getWorkflowRunId(),
+						currentWorkflowID);
+				// this is a list whenever valueEl is of type list: in this case processVarBinding recursively
+				// processes all values within the collection, and generates one PortBinding record for each of them
+
+				allInputVarBindings.addAll(newBindings);
+
+				/*
+				 * if the new binding involves list values, check whether they
+				 * need to be propagated back to results of earlier iterations.
+				 * Backpatching is disabled as it is very inefficient and not
+				 * needed for current Taverna usage.
+				 */
+
+				try {
+					if (backpatching)
+						backpatchIterationResults(newBindings);
+				} catch (SQLException e) {
+					logger.warn("Problem with back patching iteration results", e);
+				}
+			} else {
+				if (valueElements != null)
+					logger.debug("port name " + portName + "  "
+							+ valueElements.size());
+				else
+					logger.debug("valueElements is null for port name "
+							+ portName);
+			}
+		}
+	}
+
+	/**
+	 * capture the default case where the value is not a list
+	 *
+	 * @param valueEl
+	 * @param processorId
+	 * @param portName
+	 * @param iterationId
+	 * @param workflowRunId
+	 * @param currentWorkflowID
+	 */
+	private List<PortBinding> processPortBinding(Element valueEl,
+			String processorId, String portName, String iterationId,
+			String workflowRunId, String currentWorkflowID) {
+		// uses the defaults:
+		// collIdRef = null
+		// parentcollectionRef = null
+		// positionInCollection = 1
+		return processPortBinding(valueEl, processorId, portName, null, 1, null,
+				iterationId, workflowRunId, null, currentWorkflowID);
+	}
+
+	/**
+	 * general case where value can be a list
+	 * @param valueEl
+	 * @param processorId
+	 * @param portName
+	 * @param collIdRef
+	 * @param positionInCollection
+	 * @param parentCollectionRef
+	 * @param iterationId
+	 * @param workflowRunId
+	 * @param currentWorkflowID
+	 */
+	@SuppressWarnings("unchecked")
+	private List<PortBinding> processPortBinding(Element valueEl,
+			String processorId, String portName, String collIdRef,
+			int positionInCollection, String parentCollectionRef,
+			String iterationId, String workflowRunId, String itVector,
+			String currentWorkflowID) {
+		List<PortBinding> newBindings = new ArrayList<>();
+
+		String valueType = valueEl.getName();
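+		// the element name discriminates the kind of value:
+		// "literal", "referenceSet", "list" or "error" (handled below)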
+		//		logger.info("value element for " + processorId + ": "
+		//		+ valueType);
+
+		String iterationVector = (itVector == null ? extractIterationVector(iterationId)
+				: itVector);
+
+		PortBinding vb = new PortBinding();
+
+		vb.setWorkflowId(currentWorkflowID);
+		vb.setWorkflowRunId(workflowRunId);
+		vb.setProcessorName(processorId);
+		vb.setValueType(valueType);
+		vb.setPortName(portName);
+		vb.setCollIDRef(collIdRef);
+		vb.setPositionInColl(positionInCollection);
+
+		newBindings.add(vb);
+
+		if (valueType.equals("literal")) {
+			try {
+				vb.setIteration(iterationVector);
+				vb.setValue(valueEl.getAttributeValue("id"));
+				logger.debug("new input VB with workflowId="+currentWorkflowID+" processorId="+processorId+
+						" valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+
+						" position="+positionInCollection+" itvector="+iterationVector+
+						" value="+vb.getValue());
+				getPw().addPortBinding(vb);
+			} catch (SQLException e) {
+				logger.warn("Process Port Binding problem with provenance", e);
+			}
+
+		} else if (valueType.equals("referenceSet")) {
+			vb.setIteration(iterationVector);
+			vb.setValue(valueEl.getAttributeValue("id"));
+			vb.setReference(valueEl.getChildText("reference"));
+
+			logger.debug("new input VB with workflowId=" + currentWorkflowID
+					+ " processorId=" + processorId + " valueType=" + valueType
+					+ " portName=" + portName + " collIdRef=" + collIdRef
+					+ " position=" + positionInCollection + " itvector="
+					+ iterationVector + " value=" + vb.getValue());
+
+			try {
+				getPw().addPortBinding(vb);
+			} catch (SQLException e) {
+				logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
+				// try to update the existing record instead using the current collection info
+
+				getPw().updatePortBinding(vb);
+			}
+
+		} else if (valueType.equals("list")) {
+			logger.debug("input of type list");
+
+			// add entries to the Collection and to the PortBinding tables
+			// list id --> Collection.collId
+
+			String collId = valueEl.getAttributeValue("id");
+			try {
+				parentCollectionRef = getPw().addCollection(processorId, collId,
+						parentCollectionRef, iterationVector, portName,
+						workflowRunId);
+
+				// iterate over each list element
+				List<Element> listElements = valueEl.getChildren();
+
+				positionInCollection = 1;  // also use this as a suffix to extend the iteration vector
+
+				// extend iteration vector to account for additional levels within the list
+
+				String originalIterationVector = iterationVector;
+
+				// children can be any base type, including list itself -- so
+				// use recursion
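+				// e.g. a list bound at iteration "[2]" with three children
+				// yields child vectors "[2,0]", "[2,1]", "[2,2]"; for an empty
+				// vector "[]" they become "[0]", "[1]", "[2]"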
+				for (Element el : listElements) {
+					if (originalIterationVector.length() > 2) // vector is not empty
+						iterationVector = originalIterationVector.substring(0,
+								originalIterationVector.length()-1) + ","+
+								Integer.toString(positionInCollection-1) + "]";
+					else
+						iterationVector = "["+ (positionInCollection-1) + "]";
+
+					newBindings.addAll(processPortBinding(el, processorId,
+							portName, collId, positionInCollection,
+							parentCollectionRef, iterationId, workflowRunId,
+							iterationVector, currentWorkflowID));
+
+					positionInCollection++;
+				}
+			} catch (SQLException e) {
+				logger.warn("Problem processing var binding", e);
+			}
+		} else if (valueType.equals("error")) {
+			vb.setIteration(iterationVector);
+			vb.setValue(valueEl.getAttributeValue("id"));
+			vb.setReference(valueEl.getChildText("reference"));
+			try {
+				getPw().addPortBinding(vb);
+			} catch (SQLException e) {
+				logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
+				// try to update the existing record instead using the current collection info
+
+				getPw().updatePortBinding(vb);
+			}
+		} else {
+			logger.warn("unrecognized value type element for "
+					+ processorId + ": " + valueType);
+		}
+
+		return newBindings;
+	}
+
+
+	/**
+	 * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...]
+	 * <p/>
+	 * now returns the vector itself -- this is still experimental
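+	 * (e.g. "[0,1]" used to be reduced to "0,1"; it is now returned unchanged)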
+	 *
+	 * @param iteration
+	 * @return the iteration vector, unchanged
+	 */
+	@Deprecated
+	String extractIterationVector(String iteration) {
+		return iteration;
+	}
+
+	/**
+	 * silly class to hold pairs of strings. any better way??
+	 * @author paolo
+	 *
+	 */
+	class Pair {
+		String v1, v2;
+
+		public Pair(String current, String workflowId) {
+			v1=current; v2=workflowId;
+		}
+
+		/**
+		 * @return the v1
+		 */
+		public String getV1() {
+			return v1;
+		}
+
+		/**
+		 * @param v1 the v1 to set
+		 */
+		public void setV1(String v1) {
+			this.v1 = v1;
+		}
+
+		/**
+		 * @return the v2
+		 */
+		public String getV2() {
+			return v2;
+		}
+
+		/**
+		 * @param v2 the v2 to set
+		 */
+		public void setV2(String v2) {
+			this.v2 = v2;
+		}
+	}
+
+	@SuppressWarnings("deprecation")
+	public List<Pair> toposort(String dataflowName, String workflowRunId) throws SQLException  {
+
+//		String workflowId = pq.getworkflowIdForDataflow(dataflowName, workflowRunId);
+		String workflowId = pq.getWorkflowIdForExternalName(dataflowName);
+
+		// fetch processors along with the count of their predecessors
+		Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(workflowRunId);
+		Map<String, List<String>> successorsOf = new HashMap<String, List<String>>();
+//		List<String> procList = pq.getContainedProcessors(dataflowName, workflowRunId);
+		List<String> procList = pq.getContainedProcessors(dataflowName);
+
+		for (String s:procList) {
+			List<String> successors = getPq().getSuccProcessors(s, workflowId, workflowRunId);
+			successorsOf.put(s, successors);
+		}
+
+		List<Pair>  sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, workflowId, workflowRunId);
+
+		for (int i=0; i< sorted.size(); i++) {
+			String procName = sorted.get(i).getV1();
+
+			if (pq.isDataflow(procName) && !procName.equals(dataflowName)) {  // handle weirdness: a dataflow is contained within itself..
+				// recurse on procName
+				List<Pair> sortedSublist = toposort(procName, workflowRunId);
+
+				// replace procName with sortedSublist in sorted
+				sorted.remove(i);
+				sorted.addAll(i, sortedSublist);
+			}
+		}
+		return sorted;
+	}
+
+
+
+	/**
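+	 * topological sort using Kahn's algorithm: start from the processors with
+	 * no predecessors, repeatedly emit one and decrement the predecessor
+	 * count of each of its successors, queueing any that drop to zero.
+	 *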
+	 * @param procList
+	 * @param predecessorsCount
+	 * @param successorsOf
+	 * @param workflowRunId
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<Pair> tsort(List<String> procList, String dataflowName,
+			Map<String, Integer> predecessorsCount,
+			Map<String, List<String>> successorsOf, String workflowId,
+			String workflowRunId) throws SQLException {
+		List<Pair> l = new ArrayList<>();		// holds sorted elements
+		List<String> q = new ArrayList<>(); 	// temp queue
+
+		// init queue with procList processors that have no predecessors
+		for (String proc:procList)
+			if ((predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0)
+					&& !proc.equals(dataflowName))
+				q.add(proc);
+
+		while (!q.isEmpty()) {
+			String current = q.remove(0);
+			l.add(new Pair(current, workflowId));
+
+			List<String> successors = successorsOf.get(current);
+
+			if (successors == null)
+				continue;
+
+			// reduce the number of predecessors to each of the successors by one
+			// NB we must traverse an additional datalink through a nested workflow input if the successor is a dataflow!!
+			for (String succ : successors) {
+				// decrease edge count for each successor processor
+				predecessorsCount.put(succ, predecessorsCount.get(succ) - 1);
+
+				if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName))
+					q.add(succ);
+			}
+		} // end loop on q
+		return l;
+	}
+
+	@SuppressWarnings("deprecation")
+	public void propagateANL(String workflowRunId) throws SQLException {
+		String top = pq.getTopLevelDataflowName(workflowRunId);
+
+		// //////////////////////
+		// PHASE I: toposort the processors in the whole graph
+		// //////////////////////
+		List<Pair> sorted = toposort(top, workflowRunId);
+
+		List<String> sortedProcessors = new ArrayList<>();
+
+		for (Pair p : sorted)
+			sortedProcessors.add(p.getV1());
+
+		logger.debug("final sorted list of processors");
+		for (Pair p : sorted)
+			logger.debug(p.getV1() + " in workflowId " + p.getV2());
+
+		// //////////////////////
+		// PHASE II: traverse and set anl on each port
+		// //////////////////////
+
+		//		// sorted processor names in L at this point
+		//		// process them in order
+		for (Pair pnameInContext : sorted) {
+			//			// process pname's inputs -- set ANL to be the DNL if not set in prior steps
+			String pname     = pnameInContext.getV1();
+			String workflowId = pnameInContext.getV2();
+
+			List<Port> inputs = getPq().getInputPorts(pname, workflowId); // null -> do not use instance (??) CHECK
+
+			int totalANL = 0;
+			for (Port iv : inputs) {
+
+				if (! iv.isResolvedDepthSet()) {
+					iv.setResolvedDepth(iv.getDepth());
+					getPw().updatePort(iv);
+				}
+
+				int delta_nl = iv.getResolvedDepth() - iv.getDepth();
+
+				// if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case
+				if (delta_nl < 0) delta_nl = 0; // CHECK iv.getTypedepth();
+
+				totalANL += delta_nl;
+
+				// this should take care of the special case of the top level dataflow with inputs that have successors in the graph
+				// propagate this through all the links from this var
+//				List<Port> successors = getPq().getSuccVars(pname, iv.getVName(), workflowRunId);
+
+//				for (Port v : successors) {
+//				v.setresolvedDepth(iv.getresolvedDepth());
+//				getPw().updateVar(v);
+//				}
+			}
+
+			// process pname's outputs -- set ANL based on the sum formula (see
+			// paper)
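+			// e.g. (hypothetical values): an input declared at depth 0 that
+			// resolved to depth 1 contributes delta_nl = 1 to totalANL, so an
+			// output declared at depth 0 gets resolvedDepth = 0 + 1 = 1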
+			for (Port ov : getPq().getOutputPorts(pname, workflowId)) {
+
+				ov.setResolvedDepth(ov.getDepth() + totalANL);
+
+				logger.debug("anl for "+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL));
+				getPw().updatePort(ov);
+
+				// propagate this through all the links from this var
+				for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) {
+					List<Port> toBeProcessed = new ArrayList<>();
+					toBeProcessed.add(v);
+
+					if (v.getProcessorId() == null && v.isInputPort()) {  // this is an input to a nested workflow
+						String tempWorkflowId = pq
+								.getWorkflowIdForExternalName(v
+										.getProcessorName());
+						List<Port> realSuccessors = getPq().getSuccPorts(
+								v.getProcessorName(), v.getPortName(),
+								tempWorkflowId);
+
+						toBeProcessed.remove(0);
+						toBeProcessed.addAll(realSuccessors);
+
+					} else if (v.getProcessorId() == null && !v.isInputPort()) {  // this is an output of a nested workflow
+						List<Port> realSuccessors = getPq().getSuccPorts(
+								v.getProcessorName(), v.getPortName(), null);
+
+						toBeProcessed.remove(0);
+						toBeProcessed.addAll(realSuccessors);
+					}
+
+					for (Port v1 : toBeProcessed) {
+						v1.setResolvedDepth(ov.getResolvedDepth());
+						logger.debug("anl for " + v1.getProcessorName() + ":"
+								+ v1.getPortName() + " = "
+								+ ov.getResolvedDepth());
+						getPw().updatePort(v1);
+					}
+				}
+			}
+		}
+	}
+
+	public void setPw(ProvenanceWriter pw) {
+		this.pw = pw;
+	}
+
+	public ProvenanceWriter getPw() {
+		return pw;
+	}
+
+	public void setPq(ProvenanceQuery pq) {
+		this.pq = pq;
+	}
+
+	public ProvenanceQuery getPq() {
+		return pq;
+	}
+
+	public void setWorkflowRunId(String workflowRunId) {
+		this.workflowRunId = workflowRunId;
+	}
+
+	public String getWorkflowRunId() {
+		return workflowRunId;
+	}
+
+	public void setWfdp(WorkflowDataProcessor wfdp) {
+		this.wfdp = wfdp;
+	}
+
+	public WorkflowDataProcessor getWfdp() {
+		return wfdp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageQueryResultRecord.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageQueryResultRecord.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageQueryResultRecord.java
new file mode 100644
index 0000000..c3865a0
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageQueryResultRecord.java
@@ -0,0 +1,243 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+/**
+ * This Java bean holds a single provenance record, i.e., the finest-grained
+ * element of a provenance graph stored in the provenance DB. Essentially it
+ * represents one data element (value) flowing through a port (portName) of a
+ * processor (processorName), in the context of one run (workflowRunId) of a
+ * workflow (workflowId). The record may include an <b>iteration</b> vector,
+ * used when the same processor receives multiple values on the same port as
+ * part of iterative processing. When the value belongs to a collection (a
+ * nested list), the <b>collIdRef</b> field holds a reference to that
+ * collection.
+ *
+ * @author Paolo Missier
+ */
+public class LineageQueryResultRecord {
+	private String workflowId;
+	private String processorName;
+	private String portName;
+	private String workflowRunId;
+	private String iteration;
+	private String value;     // atomic or XML-formatted collection -- this is actually a reference to the value...
+	private String collIdRef;
+	private String parentCollIDRef;
+	private String resolvedValue;
+	private String type;  // one of referenceSet, referenceSetCollection
+	boolean printResolvedValue;
+	boolean isInput; 
+	boolean isCollection;
+
+	@Override
+	public String toString() {
+		if (isCollection) {
+			return "COLLECTION: proc " + getProcessorName() + " var "
+					+ getPortName() + " iteration: " + getIteration()
+					+ " value: " + getValue() + " collection id: "
+					+ getCollectionT2Reference() + " parent collection: "
+					+ getParentCollIDRef();
+		} else if (printResolvedValue) {
+			return "workflow " + getworkflowId() + " proc "
+					+ getProcessorName() + " var " + getPortName()
+					+ " iteration: " + getIteration() + " value: " + getValue()
+					+ " collection id: " + getCollectionT2Reference()
+					+ " resolvedValue: " + getResolvedValue();
+		} else {
+			return "workflow " + getworkflowId() + " proc "
+					+ getProcessorName() + " var " + getPortName()
+					+ " iteration: " + getIteration() + " collection id: "
+					+ getCollectionT2Reference() + " value: " + getValue();
+		}
+	}
+
+	/**
+	 * @return the pname
+	 */
+	public String getProcessorName() {
+		return processorName;
+	}
+	/**
+	 * @param pname the pname to set
+	 */
+	public void setProcessorName(String pname) {
+		this.processorName = pname;
+	}
+	/**
+	 * @return the vname
+	 */
+	public String getPortName() {
+		return portName;
+	}
+	/**
+	 * @param vname the vname to set
+	 */
+	public void setPortName(String vname) {
+		this.portName = vname;
+	}
+	/**
+	 * @return the workflowRun
+	 */
+	public String getWorkflowRunId() {
+		return workflowRunId;
+	}
+	/**
+	 * @param workflowRun the workflowRun to set
+	 */
+	public void setWorkflowRunId(String workflowRun) {
+		this.workflowRunId = workflowRun;
+	}
+	/**
+	 * @return the value
+	 */
+	public String getValue() {
+		return value;
+	}
+	/**
+	 * @param value the value to set
+	 */
+	public void setValue(String value) {
+		this.value = value;
+	}
+	/**
+	 * @return the type
+	 */
+	public String getType() {
+		return type;
+	}
+	/**
+	 * @param type the type to set
+	 */
+	public void setType(String type) {
+		this.type = type;
+	}
+	/**
+	 * @return the iteration
+	 */
+	public String getIteration() {
+		return iteration;
+	}
+	/**
+	 * @param iteration the iteration to set
+	 */
+	public void setIteration(String iteration) {
+		this.iteration = iteration;
+	}
+	/**
+	 * @return the resolvedValue
+	 */
+	public String getResolvedValue() {
+		return resolvedValue;
+	}
+	/**
+	 * @param resolvedValue the resolvedValue to set
+	 */
+	public void setResolvedValue(String resolvedValue) {
+		this.resolvedValue = resolvedValue;
+	}
+
+
+	public void setPrintResolvedValue(boolean printResolvedValue) {
+		this.printResolvedValue = printResolvedValue;
+	}
+
+
+	/**
+	 * @return the isInput
+	 */
+	public boolean isInputPort() {
+		return isInput;
+	}
+
+
+	/**
+	 * @param isInput the isInput to set
+	 */
+	public void setIsInputPort(boolean isInput) {
+		this.isInput = isInput;
+	}
+
+
+	/**
+	 * @return the collIdRef
+	 */
+	public String getCollectionT2Reference() {
+		return collIdRef;
+	}
+
+
+	/**
+	 * @param collIdRef the collIdRef to set
+	 */
+	public void setCollectionT2Reference(String collIdRef) {
+		this.collIdRef = collIdRef;
+	}
+
+
+	/**
+	 * @return the isCollection
+	 */
+	public boolean isCollection() {
+		return isCollection;
+	}
+
+
+	/**
+	 * @param isCollection the isCollection to set
+	 */
+	public void setCollection(boolean isCollection) {
+		this.isCollection = isCollection;
+	}
+
+
+	/**
+	 * @return the parentCollIDRef
+	 */
+	public String getParentCollIDRef() {
+		return parentCollIDRef;
+	}
+
+
+	/**
+	 * @param parentCollIDRef the parentCollIDRef to set
+	 */
+	public void setParentCollIDRef(String parentCollIDRef) {
+		this.parentCollIDRef = parentCollIDRef;
+	}
+
+
+	/**
+	 * @return the workflowId
+	 */
+	public String getworkflowId() {
+		return workflowId;
+	}
+
+
+	/**
+	 * @param workflowId the workflowId to set
+	 */
+	public void setWorkflowId(String workflowId) {
+		this.workflowId = workflowId;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageSQLQuery.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageSQLQuery.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageSQLQuery.java
new file mode 100644
index 0000000..38fd8f8
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/LineageSQLQuery.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+/**
+ * Encapsulates an SQL query along with directives on how to interpret its
+ * results, i.e., which elements of the select clause are to be considered
+ * relevant. For instance, when the query includes a join with Collection, the
+ * intent is that lineage should return the collection itself rather than any
+ * of its elements.
+ *
+ * @author paolo
+ */
+public class LineageSQLQuery {
+	private String vbQuery = null;
+	private String collQuery = null;
+
+	/** 0 => use individual port values; >0 => use the enclosing collection */
+	private int depth = 0;
+
+	/**
+	 * @return the depth
+	 */
+	public int getdepth() {
+		return depth;
+	}
+
+	/**
+	 * @param depth
+	 *            the depth to set
+	 */
+	public void setdepth(int depth) {
+		this.depth = depth;
+	}
+
+	/**
+	 * @return the collQuery
+	 */
+	public String getCollQuery() {
+		return collQuery;
+	}
+
+	/**
+	 * @param collQuery the collQuery to set
+	 */
+	public void setCollQuery(String collQuery) {
+		this.collQuery = collQuery;
+	}
+
+	/**
+	 * @return the vbQuery
+	 */
+	public String getVbQuery() {
+		return vbQuery;
+	}
+
+	/**
+	 * @param vbQuery the vbQuery to set
+	 */
+	public void setVbQuery(String vbQuery) {
+		this.vbQuery = vbQuery;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Provenance.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Provenance.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Provenance.java
new file mode 100644
index 0000000..839985e
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Provenance.java
@@ -0,0 +1,199 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
+import org.apache.taverna.workflowmodel.Dataflow;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Implemented by the database class that an {@link AbstractProvenanceConnector}
+ * implementation uses for storage purposes.
+ * 
+ * @author Paolo Missier
+ * @author Ian Dunlop
+ * 
+ */
+// FIXME is this class really needed? Can't we just push
+// acceptRawProvenanceEvent up into the ProvenanceConnector?
+public class Provenance {
+	private static Logger logger = Logger.getLogger(Provenance.class);
+
+	protected ProvenanceQuery pq;
+	protected ProvenanceWriter pw;
+	protected EventProcessor ep;
+
+	private String saveEvents;
+	
+	private volatile boolean firstWorkflowStructure = true;
+
+	public boolean isFirstWorkflowStructure() {
+		return firstWorkflowStructure;
+	}
+
+	public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
+		this.firstWorkflowStructure = firstWorkflowStructure;
+	}
+
+	private List<String> workflowIDStack = Collections.synchronizedList(new ArrayList<>());
+
+	private Map<String, String> workflowIDMap = new ConcurrentHashMap<>();
+
+	public Provenance() {
+	}
+
+	public Provenance(EventProcessor eventProcessor) {
+		this.ep = eventProcessor;
+		this.pq = ep.getPq();
+		this.pw = ep.getPw();		
+	}
+
+	public void clearDB() throws SQLException {
+		getPw().clearDBStatic();
+		getPw().clearDBDynamic();
+	}
+
+	/**
+	 * @return the saveEvents
+	 */
+	public String getSaveEvents() {
+		return saveEvents;
+	}
+
+	/**
+	 * @param saveEvents
+	 *            the saveEvents to set
+	 */
+	public void setSaveEvents(String saveEvents) {
+		this.saveEvents = saveEvents;
+	}
+
+	// FIXME I think the provenance query and writer should both come from the
+	// EventProcessor
+	// seems silly setting the ep, pq and pw separately.
+	public void setPq(ProvenanceQuery pq) {
+		this.pq = pq;
+	}
+
+	public ProvenanceQuery getPq() {
+		return pq;
+	}
+
+	public void setPw(ProvenanceWriter pw) {
+		this.pw = pw;
+	}
+
+	public ProvenanceWriter getPw() {
+		return pw;
+	}
+
+	public void setEp(EventProcessor ep) {
+		this.ep = ep;
+	}
+
+	public EventProcessor getEp() {
+		return ep;
+	}
+
+	/**
+	 * maps each incoming event to an insert query into the provenance store
+	 * 
+	 * @param eventType
+	 * @param content
+	 * @throws SQLException
+	 * @throws IOException
+	 */
+	public void acceptRawProvenanceEvent(SharedVocabulary eventType,
+			ProvenanceItem provenanceItem) throws SQLException, IOException {
+		processEvent(provenanceItem, eventType);
+	}
+
+	/**
+	 * parse d and generate SQL insert calls into the provenance DB
+	 * 
+	 * @param d
+	 *            DOM for the event
+	 * @param eventType
+	 *            see {@link SharedVocabulary}
+	 * @throws SQLException
+	 * @throws IOException
+	 */
+	protected void processEvent(ProvenanceItem provenanceItem,
+			SharedVocabulary eventType) throws SQLException, IOException {
+		if (eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE)) {
+			// process the workflow structure
+			WorkflowProvenanceItem workflowProvenanceItem = (WorkflowProvenanceItem) provenanceItem;
+			
+			getEp().getWfdp().workflowStarted.put(workflowProvenanceItem.getIdentifier(), workflowProvenanceItem.getInvocationStarted());
+			if (isFirstWorkflowStructure()) {
+				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
+				String instanceId = provenanceItem.getIdentifier();
+				
+				workflowIDMap.put(instanceId, dataflowId);
+				setFirstWorkflowStructure(false);
+				String processWorkflowStructure = getEp().processWorkflowStructure(provenanceItem);
+				synchronized (workflowIDStack) {
+					workflowIDStack.add(0, processWorkflowStructure);
+				}
+				
+				getEp().propagateANL(provenanceItem.getIdentifier());
+			} else {
+				String dataflowId = workflowProvenanceItem.getDataflow().getIdentifier();
+				String instanceId = provenanceItem.getIdentifier();
+				
+				workflowIDMap.put(instanceId, dataflowId);
+
+				Dataflow df = workflowProvenanceItem.getDataflow();
+				synchronized (workflowIDStack) {
+					workflowIDStack.add(0, df.getIdentifier());
+				}
+			}
+		} else if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
+			String currentWorkflowID = provenanceItem.getParentId();
+			getEp().processProcessEvent(provenanceItem, currentWorkflowID);
+		} else {  // all other event types (iteration etc.)
+			logger.debug("processEvent of type " + provenanceItem.getEventType()
+					+ " for item of type " + provenanceItem.getClass().getName());
+			String currentWorkflowID = provenanceItem.getWorkflowId();
+			getEp().processProcessEvent(provenanceItem, currentWorkflowID);
+		}
+	}
+}