You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/30 15:47:15 UTC

[03/12] incubator-taverna-engine git commit: some provenance refactoring

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceQuery.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceQuery.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceQuery.java
new file mode 100644
index 0000000..b864fb4
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceQuery.java
@@ -0,0 +1,2069 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import static org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable.DataflowInvocation;
+import static org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable.parentProcessorEnactmentId;
+import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
+
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.CollectionTable;
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataBindingTable;
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable;
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.ProcessorEnactmentTable;
+import org.apache.taverna.provenance.lineageservice.utils.Collection;
+import org.apache.taverna.provenance.lineageservice.utils.DDRecord;
+import org.apache.taverna.provenance.lineageservice.utils.DataLink;
+import org.apache.taverna.provenance.lineageservice.utils.DataflowInvocation;
+import org.apache.taverna.provenance.lineageservice.utils.NestedListNode;
+import org.apache.taverna.provenance.lineageservice.utils.Port;
+import org.apache.taverna.provenance.lineageservice.utils.PortBinding;
+import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment;
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.Workflow;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowTree;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowRun;
+
+import org.apache.log4j.Logger;
+import org.jdom.Document;
+import org.jdom.Element;
+
+import org.apache.taverna.configuration.database.DatabaseManager;
+
+/**
+ * Handles all the querying of provenance items in the database layer. Uses
+ * standard SQL so all specific instances of this class can extend this writer
+ * to handle all of the db queries
+ *
+ * @author Paolo Missier
+ * @author Ian Dunlop
+ * @author Stuart Owen
+ *
+ */
+public abstract class ProvenanceQuery {
+	protected Logger logger = Logger.getLogger(ProvenanceQuery.class);
+	private final DatabaseManager databaseManager;
+
+	public ProvenanceQuery(DatabaseManager databaseManager) {
+		this.databaseManager = databaseManager;
+	}
+
+	public Connection getConnection() throws InstantiationException,
+			IllegalAccessException, ClassNotFoundException, SQLException {
+		return databaseManager.getConnection();
+	}
+
+	private Q query(String baseQuery) {
+		return new Q(baseQuery);
+	}
+	private class Q {
+		private String q;
+		private Map<String,String>where;
+		private List<String> order;
+
+		Q(String baseQuery) {
+			q = baseQuery;
+		}
+
+		public Q where(String key, String value) {
+			if (where == null)
+				where = new HashMap<>();
+			where.put(key, value);
+			return this;
+		}
+
+		public Q where(Map<String, String> clauses) {
+			if (where == null)
+				where = new HashMap<>();
+			where.putAll(clauses);
+			return this;
+		}
+
+		public Q orderBy(String key) {
+			if (order == null)
+				order = new ArrayList<>();
+			order.add(key);
+			return this;
+		}
+		public ResultSet exec(Statement statement) throws SQLException {
+			return statement.executeQuery(query());
+		}
+		public String query() {
+			return addOrderByToQuery(addWhereClauseToQuery(q, where, false), order, false);
+		}
+	}
+
+	/**
+	 * implements a set of query constraints of the form var = value into a
+	 * WHERE clause
+	 *
+	 * @param q
+	 * @param queryConstraints
+	 * @return
+	 */
+	protected String addWhereClauseToQuery(String q,
+			Map<String, String> queryConstraints, boolean terminate) {
+
+		// complete query according to constraints
+		StringBuilder buffer = new StringBuilder(q);
+
+		String sep = " WHERE ";
+		if (queryConstraints != null)
+			for (Entry<String, String> entry : queryConstraints.entrySet()) {
+				buffer.append(sep).append(entry.getKey())
+						.append(" = \'").append(entry.getValue()).append("\' ");
+				sep = " AND ";
+			}
+		return buffer.toString();
+	}
+
+	protected String addOrderByToQuery(String q, List<String> orderAttr,
+			boolean terminate) {
+		// complete query according to constraints
+		StringBuilder buffer = new StringBuilder(q);
+
+		String sep = " ORDER BY ";
+		if (orderAttr != null)
+			for (String attr : orderAttr) {
+				buffer.append(sep).append(attr);
+				sep = ",";
+			}
+		return buffer.toString();
+	}
+
+	/**
+	 * select Port records that satisfy constraints
+	 */
+	public List<Port> getPorts(Map<String, String> queryConstraints)
+			throws SQLException {
+		List<Port> result = new ArrayList<>();
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement();
+				ResultSet rs = query(
+						"SELECT DISTINCT V.* FROM Port V "
+								+ "JOIN WorkflowRun W ON W.workflowId = V.workflowId")
+						.where(queryConstraints)
+						.orderBy("V.iterationStrategyOrder").exec(stmt)) {
+			while (rs.next()) {
+				Port aPort = new Port();
+
+				aPort.setWorkflowId(rs.getString("workflowId"));
+				aPort.setInputPort(rs.getBoolean("isInputPort"));
+				aPort.setIdentifier(rs.getString("portId"));
+				aPort.setProcessorName(rs.getString("processorName"));
+				aPort.setProcessorId(rs.getString("processorId"));
+				aPort.setPortName(rs.getString("portName"));
+				aPort.setDepth(rs.getInt("depth"));
+				if (rs.getString("resolvedDepth") != null)
+					aPort.setResolvedDepth(rs.getInt("resolvedDepth"));
+				result.add(aPort);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+
+	/**
+	 * return the input variables for a given processor and a workflowRunId
+	 *
+	 * @param pname
+	 * @param workflowRunId
+	 * @return list of input variables
+	 * @throws SQLException
+	 */
+	public List<Port> getInputPorts(String pname, String wfID)
+			throws SQLException {
+		// get (var, proc) from Port to see if it's input/output
+		Map<String, String> varQueryConstraints = new HashMap<>();
+
+		varQueryConstraints.put("V.workflowId", wfID);
+		varQueryConstraints.put("V.processorName", pname);
+		varQueryConstraints.put("V.isInputPort", "1");
+		return getPorts(varQueryConstraints);
+	}
+
+	/**
+	 * return the output variables for a given processor and a workflowRunId
+	 *
+	 * @param pname
+	 * @param workflowRunId
+	 * @return list of output variables
+	 * @throws SQLException
+	 */
+	public List<Port> getOutputPorts(String pname, String wfID)
+			throws SQLException {
+		// get (var, proc) from Port to see if it's input/output
+		Map<String, String> varQueryConstraints = new HashMap<>();
+
+		varQueryConstraints.put("V.workflowId", wfID);
+		varQueryConstraints.put("V.processorName", pname);
+		varQueryConstraints.put("V.isInputPort", "0");
+		return getPorts(varQueryConstraints);
+	}
+
+	/**
+	 * selects all Datalinks
+	 *
+	 * @param queryConstraints
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<DataLink> getDataLinks(Map<String, String> queryConstraints)
+			throws SQLException {
+		List<DataLink> result = new ArrayList<>();
+
+		String q = addWhereClauseToQuery("SELECT A.* FROM Datalink A", queryConstraints, true);
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement()) {
+			ResultSet rs = stmt.executeQuery(q);
+			while (rs.next()) {
+				DataLink aDataLink = new DataLink();
+
+				aDataLink.setWorkflowId(rs.getString("workflowId"));
+				aDataLink.setSourceProcessorName(rs
+						.getString("sourceProcessorName"));
+				aDataLink.setSourcePortName(rs.getString("sourcePortName"));
+				aDataLink.setDestinationProcessorName(rs
+						.getString("destinationProcessorName"));
+				aDataLink.setDestinationPortName(rs
+						.getString("destinationPortName"));
+				aDataLink.setSourcePortId(rs.getString("sourcePortId"));
+				aDataLink.setDestinationPortId(rs
+						.getString("destinationPortId"));
+				result.add(aDataLink);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+
+		return result;
+	}
+
+	public String getTopLevelWorkflowIdForRun(String runID) throws SQLException {
+		for (Workflow w : getWorkflowsForRun(runID))
+			if (w.getParentWorkflowId() == null)
+				return w.getWorkflowId();
+		return null;
+	}
+
+	/**
+	 * returns the names of all workflows (top level + nested) for a given runID
+	 * @param runID
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<String> getWorkflowIdsForRun(String runID) throws SQLException {
+		List<String> workflowIds = new ArrayList<>();
+		for (Workflow w : getWorkflowsForRun(runID))
+			workflowIds.add(w.getWorkflowId());
+		return workflowIds;
+	}
+
+	/**
+	 * returns the workflows associated to a single runID
+	 * @param runID
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<Workflow> getWorkflowsForRun(String runID) throws SQLException {
+		List<Workflow> result = new ArrayList<>();
+		String q = "SELECT DISTINCT W.* FROM WorkflowRun I JOIN Workflow W ON I.workflowId = W.workflowId WHERE workflowRunId = ?";
+		try (Connection connection = getConnection();
+				PreparedStatement stmt = connection.prepareStatement(q)) {
+			stmt.setString(1, runID);
+			ResultSet rs = stmt.executeQuery();
+			while (rs.next()) {
+				Workflow w = new Workflow();
+				w.setWorkflowId(rs.getString("workflowId"));
+				w.setParentWorkflowId(rs.getString("parentWorkflowId"));
+				result.add(w);
+			}
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.error("Error finding the workflow reference", e);
+		}
+		return result;
+	}
+
+	public String getLatestRunID() throws SQLException {
+		String q = "SELECT workflowRunId FROM WorkflowRun ORDER BY timestamp DESC";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(q)) {
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return rs.getString("workflowRunId");
+		} catch (Exception e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * @param dataflowID
+	 * @param conditions currently only understands "from" and "to" as timestamps for range queries
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<WorkflowRun> getRuns(String dataflowID,
+			Map<String, String> conditions) throws SQLException {
+		List<WorkflowRun> result = new ArrayList<>();
+		StringBuilder q = new StringBuilder(
+				"SELECT * FROM WorkflowRun I join Workflow W on I.workflowId = W.workflowId");
+		List<String> conds = new ArrayList<>();
+		if (dataflowID != null)
+			conds.add("I.workflowId = '" + dataflowID + "'");
+		if (conditions != null) {
+			if (conditions.get("from") != null)
+				conds.add("timestamp >= " + conditions.get("from"));
+			if (conditions.get("to") != null)
+				conds.add("timestamp <= " + conditions.get("to"));
+		}
+		String sep = " where ";
+		for (String cond : conds) {
+			q.append(sep).append(cond);
+			sep = " and ";
+		}
+
+		q.append(" ORDER BY timestamp desc ");
+
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(q.toString())) {
+			logger.debug(q);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				WorkflowRun i = new WorkflowRun();
+				i.setWorkflowRunId(rs.getString("workflowRunId"));
+				i.setTimestamp(rs.getString("timestamp"));
+				i.setWorkflowId(rs.getString("workflowId"));
+				i.setWorkflowExternalName(rs.getString("externalName"));
+				Blob blob = rs.getBlob("dataflow");
+				long length = blob.length();
+				blob.getBytes(1, (int) length);
+				i.setDataflowBlob(blob.getBytes(1, (int) length));
+				result.add(i);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	/**
+	 * @param constraints
+	 *            a Map columnName -> value that defines the query constraints.
+	 *            Note: columnName must be fully qualified. This is not done
+	 *            well at the moment, i.e., processorNameRef should be
+	 *            PortBinding.processorNameRef to avoid ambiguities
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<PortBinding> getPortBindings(Map<String, String> constraints)
+			throws SQLException {
+		List<PortBinding> result = new ArrayList<>();
+
+		String q = "SELECT * FROM PortBinding VB "
+				+ "JOIN Port V ON VB.portName = V.portName "
+				+ "AND VB.processorNameRef = V.processorName "
+				+ "AND VB.workflowId = V.workflowId ";
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement();) {
+			ResultSet rs = stmt.executeQuery(addWhereClauseToQuery(q,
+					constraints, true));
+			while (rs.next()) {
+				PortBinding vb = new PortBinding();
+
+				vb.setWorkflowId(rs.getString("workflowId"));
+				vb.setPortName(rs.getString("portName"));
+				vb.setWorkflowRunId(rs.getString("workflowRunId"));
+				vb.setValue(rs.getString("value"));
+
+				if (rs.getString("collIdRef") == null || rs.getString("collIdRef").equals("null")) {
+					vb.setCollIDRef(null);
+				} else {
+					vb.setCollIDRef(rs.getString("collIdRef"));
+				}
+
+				vb.setIteration(rs.getString("iteration"));
+				vb.setProcessorName(rs.getString("processorNameRef"));
+				vb.setPositionInColl(rs.getInt("positionInColl"));
+				vb.setPortId(rs.getString("portId"));
+				vb.setIsInputPort(rs.getBoolean("isInputPort"));
+				result.add(vb);
+			}
+		} catch (Exception e) {
+			logger.warn("Add VB failed", e);
+		}
+		return result;
+	}
+
+	public List<NestedListNode> getNestedListNodes(
+			Map<String, String> constraints) throws SQLException {
+		List<NestedListNode> result = new ArrayList<>();
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement();
+				ResultSet rs = query("SELECT * FROM Collection C ").where(
+						constraints).exec(stmt)) {
+			while (rs.next()) {
+				NestedListNode nln = new NestedListNode();
+
+				nln.setCollectionT2Reference(rs.getString("collId"));
+				nln.setParentCollIdRef(rs.getString("parentCollIdRef"));
+				nln.setWorkflowRunId(rs.getString("workflowRunId"));
+				nln.setProcessorName(rs.getString("processorNameRef"));
+				nln.setPortName(rs.getString("portName"));
+				nln.setIteration(rs.getString("iteration"));
+
+				result.add(nln);
+			}
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.error("Error finding the nested list nodes", e);
+		}
+		return result;
+	}
+
+	public Map<String, Integer> getPredecessorsCount(String workflowRunId) {
+		Map<String, Integer> result = new HashMap<>();
+
+		// get all datalinks for the entire workflow structure for this particular instance
+		try (Connection connection = getConnection()) {
+			PreparedStatement ps = connection
+					.prepareStatement("SELECT A.sourceProcessorName as source , A.destinationProcessorName as sink, A.workflowId as workflowId1, W1.workflowId as workflowId2, W2.workflowId as workflowId3 "
+							+ "FROM Datalink A join WorkflowRun I on A.workflowId = I.workflowId "
+							+ "left outer join Workflow W1 on W1.externalName = A.sourceProcessorName "
+							+ "left outer join Workflow W2 on W2.externalName = A.destinationProcessorName "
+							+ "where I.workflowRunId = ?");
+			ps.setString(1, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				String sink = rs.getString("sink");
+				String source = rs.getString("source");
+
+				if (result.get(sink) == null)
+					result.put(sink, 0);
+
+				String name1 = rs.getString("workflowId1");
+				String name2 = rs.getString("workflowId2");
+				String name3 = rs.getString("workflowId3");
+
+				if (isDataflow(source) && name1.equals(name2))
+					continue;
+				if (isDataflow(sink) && name1.equals(name3))
+					continue;
+
+				result.put(sink, result.get(sink) + 1);
+			}
+		} catch (InstantiationException | SQLException | IllegalAccessException | ClassNotFoundException e1) {
+			logger.warn("Could not execute query", e1);
+		}
+		return result;
+	}
+
+	/**
+	 * new impl of getProcessorsIncomingLinks whicih avoids complications due to nesting, and relies on the workflowRunId
+	 * rather than the workflowId
+	 * @param workflowRunId
+	 * @return
+	 */
+	public Map<String, Integer> getPredecessorsCountOld(String workflowRunId) {
+		Map<String, Integer> result = new HashMap<>();
+
+		// get all datalinks for the entire workflow structure for this particular instance
+		try (Connection connection = getConnection()) {
+			PreparedStatement ps = connection
+					.prepareStatement("SELECT destinationProcessorName, P1.firstActivityClass, count(*) as pred "
+							+ " FROM Datalink A join WorkflowRun I on A.workflowId = I.workflowId "
+							+ " join Processor P1 on P1.processorName = A.destinationProcessorName "
+							+ " join Processor P2 on P2.processorName = A.sourceProcessorName "
+							+ "  where I.workflowRunId = ? "
+							+ "  and P2.firstActivityClass <> '"
+							+ DATAFLOW_ACTIVITY
+							+ "' "
+							+ " and ((P1.firstActivityClass = '"
+							+ DATAFLOW_ACTIVITY
+							+ "'  and P1.workflowId = A.workflowId) or "
+							+ " (P1.firstActivityClass <> '"
+							+ DATAFLOW_ACTIVITY
+							+ "' )) "
+							+ " group by A.destinationProcessorName, firstActivityClass");
+			ps.setString(1, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next())
+				result.put(rs.getString("destinationProcessorName"),
+						new Integer(rs.getInt("pred")));
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e1) {
+			logger.warn("Could not execute query", e1);
+		}
+		return result;
+	}
+
+	/**
+	 * used in the toposort phase -- propagation of anl() values through the
+	 * graph
+	 *
+	 * @param workflowId
+	 *            reference to static wf name
+	 * @return a map <processor name> --> <incoming links count> for each
+	 *         processor, without counting the datalinks from the dataflow input to
+	 *         processors. So a processor is at the root of the graph if it has
+	 *         no incoming links, or all of its incoming links are from dataflow
+	 *         inputs.<br/>
+	 *         Note: this must be checked for processors that are roots of
+	 *         sub-flows... are these counted as top-level root nodes??
+	 */
+	public Map<String, Integer> getProcessorsIncomingLinks(String workflowId)
+			throws SQLException {
+		Map<String, Integer> result = new HashMap<>();
+
+		String currentWorkflowProcessor = null;
+		String sql = "SELECT processorName, firstActivityClass FROM Processor "
+				+ "WHERE workflowId = ?";
+
+		try (Connection c = getConnection()) {
+			try (PreparedStatement ps = c.prepareStatement(sql)) {
+				ps.setString(1, workflowId);
+				ResultSet rs = ps.executeQuery();
+				while (rs.next()) {
+					// PM CHECK 6/09
+					if (rs.getString("firstActivityClass").equals(
+							DATAFLOW_ACTIVITY)) {
+						currentWorkflowProcessor = rs
+								.getString("processorName");
+						logger.info("currentWorkflowProcessor = "
+								+ currentWorkflowProcessor);
+					}
+					result.put(rs.getString("processorName"), 0);
+				}
+			}
+
+			/*
+			 * fetch the name of the top-level dataflow. We use this to exclude
+			 * datalinks outgoing from its inputs
+			 */
+
+			// CHECK below -- gets confused on nested workflows
+			String parentWF = getParentOfWorkflow(workflowId);
+			if (parentWF == null)
+				parentWF = workflowId; // null parent means we are the top
+			logger.debug("parent WF: " + parentWF);
+
+			// get nested dataflows -- we want to avoid these in the toposort algorithm
+			List<ProvenanceProcessor> procs = getProcessorsShallow(c,
+					DATAFLOW_ACTIVITY, parentWF);
+
+			StringBuilder q = new StringBuilder("SELECT destinationProcessorName, count(*) AS cnt ");
+			q.append("FROM Datalink WHERE workflowId = \'").append(workflowId)
+					.append("\' AND destinationProcessorName NOT IN (");
+			String sep = "";
+			for (ProvenanceProcessor p : procs) {
+				q.append(sep).append("'").append(p.getProcessorName())
+						.append("'");
+				sep = ",";
+			}
+			q.append(") GROUP BY destinationProcessorName");
+
+			logger.info("executing \n" + q);
+
+			try (Statement stmt = c.createStatement();
+					ResultSet rs = stmt.executeQuery(q.toString())) {
+				while (rs.next())
+					if (!rs.getString("destinationProcessorName").equals(
+							currentWorkflowProcessor))
+						result.put(rs.getString("destinationProcessorName"),
+								rs.getInt("cnt"));
+				result.put(currentWorkflowProcessor, 0);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+
+		return result;
+	}
+
+	public List<Port> getSuccPorts(String processorName, String portName,
+			String workflowId) throws SQLException {
+		List<Port> result = new ArrayList<>();
+		String sql = "SELECT v.* "
+				+ "FROM Datalink a JOIN Port v ON a.destinationProcessorName = v.processorName "
+				+ "AND  a.destinationPortName = v.portName "
+				+ "AND a.workflowId = v.workflowId "
+				+ "WHERE sourcePortName=? AND sourceProcessorName=?";
+		if (workflowId != null)
+			sql += " AND a.workflowId=?";
+
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, portName);
+			ps.setString(2, processorName);
+			if (workflowId != null)
+				ps.setString(3, workflowId);
+
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				Port aPort = new Port();
+
+				aPort.setWorkflowId(rs.getString("workflowId"));
+				aPort.setInputPort(rs.getBoolean("isInputPort"));
+				aPort.setIdentifier(rs.getString("portId"));
+				aPort.setProcessorName(rs.getString("processorName"));
+				aPort.setProcessorId(rs.getString("processorId"));
+				aPort.setPortName(rs.getString("portName"));
+				aPort.setDepth(rs.getInt("depth"));
+				if (rs.getString("resolvedDepth") != null)
+					aPort.setResolvedDepth(rs.getInt("resolvedDepth"));
+				result.add(aPort);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	public List<String> getSuccProcessors(String pName, String workflowId,
+			String workflowRunId) throws SQLException {
+		List<String> result = new ArrayList<>();
+		String sql = "SELECT distinct destinationProcessorName FROM Datalink A JOIN WorkflowRun I on A.workflowId = I.workflowId "
+				+ "WHERE A.workflowId = ? and I.workflowRunId = ? AND sourceProcessorName = ?";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, workflowId);
+			ps.setString(2, workflowRunId);
+			ps.setString(3, pName);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next())
+				result.add(rs.getString("destinationProcessorName"));
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	/**
+	 * get all processors of a given type within a structure identified by
+	 * workflowId (reference to dataflow). type constraint is ignored if value is null.<br>
+	 * this only returns the processor for the input workflowId, without going into any neted workflows
+	 *
+	 * @param workflowId
+	 * @param firstActivityClass
+	 * @return a list, that contains at most one element
+	 * @throws SQLException
+	 */
+	public List<ProvenanceProcessor> getProcessorsShallow(
+			String firstActivityClass, String workflowId) throws SQLException {
+		Map<String, String> constraints = new HashMap<>();
+		constraints.put("P.workflowId", workflowId);
+		if (firstActivityClass != null)
+			constraints.put("P.firstActivityClass", firstActivityClass);
+		return getProcessors(constraints);
+	}
+
+	private List<ProvenanceProcessor> getProcessorsShallow(Connection c,
+			String firstActivityClass, String workflowId) throws SQLException {
+		Map<String, String> constraints = new HashMap<>();
+		constraints.put("P.workflowId", workflowId);
+		if (firstActivityClass != null)
+			constraints.put("P.firstActivityClass", firstActivityClass);
+		return getProcessors(c, constraints);
+	}
+
+	public ProvenanceProcessor getProvenanceProcessorByName(
+			String workflowId, String processorName) {
+		Map<String, String> constraints = new HashMap<>();
+		constraints.put("P.workflowId", workflowId);
+		constraints.put("P.processorName", processorName);
+		List<ProvenanceProcessor> processors;
+		try {
+			processors = getProcessors(constraints);
+		} catch (SQLException e1) {
+			logger.warn("Could not find processor for " + constraints, e1);
+			return null;
+		}
+		if (processors.size() != 1) {
+			logger.warn("Could not uniquely find processor for " + constraints + ", got: " + processors);
+			return null;
+		}
+		return processors.get(0);
+	}
+
+	public ProvenanceProcessor getProvenanceProcessorById(String processorId) {
+		Map<String, String> constraints = new HashMap<>();
+		constraints.put("P.processorId", processorId);
+		List<ProvenanceProcessor> processors;
+		try {
+			processors = getProcessors(constraints);
+		} catch (SQLException e1) {
+			logger.warn("Could not find processor for " + constraints, e1);
+			return null;
+		}
+		if (processors.size() != 1) {
+			logger.warn("Could not uniquely find processor for " + constraints
+					+ ", got: " + processors);
+			return null;
+		}
+		return processors.get(0);
+	}
+
+	/**
+	 * this is similar to {@link #getProcessorsShallow(String, String)} but it
+	 * recursively fetches all processors within nested workflows. The result is
+	 * collected in the form of a map: workflowId -> {ProvenanceProcessor}
+	 * 
+	 * @param firstActivityClass
+	 * @param workflowId
+	 * @return a map: workflowId -> {ProvenanceProcessor} where workflowId is
+	 *         the name of a (possibly nested) workflow, and the values are the
+	 *         processors within that workflow
+	 */
+	public Map<String, List<ProvenanceProcessor>> getProcessorsDeep(
+			String firstActivityClass, String workflowId) {
+		Map<String, List<ProvenanceProcessor>> result = new HashMap<>();
+
+		try {
+			List<ProvenanceProcessor> currentProcs = getProcessorsShallow(null,
+					workflowId);
+			List<ProvenanceProcessor> matchingProcessors = new ArrayList<>();
+			result.put(workflowId, matchingProcessors);
+			for (ProvenanceProcessor pp:currentProcs) {
+				if (firstActivityClass == null
+						|| pp.getFirstActivityClassName().equals(
+								firstActivityClass))
+					matchingProcessors.add(pp);
+				if (pp.getFirstActivityClassName().equals(DATAFLOW_ACTIVITY)) {
+					// Can't recurse as there's no way to find ID of nested workflow
+					continue;
+					//result.putAll(getProcessorsDeep(firstActivityClass, NESTED_WORKFLOW_ID));
+				}
+			}
+
+			// Silly fallback - use the broken getChildrenOfWorkflow() assuming that no other workflows
+			// have used the same nested workflow
+			for (String childWf : getChildrenOfWorkflow(workflowId))
+				result.putAll(getProcessorsDeep(firstActivityClass, childWf));
+		} catch (SQLException e) {
+			logger.error("Problem getting nested workflow processors for: " + workflowId, e);
+		}
+		return result;
+	}
+
+	public String getDataValue(String valueRef) {
+		String q = "SELECT * FROM Data where dataReference = ?;";
+
+		try (Connection connection = getConnection();
+				PreparedStatement stmt = connection.prepareStatement(q)) {
+			stmt.setString(1, valueRef);
+			ResultSet rs = stmt.executeQuery(q);
+			if (rs.next())
+				return rs.getString("data");
+		} catch (Exception e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * generic method to fetch processors subject to additional query constraints
+	 * @param constraints
+	 * @return
+	 * @throws SQLException
+	 */
+	public List<ProvenanceProcessor> getProcessors(
+			Map<String, String> constraints) throws SQLException {
+		try (Connection connection = getConnection()) {
+			return getProcessors(connection, constraints);
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return new ArrayList<ProvenanceProcessor>();
+	}
+
+	private List<ProvenanceProcessor> getProcessors(Connection c,
+			Map<String, String> constraints) throws SQLException {
+		List<ProvenanceProcessor> result = new ArrayList<>();
+		try (Statement stmt = c.createStatement();
+				ResultSet rs = query("SELECT P.* FROM Processor P").where(
+						constraints).exec(stmt)) {
+			while (rs.next()) {
+				ProvenanceProcessor proc = new ProvenanceProcessor();
+				proc.setIdentifier(rs.getString("processorId"));
+				proc.setProcessorName(rs.getString("processorName"));
+				proc.setFirstActivityClassName(rs
+						.getString("firstActivityClass"));
+				proc.setWorkflowId(rs.getString("workflowId"));
+				proc.setTopLevelProcessor(rs.getBoolean("isTopLevel"));
+				result.add(proc);
+			}
+		}
+		return result;
+	}
+
+	public List<ProvenanceProcessor> getProcessorsForWorkflow(String workflowID) {
+		List<ProvenanceProcessor> result = new ArrayList<>();
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection
+						.prepareStatement("SELECT * from Processor WHERE workflowId=?")) {
+			ps.setString(1, workflowID);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				ProvenanceProcessor proc = new ProvenanceProcessor();
+				proc.setIdentifier(rs.getString("processorId"));
+				proc.setProcessorName(rs.getString("processorName"));
+				proc.setFirstActivityClassName(rs
+						.getString("firstActivityClass"));
+				proc.setWorkflowId(rs.getString("workflowId"));
+				proc.setTopLevelProcessor(rs.getBoolean("isTopLevel"));
+				result.add(proc);
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.error("Problem getting processor for workflow: "
+					+ workflowID, e);
+		}
+		return result;
+	}
+
+	/**
+	 * simplest possible pinpoint query. Uses iteration info straight away. Assumes result is in PortBinding not in Collection
+	 *
+	 * @param workflowRun
+	 * @param pname
+	 * @param vname
+	 * @param iteration
+	 * @return
+	 */
+	public LineageSQLQuery simpleLineageQuery(String workflowRun, String workflowId, String pname,
+			String vname, String iteration) {
+		LineageSQLQuery lq = new LineageSQLQuery();
+		Q q = query("SELECT * FROM PortBinding VB "
+				+ "JOIN Port V ON (VB.portName = V.portName AND VB.processorNameRef = V.processorName AND VB.workflowId = V.workflowId) "
+				+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId AND VB.workflowId = W.workflowId ");
+
+		// constraints:
+		q.where("W.workflowRunId", workflowRun)
+				.where("VB.processorNameRef", pname)
+				.where("VB.workflowId", workflowId);
+
+		if (vname != null)
+			q.where("VB.portName", vname);
+		if (iteration != null)
+			q.where("VB.iteration", iteration);
+
+		// add order by clauses
+		q.orderBy("V.portName").orderBy("iteration");
+
+		logger.debug("Query is: " + q.query());
+		lq.setVbQuery(q.query());
+		return lq;
+	}
+
+	/**
+	 * if var2Path is null this generates a trivial query for the current output
+	 * var and current path
+	 *
+	 * @param workflowRunId
+	 * @param proc
+	 * @param var2Path
+	 * @param outputVar
+	 * @param path
+	 * @param returnOutputs
+	 *            returns inputs *and* outputs if set to true
+	 * @return
+	 */
+	public List<LineageSQLQuery> lineageQueryGen(String workflowRunId, String proc,
+			Map<Port, String> var2Path, Port outputVar, String path,
+			boolean returnOutputs) {
+		// setup
+		List<LineageSQLQuery> newQueries = new ArrayList<>();
+
+		// use the calculated path for each input var
+		boolean isInput = true;
+		for (Port v : var2Path.keySet()) {
+			LineageSQLQuery q = generateSQL2(workflowRunId, proc, v.getPortName(), var2Path.get(v), isInput);
+			if (q != null)
+				newQueries.add(q);
+		}
+
+		// is returnOutputs is true, then use proc, path for the output var as well
+		if (returnOutputs) {
+			isInput = false;
+			LineageSQLQuery q = generateSQL2(workflowRunId, proc, outputVar.getPortName(), path, isInput);  // && !var2Path.isEmpty());
+			if (q != null)
+				newQueries.add(q);
+		}
+		return newQueries;
+	}
+
+	protected LineageSQLQuery generateSQL2(String workflowRun, String proc,
+			String var, String path, boolean returnInput) {
+		LineageSQLQuery lq = new LineageSQLQuery();
+		Q q;
+
+		// base Collection query
+		q = query("SELECT C.*,W.workflowId,V.isInputPort FROM Collection C "
+				+ "JOIN WorkflowRun W ON C.workflowRunId = W.workflowRunId "
+				+ "JOIN Port V ON V.workflowId = W.workflowId "
+				+ "AND C.processorNameRef = V.processorName "
+				+ "AND C.portName = V.portName ");
+		if (path != null && path.length() > 0)
+			q.where("C.iteration", "[" + path + "]"); // PM 1/09 -- path
+		lq.setCollQuery(q.where("W.workflowRunId", workflowRun).where("C.processorNameRef",
+				proc).where("V.isInputPort", returnInput ? "1" : "0").query());
+
+		// base PortBinding query
+		q = query("SELECT VB.*,V.isInputPort FROM PortBinding VB "
+				+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId "
+				+ "JOIN Port V on V.workflowId = W.workflowId "
+				+ "AND VB.processorNameRef = V.processorName "
+				+ "AND VB.portName = V.portName ");
+		if (path != null && path.length() > 0)
+			q.where("VB.iteration", "[" + path + "]"); // PM 1/09 -- path
+		lq.setVbQuery(q.where("W.workflowRunId", workflowRun)
+				.where("VB.processorNameRef", proc).where("VB.portName", var)
+				.where("V.isInputPort", returnInput ? "1" : "0")
+				.orderBy("V.portName").orderBy("iteration").query());
+		return lq;
+	}
+
+	/**
+	 * if effectivePath is not null: query varBinding using: workflowRunId =
+	 * workflowRun, iteration = effectivePath, processorNameRef = proc if input vars is
+	 * null, then use the output var this returns the bindings for the set of
+	 * input vars at the correct iteration if effectivePath is null: fetch
+	 * PortBindings for all input vars, without constraint on the iteration<br/>
+	 * additionally, try querying the collection table first -- if the query succeeds, it means
+	 * the path is pointing to an internal node in the collection, and we just got the right node.
+	 * Otherwise, query PortBinding for the leaves
+	 *
+	 * @param workflowRun
+	 * @param proc
+	 * @param effectivePath
+	 * @param returnOutputs
+	 *            returns both inputs and outputs if set to true
+	 * @return
+	 */
+	public LineageSQLQuery generateSQL(String workflowRun, String proc,
+			String effectivePath, boolean returnOutputs) {
+		LineageSQLQuery lq = new LineageSQLQuery();
+		Q q;
+
+		// base Collection query
+		q = query("SELECT * FROM Collection C "
+				+ "JOIN WorkflowRun W ON C.workflowRunId = W.workflowRunId "
+				+ "JOIN Port V ON V.workflowRunId = W.workflowId "
+				+ "AND C.processorNameRef = V.processorNameRef "
+				+ "AND C.portName = V.portName ");
+
+		if (effectivePath != null && effectivePath.length() > 0)
+			q.where("C.iteration", "[" + effectivePath.toString() + "]"); // PM 1/09 -- path
+		// limit to inputs?
+		if (returnOutputs)
+			q.where("V.isInputPort", "1");
+
+		lq.setCollQuery(q.where("W.workflowRunId", workflowRun).where("C.processorNameRef",
+				proc).query());
+
+		// base PortBinding query
+		q = query("SELECT * FROM PortBinding VB "
+				+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId "
+				+ "JOIN Port V on V.workflowRunId = W.workflowId "
+				+ "AND VB.processorNameRef = V.processorNameRef "
+				+ "AND VB.portName = V.portName ");
+
+		if (effectivePath != null && effectivePath.length() > 0)
+			q.where("VB.iteration", "[" + effectivePath.toString() + "]"); // PM 1/09 -- path
+		// limit to inputs?
+		if (!returnOutputs)
+			q.where("V.isInputPort", "1");
+
+		lq.setVbQuery(q.where("W.workflowRunId", workflowRun)
+				.where("VB.processorNameRef", proc).orderBy("portName")
+				.orderBy("iteration").query());
+		return lq;
+	}
+
+	public Dependencies runCollectionQuery(LineageSQLQuery lq) throws SQLException {
+		String q = lq.getCollQuery();
+		Dependencies lqr = new Dependencies();
+		if (q == null)
+			return lqr;
+
+		logger.debug("running collection query: " + q);
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement()) {
+			ResultSet rs = stmt.executeQuery(q);
+			while (rs.next()) {
+				String type = Dependencies.ATOM_TYPE; // temp -- FIXME
+
+				String workflowId = rs.getString("workflowId");
+				String workflowRun = rs.getString("workflowRunId");
+				String proc = rs.getString("processorNameRef");
+				String var = rs.getString("portName");
+				String it = rs.getString("iteration");
+				String coll = rs.getString("collID");
+				String parentColl = rs.getString("parentCollIDRef");
+				//boolean isInput = rs.getBoolean("isInputPort");
+
+				lqr.addLineageQueryResultRecord(workflowId, proc, var, workflowRun,
+						it, coll, parentColl, null, null, type, false, true);  // true -> is a collection
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return lqr;
+	}
+
+	/**
+	 *
+	 * @param lq
+	 * @param includeDataValue  IGNORED. always false
+	 * @return
+	 * @throws SQLException
+	 */
+	public Dependencies runVBQuery(LineageSQLQuery lq, boolean includeDataValue)
+			throws SQLException {
+		String q = lq.getVbQuery();
+
+		logger.info("running VB query: " + q);
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement()) {
+			ResultSet rs = stmt.executeQuery(q);
+			Dependencies lqr = new Dependencies();
+
+			while (rs.next()) {
+				String type = Dependencies.ATOM_TYPE; // temp -- FIXME
+
+				String workflowId = rs.getString("workflowId");
+				String workflowRun = rs.getString("workflowRunId");
+				String proc = rs.getString("processorNameRef");
+				String var = rs.getString("portName");
+				String it = rs.getString("iteration");
+				String coll = rs.getString("collIDRef");
+				String value = rs.getString("value");
+				boolean isInput = rs.getBoolean("isInputPort");
+
+				// FIXME if the data is required then the query needs fixing
+				lqr.addLineageQueryResultRecord(workflowId, proc, var, workflowRun,
+						it, coll, null, value, null, type, isInput, false);
+			}
+			return lqr;
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * executes one of the lineage queries produced by the graph visit algorithm. This first executes the collection query, and then
+	 * if no result is returned, the varBinding query
+	 *
+	 * @param lq
+	 *            a lineage query computed during the graph traversal
+	 * @param includeDataValue
+	 *            if true, then the referenced value is included in the result.
+	 *            This may only be necessary for testing: the data reference in
+	 *            field value (which is a misleading field name, and actually
+	 *            refers to the data reference) should be sufficient
+	 * @return
+	 * @throws SQLException
+	 */
+	public Dependencies runLineageQuery(LineageSQLQuery lq,
+			boolean includeDataValue) throws SQLException {
+		Dependencies result = runCollectionQuery(lq);
+		if (result.getRecords().isEmpty())
+			return runVBQuery(lq, includeDataValue);
+		return result;
+	}
+
+	public List<Dependencies> runLineageQueries(List<LineageSQLQuery> lqList,
+			boolean includeDataValue) throws SQLException {
+		List<Dependencies> allResults = new ArrayList<>();
+		if (lqList == null)
+			logger.warn("lineage queries list is NULL, nothing to evaluate");
+		else
+			for (LineageSQLQuery lq : lqList)
+				if (lq != null)
+					allResults.add(runLineageQuery(lq, includeDataValue));
+		return allResults;
+	}
+
+	/**
+	 * takes an ordered set of records for the same variable with iteration
+	 * indexes and builds a collection out of it
+	 *
+	 * @param lqr
+	 * @return a jdom Document with the collection
+	 */
+	public Document recordsToCollection(Dependencies lqr) {
+		// process each var name in turn
+		// lqr ordered by var name and by iteration number
+		Document d = new Document(new Element("list"));
+
+		String currentVar = null;
+		for (ListIterator<LineageQueryResultRecord> it = lqr.iterator(); it.hasNext();) {
+			LineageQueryResultRecord record = it.next();
+
+			if (currentVar != null && record.getPortName().equals(currentVar)) {
+				// multiple occurrences
+				addToCollection(record, d);
+				// adds record to d in the correct position given by the iteration vector
+			}
+			if (currentVar == null)
+				currentVar = record.getPortName();
+		}
+		return d;
+	}
+
+	private void addToCollection(LineageQueryResultRecord record, Document d) {
+		Element root = d.getRootElement();
+		String[] itVector = record.getIteration().split(",");
+		Element currentEl = root;
+		// each element gives us a corresponding child in the tree
+		for (int i = 0; i < itVector.length; i++) {
+			int index = Integer.parseInt(itVector[i]);
+			List<?> children = currentEl.getChildren();
+			if (index < children.size())
+				currentEl = (Element) children.get(index);
+			else if (i == itVector.length - 1)
+				currentEl.addContent(new Element(record.getValue()));
+			else
+				currentEl.addContent(new Element("list"));
+		}
+	}
+
+	/**
+	 *
+	 * returns the set of all processors that are structurally contained within
+	 * the wf corresponding to the input dataflow name
+	 * @param workflowName the name of a processor of type DataFlowActivity
+	 * @return
+	 *
+	 * @deprecated as workflow 'names' are not globally unique, this method should not be used!
+	 */
+	@Deprecated
+	public List<String> getContainedProcessors(String workflowName) {
+		List<String> result = new ArrayList<>();
+
+		// dataflow name -> wfRef
+		String containerDataflow = getWorkflowIdForExternalName(workflowName);
+
+		// get all processors within containerDataflow
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection
+						.prepareStatement("SELECT processorName FROM Processor P "
+								+ "WHERE workflowId = ?")) {
+			ps.setString(1, containerDataflow);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next())
+				result.add(rs.getString("processorName"));
+		} catch (InstantiationException | SQLException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	public String getTopLevelDataflowName(String workflowRunId) {
+		String sql = "SELECT processorName FROM Processor P "
+				+ "JOIN WorkflowRun I on P.workflowId = I.workflowId "
+				+ "WHERE I.workflowRunId = ? AND isTopLevel = 1";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return rs.getString("processorName");
+		} catch (InstantiationException | SQLException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * retrieve a tree structure starting from the top parent
+	 * @param workflowID
+	 * @return
+	 * @throws SQLException
+	 */
+	public WorkflowTree getWorkflowNestingStructure(String workflowID) throws SQLException {
+		WorkflowTree tree = new WorkflowTree();
+
+	    Workflow wf = getWorkflow(workflowID);
+	    tree.setNode(wf);
+
+	    List<String> children = getChildrenOfWorkflow(workflowID);
+	    for (String childWfName:children) {
+	    	WorkflowTree childStructure = getWorkflowNestingStructure(childWfName);
+	    	tree.addChild(childStructure);
+	    }
+	    return tree;
+	}
+
+	/**
+	 * returns the internal ID of a dataflow given its external name
+	 * @param externalName
+	 * @param workflowRunId
+	 * @return
+	 * @deprecated as workflow 'names' are not globally unique, this method should not be used!
+	 */
+	@Deprecated
+	public String getWorkflowIdForExternalName(String externalName) {
+		//"SELECT workflowId FROM Workflow W join WorkflowRun I on W.workflowId = I.workflowId WHERE W.externalName = ? and I.workflowRunId = ?");
+		String sql = "SELECT workflowId FROM Workflow W WHERE W.externalName = ?";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, externalName);
+			// ps.setString(2, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return rs.getString("workflowId");
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * This method is deprecated as parent workflow ID is not correctly
+	 * recorded. If two workflows both contain the same nested workflow, only
+	 * one of them (the most recently added) will return that nested workflow
+	 * from this method.
+	 *
+	 * @deprecated
+	 * @param parentWorkflowId
+	 * @return
+	 * @throws SQLException
+	 */
+	@Deprecated
+	public List<String> getChildrenOfWorkflow(String parentWorkflowId)
+			throws SQLException {
+		List<String> result = new ArrayList<>();
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection
+						.prepareStatement("SELECT workflowId FROM Workflow WHERE parentWorkflowId = ? ")) {
+			ps.setString(1, parentWorkflowId);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next())
+				result.add(rs.getString("workflowId"));
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	/**
+	 * fetch children of parentWorkflowId from the Workflow table
+	 *
+	 * @return
+	 * @param childworkflowId
+	 * @throws SQLException
+	 */
+	public String getParentOfWorkflow(String childworkflowId)
+			throws SQLException {
+		String result = null;
+		String q = "SELECT parentWorkflowId FROM Workflow WHERE workflowId = ?";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(q)) {
+			ps.setString(1, childworkflowId);
+
+			logger.debug("getParentOfWorkflow - query: " + q
+					+ "  with workflowId = " + childworkflowId);
+
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				result = rs.getString("parentWorkflowId");
+				logger.debug("result: " + result);
+				break;
+			}
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	public List<String> getAllworkflowIds() throws SQLException {
+		List<String> result = new ArrayList<>();
+		String q = "SELECT workflowId FROM Workflow";
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement()) {
+			ResultSet rs = stmt.executeQuery(q);
+			while (rs.next())
+				result.add(rs.getString("workflowId"));
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+
+	/**
+	 * @deprecated This method is not workflowId aware and should not be used
+	 * @param procName
+	 * @return true if procName is the external name of a dataflow, false
+	 *         otherwise
+	 * @throws SQLException
+	 */
+	public boolean isDataflow(String procName) throws SQLException {
+		String sql = "SELECT firstActivityClass FROM Processor WHERE processorName = ?";
+		try (Connection c = getConnection();
+				PreparedStatement ps = c.prepareStatement(sql)) {
+			ps.setString(1, procName);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next()
+					&& DATAFLOW_ACTIVITY.equals(rs
+							.getString("firstActivityClass")))
+				return true;
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return false;
+	}
+
+	public boolean isTopLevelDataflow(String workflowIdID) {
+		String sql = "SELECT * FROM Workflow W where W.workflowId = ?";
+		try (Connection c = getConnection();
+				PreparedStatement ps = c.prepareStatement(sql)) {
+			ps.setString(1, workflowIdID);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return (rs.getString("parentWorkflowId") == null);
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return false;
+	}
+
+	public boolean isTopLevelDataflow(String workflowId, String workflowRunId) {
+		String sql = "SELECT " + parentProcessorEnactmentId + " AS parent"
+				+ " FROM " + DataflowInvocation + " W " + " WHERE "
+				+ DataflowInvocationTable.workflowId + "=? AND "
+				+ DataflowInvocationTable.workflowRunId + "=?";
+		try (Connection c = getConnection();
+				PreparedStatement ps = c.prepareStatement(sql)) {
+			ps.setString(1, workflowId);
+			ps.setString(2, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return (rs.getString("parent") == null);
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return false;
+	}
+
+	public String getTopDataflow(String workflowRunId) {
+		String sql = "SELECT processorName FROM "
+				+ "Processor P JOIN WorkflowRun I ON P.workflowId = I.workflowId "
+				+ " WHERE I.workflowRunId = ? AND isTopLevel = 1 ";
+		try (Connection c = getConnection();
+				PreparedStatement ps = c.prepareStatement(sql)) {
+			ps.setString(1, workflowRunId);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next())
+				return rs.getString("processorName");
+		} catch (SQLException | InstantiationException | IllegalAccessException
+				| ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 *
+	 * @param p
+	 *            pTo processor
+	 * @param var
+	 *            vTo
+	 * @param value
+	 *            valTo
+	 * @return a set of DDRecord
+	 * @throws SQLException
+	 */
+	public List<DDRecord> queryDD(String p, String var, String value,
+			String iteration, String workflowRun) throws SQLException {
+		Q q = query("SELECT * FROM DD ");
+		q.where("pTo", p);
+		q.where("vTo", var);
+		if (value != null)
+			q.where("valTo", value);
+		if (iteration != null)
+			q.where("iteration", iteration);
+		if (workflowRun != null)
+			q.where("workflowRun", workflowRun);
+
+		try (Connection connection = getConnection();
+				Statement stmt = connection.createStatement();
+				ResultSet rs = q.exec(stmt)) {
+			List<DDRecord> result = new ArrayList<>();
+			while (rs.next()) {
+				DDRecord aDDrecord = new DDRecord();
+				aDDrecord.setPFrom(rs.getString("pFrom"));
+				aDDrecord.setVFrom(rs.getString("vFrom"));
+				aDDrecord.setValFrom(rs.getString("valFrom"));
+				aDDrecord.setPTo(rs.getString("pTo"));
+				aDDrecord.setVTo(rs.getString("vTo"));
+				aDDrecord.setValTo(rs.getString("valTo"));
+				result.add(aDDrecord);
+			}
+			return result;
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	public Set<DDRecord> queryDataLinksForDD(String p, String v, String val,
+			String workflowRun) throws SQLException {
+		String sql = "SELECT DISTINCT A.sourceProcessorName AS p, A.sourcePortName AS var, VB.value AS val "
+				+ "FROM   Datalink A "
+				+ "JOIN   PortBinding VB ON VB.portName = A.destinationPortName AND VB.processorNameRef = A.destinationProcessorName "
+				+ "JOIN   WorkflowRun WF ON WF.workflowId = A.workflowId AND WF.workflowRunId = VB.workflowRunId  "
+				+ "WHERE  WF.workflowRunId = ? AND A.destinationProcessorName = ? AND A.destinationPortName = ? AND VB.value = ?";
+
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, workflowRun);
+			ps.setString(2, p);
+			ps.setString(3, v);
+			ps.setString(4, val);
+			ResultSet rs = ps.executeQuery();
+			Set<DDRecord> result = new HashSet<>();
+			while (rs.next()) {
+				DDRecord aDDrecord = new DDRecord();
+				aDDrecord.setPTo(rs.getString("p"));
+				aDDrecord.setVTo(rs.getString("var"));
+				aDDrecord.setValTo(rs.getString("val"));
+				result.add(aDDrecord);
+			}
+			return result;
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	public Set<DDRecord> queryAllFromValues(String workflowRun)
+			throws SQLException {
+		String sql = "SELECT DISTINCT PFrom, vFrom, valFrom FROM DD where workflowRun = ?";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, workflowRun);
+			ResultSet rs = ps.executeQuery();
+			Set<DDRecord> result = new HashSet<>();
+			while (rs.next()) {
+				DDRecord aDDrecord = new DDRecord();
+				aDDrecord.setPFrom(rs.getString("PFrom"));
+				aDDrecord.setVFrom(rs.getString("vFrom"));
+				aDDrecord.setValFrom(rs.getString("valFrom"));
+				result.add(aDDrecord);
+			}
+			return result;
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query", e);
+			return null;
+		}
+	}
+
+	public boolean isRootProcessorOfWorkflow(String procName, String workflowId,
+			String workflowRunId) {
+		String sql = "SELECT * FROM Datalink A JOIN WorkflowRun I ON A.workflowId = I.workflowId "
+				+ "JOIN Processor P on P.processorName = A.sourceProcessorName "
+				+ "WHERE sourceProcessorName = ? "
+				+ "AND P.workflowId <> A.workflowId "
+				+ "AND I.workflowRunId = ? "
+				+ "AND destinationProcessorName = ? ";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, workflowId);
+			ps.setString(2, workflowRunId);
+			ps.setString(3, procName);
+			if (ps.executeQuery().next())
+				return true;
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return false;
+	}
+
+	/**
+	 * returns a Workflow record from the DB given the workflow internal ID
+	 * @param dataflowID
+	 * @return
+	 */
+	public Workflow getWorkflow(String dataflowID) {
+		String sql = "SELECT * FROM Workflow W WHERE workflowId = ? ";
+		try (Connection connection = getConnection();
+				PreparedStatement ps = connection.prepareStatement(sql)) {
+			ps.setString(1, dataflowID);
+			ResultSet rs = ps.executeQuery();
+			if (rs.next()) {
+				Workflow wf = new Workflow();
+				wf.setWorkflowId(rs.getString("workflowId"));
+				wf.setParentWorkflowId(rs.getString("parentWorkflowId"));
+				wf.setExternalName(rs.getString("externalName"));
+
+				return wf;
+			} else {
+				logger.warn("Could not find workflow " + dataflowID);
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	/**
+	 * @param record
+	 *            a record representing a single value -- possibly within a list
+	 *            hierarchy
+	 * @return the URI for topmost containing collection when the input record
+	 *         is within a list hierarchy, or null otherwise
+	 */
+	public String getContainingCollection(LineageQueryResultRecord record) {
+		if (record.getCollectionT2Reference() == null)
+			return null;
+		String sql = "SELECT * FROM Collection "
+				+ "WHERE collID = ? and workflowRunId = ? and processorNameRef = ? and portName = ?";
+		try (Connection connection = getConnection()) {
+			String parentCollIDRef = null;
+			try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+				stmt.setString(1, record.getCollectionT2Reference());
+				stmt.setString(2, record.getWorkflowRunId());
+				stmt.setString(3, record.getProcessorName());
+				stmt.setString(4, record.getPortName());
+				ResultSet rs = stmt.executeQuery();
+				if (rs.next())
+					parentCollIDRef = rs.getString("parentCollIDRef");
+			}
+
+			// INITIALLY not null -- would be TOP if the initial had no parent
+			while (parentCollIDRef != null) {
+				String oldParentCollIDRef = parentCollIDRef;
+
+				// query Collection again for parent collection
+				try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+					stmt.setString(1, oldParentCollIDRef);
+					stmt.setString(2, record.getWorkflowRunId());
+					stmt.setString(3, record.getProcessorName());
+					stmt.setString(4, record.getPortName());
+					ResultSet rs = stmt.executeQuery();
+					if (rs.next()) {
+						parentCollIDRef = rs.getString("parentCollIDRef");
+						if (parentCollIDRef.equals("TOP"))
+							return oldParentCollIDRef;
+					}
+				} catch (Exception e) {
+					logger.warn("Could not execute query", e);
+				}
+			}
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
+			logger.warn("Could not execute query", e);
+		}
+		return null;
+	}
+
+	public List<ProcessorEnactment> getProcessorEnactments(
+			String workflowRunId, String... processorPath) {
+		return getProcessorEnactments(workflowRunId,
+				(List<ProcessorEnactment>) null, Arrays.asList(processorPath));
+	}
+
+	private List<ProcessorEnactment> getProcessorEnactments(
+			String workflowRunId, List<ProcessorEnactment> parentProcessorEnactments,
+			List<String> processorPath) {
+		List<String> processorEnactmentIds = null;
+		if (parentProcessorEnactments != null) {
+			processorEnactmentIds = new ArrayList<>();
+			for (ProcessorEnactment processorEnactment : parentProcessorEnactments)
+				processorEnactmentIds.add(processorEnactment.getProcessEnactmentId());
+		}
+		if (processorPath.size() > 1) {
+			return getProcessorEnactments(
+					workflowRunId,
+					getProcessorEnactmentsByProcessorName(workflowRunId,
+							processorEnactmentIds, processorPath.get(0)),
+					processorPath.subList(1, processorPath.size()));
+		} else if (processorPath.size() == 1) {
+			return getProcessorEnactmentsByProcessorName(workflowRunId,
+					processorEnactmentIds, processorPath.get(0));
+		} else {
+			return getProcessorEnactmentsByProcessorName(workflowRunId,
+					processorEnactmentIds, null);
+		}
+	}
+
+	public List<ProcessorEnactment> getProcessorEnactmentsByProcessorName(
+			String workflowRunId, List<String> parentProcessorEnactmentIds,
+			String processorName) {
+		StringBuilder query = new StringBuilder();
+		query.append("SELECT ")
+				.append(ProcessorEnactmentTable.enactmentStarted).append(", ")
+				.append(ProcessorEnactmentTable.enactmentEnded).append(", ")
+				.append(ProcessorEnactmentTable.finalOutputsDataBindingId)
+				.append(", ")
+				.append(ProcessorEnactmentTable.initialInputsDataBindingId)
+				.append(", ")
+				.append(ProcessorEnactmentTable.ProcessorEnactment).append(".")
+				.append(ProcessorEnactmentTable.processorId)
+				.append(" AS procId, ")
+				.append(ProcessorEnactmentTable.processIdentifier).append(", ")
+				.append(ProcessorEnactmentTable.processEnactmentId)
+				.append(", ")
+				.append(ProcessorEnactmentTable.parentProcessorEnactmentId)
+				.append(", ").append(ProcessorEnactmentTable.workflowRunId)
+				.append(", ").append(ProcessorEnactmentTable.iteration)
+				.append(", Processor.processorName FROM ")
+				.append(ProcessorEnactmentTable.ProcessorEnactment)
+				.append(" INNER JOIN Processor ON ")
+				.append(ProcessorEnactmentTable.ProcessorEnactment).append(".")
+				.append(ProcessorEnactmentTable.processorId)
+				.append(" = Processor.processorId WHERE ")
+				.append(ProcessorEnactmentTable.workflowRunId).append(" = ? ");
+
+		if (processorName != null)
+			// Specific processor
+			query.append(" AND Processor.processorName = ? ");
+		if ((parentProcessorEnactmentIds == null || parentProcessorEnactmentIds.isEmpty()) && processorName != null) {
+			// null - ie. top level
+			query.append(" AND " + ProcessorEnactmentTable.parentProcessorEnactmentId + " IS NULL");
+		} else if (parentProcessorEnactmentIds != null) {
+			// not null, ie. inside nested workflow
+			query.append(" AND " + ProcessorEnactmentTable.parentProcessorEnactmentId + " IN (");
+			for (int i=0; i<parentProcessorEnactmentIds.size(); i++) {
+				query.append('?');
+				if (i < (parentProcessorEnactmentIds.size()-1))
+					query.append(',');
+			}
+			query.append(')');
+		}
+
+		ArrayList<ProcessorEnactment> procEnacts = new ArrayList<>();
+
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection.prepareStatement(query
+						.toString())) {
+			int pos = 1;
+			statement.setString(pos++, workflowRunId);
+			if (processorName != null)
+				statement.setString(pos++, processorName);
+			if (parentProcessorEnactmentIds != null)
+				for (String parentId : parentProcessorEnactmentIds)
+					statement.setString(pos++, parentId);
+			ResultSet resultSet = statement.executeQuery();
+			while (resultSet.next()) {
+				ProcessorEnactment procEnact = readProcessorEnactment(resultSet);
+				procEnacts.add(procEnact);
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return procEnacts;
+	}
+
+	private ProcessorEnactment readProcessorEnactment(ResultSet resultSet) throws SQLException {
+		Timestamp enactmentStarted = resultSet.getTimestamp(ProcessorEnactmentTable.enactmentStarted.name());
+		Timestamp enactmentEnded = resultSet.getTimestamp(ProcessorEnactmentTable.enactmentEnded.name());
+		//String pName = resultSet.getString("processorName");
+		String finalOutputsDataBindingId = resultSet.getString(ProcessorEnactmentTable.finalOutputsDataBindingId.name());
+		String initialInputsDataBindingId = resultSet.getString(ProcessorEnactmentTable.initialInputsDataBindingId.name());
+
+		String iteration = resultSet.getString(ProcessorEnactmentTable.iteration.name());
+		String processorId = resultSet.getString("procId");
+		String processIdentifier = resultSet.getString(ProcessorEnactmentTable.processIdentifier.name());
+		String processEnactmentId = resultSet.getString(ProcessorEnactmentTable.processEnactmentId.name());
+		String parentProcessEnactmentId = resultSet.getString(ProcessorEnactmentTable.parentProcessorEnactmentId.name());
+		String workflowRunId = resultSet.getString(ProcessorEnactmentTable.workflowRunId.name());
+
+		ProcessorEnactment procEnact = new ProcessorEnactment();
+		procEnact.setEnactmentEnded(enactmentEnded);
+		procEnact.setEnactmentStarted(enactmentStarted);
+		procEnact.setFinalOutputsDataBindingId(finalOutputsDataBindingId);
+		procEnact.setInitialInputsDataBindingId(initialInputsDataBindingId);
+		procEnact.setIteration(iteration);
+		procEnact.setParentProcessorEnactmentId(parentProcessEnactmentId);
+		procEnact.setProcessEnactmentId(processEnactmentId);
+		procEnact.setProcessIdentifier(processIdentifier);
+		procEnact.setProcessorId(processorId);
+		procEnact.setWorkflowRunId(workflowRunId);
+		return procEnact;
+	}
+
+	public ProcessorEnactment getProcessorEnactment(String processorEnactmentId) {
+		String query  =
+				"SELECT " + ProcessorEnactmentTable.enactmentStarted + ","
+						+ ProcessorEnactmentTable.enactmentEnded + ","
+						+ ProcessorEnactmentTable.finalOutputsDataBindingId + ","
+						+ ProcessorEnactmentTable.initialInputsDataBindingId + ","
+						+ ProcessorEnactmentTable.ProcessorEnactment + "."
+						+ ProcessorEnactmentTable.processorId + " AS procId,"
+						+ ProcessorEnactmentTable.processIdentifier + ","
+						+ ProcessorEnactmentTable.workflowRunId + ","
+						+ ProcessorEnactmentTable.processEnactmentId + ","
+						+ ProcessorEnactmentTable.parentProcessorEnactmentId + ","
+						+ ProcessorEnactmentTable.iteration
+						+ " FROM "
+						+ ProcessorEnactmentTable.ProcessorEnactment
+						+ " WHERE "
+						+ ProcessorEnactmentTable.processEnactmentId + "=?";
+
+		ProcessorEnactment procEnact = null;
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, processorEnactmentId);
+			ResultSet resultSet = statement.executeQuery();
+			if (!resultSet.next()) {
+				logger.warn("Could not find ProcessorEnactment processEnactmentId="
+						+ processorEnactmentId);
+				return null;
+			}
+			procEnact = readProcessorEnactment(resultSet);
+			if (resultSet.next()) {
+				logger.error("Found more than one ProcessorEnactment processEnactmentId="
+						+ processorEnactmentId);
+				return null;
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return procEnact;
+	}
+
+	public ProcessorEnactment getProcessorEnactmentByProcessId(
+			String workflowRunId, String processIdentifier, String iteration) {
+		String query = "SELECT " + ProcessorEnactmentTable.enactmentStarted
+				+ "," + ProcessorEnactmentTable.enactmentEnded + ","
+				+ ProcessorEnactmentTable.finalOutputsDataBindingId + ","
+				+ ProcessorEnactmentTable.initialInputsDataBindingId + ","
+				+ ProcessorEnactmentTable.ProcessorEnactment + "."
+				+ ProcessorEnactmentTable.processorId + " AS procId,"
+				+ ProcessorEnactmentTable.processIdentifier + ","
+				+ ProcessorEnactmentTable.workflowRunId + ","
+				+ ProcessorEnactmentTable.processEnactmentId + ","
+				+ ProcessorEnactmentTable.parentProcessorEnactmentId + ","
+				+ ProcessorEnactmentTable.iteration + " FROM "
+				+ ProcessorEnactmentTable.ProcessorEnactment + " WHERE "
+				+ ProcessorEnactmentTable.workflowRunId + "=?" + " AND "
+				+ ProcessorEnactmentTable.processIdentifier + "=?" + " AND "
+				+ ProcessorEnactmentTable.iteration + "=?";
+
+		ProcessorEnactment procEnact = null;
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, workflowRunId);
+			statement.setString(2, processIdentifier);
+			statement.setString(3, iteration);
+
+			ResultSet resultSet = statement.executeQuery();
+			String debugString = "ProcessorEnactment runId=" + workflowRunId
+					+ " processIdentifier=" + processIdentifier + " iteration="
+					+ iteration;
+			if (!resultSet.next()) {
+				logger.warn("Could not find " + debugString);
+				return null;
+			}
+			procEnact = readProcessorEnactment(resultSet);
+			if (resultSet.next()) {
+				logger.error("Found more than one " + debugString);
+				return null;
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return procEnact;
+	}
+
+	public Map<Port, String> getDataBindings(String dataBindingId) {
+		HashMap<Port, String> dataBindings = new HashMap<>();
+		String query = "SELECT " + DataBindingTable.t2Reference + ","
+				+ "Port.portId AS portId," + "Port.processorName,"
+				+ "Port.processorId," + "Port.isInputPort," + "Port.portName,"
+				+ "Port.depth," + "Port.resolvedDepth," + "Port.workflowId"
+				+ " FROM " + DataBindingTable.DataBinding + " INNER JOIN "
+				+ "Port" + " ON " + " Port.portId="
+				+ DataBindingTable.DataBinding + "." + DataBindingTable.portId
+				+ " WHERE " + DataBindingTable.dataBindingId + "=?";
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, dataBindingId);
+			ResultSet rs = statement.executeQuery();
+			while (rs.next()) {
+				String t2Ref = rs.getString(DataBindingTable.t2Reference.name());
+
+				Port port = new Port();
+				port.setWorkflowId(rs.getString("workflowId"));
+				port.setInputPort(rs.getBoolean("isInputPort"));
+				port.setIdentifier(rs.getString("portId"));
+				port.setProcessorName(rs.getString("processorName"));
+				port.setProcessorId(rs.getString("processorId"));
+				port.setPortName(rs.getString("portName"));
+				port.setDepth(rs.getInt("depth"));
+				if (rs.getString("resolvedDepth") != null)
+					port.setResolvedDepth(rs.getInt("resolvedDepth"));
+				dataBindings.put(port, t2Ref);
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return dataBindings;
+	}
+
+	public List<Port> getAllPortsInDataflow(String workflowID) {
+		Map<String, String> queryConstraints = new HashMap<>();
+		queryConstraints.put("V.workflowId", workflowID);
+		try {
+			return getPorts(queryConstraints);
+		} catch (SQLException e) {
+			logger.error("Problem getting ports for dataflow: " + workflowID, e);
+			return null;
+		}
+	}
+
+	public List<Port> getPortsForDataflow(String workflowID) {
+		Workflow w = getWorkflow(workflowID);
+
+		Map<String, String> queryConstraints = new HashMap<>();
+		queryConstraints.put("V.workflowId", workflowID);
+		queryConstraints.put("processorName", w.getExternalName());
+
+		try {
+			return getPorts(queryConstraints);
+		} catch (SQLException e) {
+			logger.error("Problem getting ports for dataflow: " + workflowID, e);
+			return null;
+		}
+	}
+
+	public List<Port> getPortsForProcessor(String workflowID,
+			String processorName) {
+		Map<String, String> queryConstraints = new HashMap<>();
+		queryConstraints.put("V.workflowId", workflowID);
+		queryConstraints.put("processorName", processorName);
+
+		try {
+			return getPorts(queryConstraints);
+		} catch (SQLException e) {
+			logger.error("Problem getting ports for processor: "
+					+ processorName + " worflow: " + workflowID, e);
+			return null;
+		}
+	}
+
+	public DataflowInvocation getDataflowInvocation(String workflowRunId) {
+		String query = "SELECT " +
+				  DataflowInvocationTable.dataflowInvocationId + ","
+				+ DataflowInvocationTable.inputsDataBinding + ","
+				+ DataflowInvocationTable.invocationEnded + ","
+				+ DataflowInvocationTable.invocationStarted + ","
+				+ DataflowInvocationTable.outputsDataBinding + ","
+				+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+				+ DataflowInvocationTable.workflowId + ","
+				+ DataflowInvocationTable.workflowRunId + ","
+				+ DataflowInvocationTable.completed
+				+ " FROM "
+				+ DataflowInvocationTable.DataflowInvocation +
+				" WHERE "
+				+ DataflowInvocationTable.parentProcessorEnactmentId + " IS NULL AND "
+				+ DataflowInvocationTable.workflowRunId + "=?";
+		DataflowInvocation dataflowInvocation = null;
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, workflowRunId);
+			ResultSet rs = statement.executeQuery();
+			if (!rs.next()) {
+				logger.warn("Could not find DataflowInvocation for workflowRunId=" + workflowRunId);
+				return null;
+			}
+			dataflowInvocation = new DataflowInvocation();
+			dataflowInvocation.setDataflowInvocationId(rs.getString(DataflowInvocationTable.dataflowInvocationId.name()));
+			dataflowInvocation.setInputsDataBindingId(rs.getString(DataflowInvocationTable.inputsDataBinding.name()));
+			dataflowInvocation.setInvocationEnded(rs.getTimestamp(DataflowInvocationTable.invocationEnded.name()));
+			dataflowInvocation.setInvocationStarted(rs.getTimestamp(DataflowInvocationTable.invocationStarted.name()));
+			dataflowInvocation.setOutputsDataBindingId(rs.getString(DataflowInvocationTable.outputsDataBinding.name()));
+			dataflowInvocation.setParentProcessorEnactmentId(rs.getString(DataflowInvocationTable.parentProcessorEnactmentId.name()));
+			dataflowInvocation.setWorkflowId(rs.getString(DataflowInvocationTable.workflowId.name()));
+			dataflowInvocation.setWorkflowRunId(rs.getString(DataflowInvocationTable.workflowRunId.name()));
+			dataflowInvocation.setCompleted(rs.getBoolean(DataflowInvocationTable.completed.name()));
+			if (rs.next()) {
+				logger.error("Found more than one DataflowInvocation for workflowRunId=" + workflowRunId);
+				return null;
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return dataflowInvocation;
+	}
+
+	public DataflowInvocation getDataflowInvocation(
+			ProcessorEnactment processorEnactment) {
+		String query = "SELECT " + DataflowInvocationTable.dataflowInvocationId
+				+ "," + DataflowInvocationTable.inputsDataBinding + ","
+				+ DataflowInvocationTable.invocationEnded + ","
+				+ DataflowInvocationTable.invocationStarted + ","
+				+ DataflowInvocationTable.outputsDataBinding + ","
+				+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+				+ DataflowInvocationTable.workflowId + ","
+				+ DataflowInvocationTable.workflowRunId + ","
+				+ DataflowInvocationTable.completed + " FROM "
+				+ DataflowInvocationTable.DataflowInvocation + " WHERE "
+				+ DataflowInvocationTable.parentProcessorEnactmentId + "=?";
+		DataflowInvocation dataflowInvocation = null;
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, processorEnactment.getProcessEnactmentId());
+			ResultSet rs = statement.executeQuery();
+			if (!rs.next()) {
+				logger.warn("Could not find DataflowInvocation for processorEnactmentId="
+						+ processorEnactment.getProcessEnactmentId());
+				return null;
+			}
+			dataflowInvocation = new DataflowInvocation();
+			dataflowInvocation.setDataflowInvocationId(rs
+					.getString(DataflowInvocationTable.dataflowInvocationId
+							.name()));
+			dataflowInvocation
+					.setInputsDataBindingId(rs
+							.getString(DataflowInvocationTable.inputsDataBinding
+									.name()));
+			dataflowInvocation.setInvocationEnded(rs
+					.getTimestamp(DataflowInvocationTable.invocationEnded
+							.name()));
+			dataflowInvocation.setInvocationStarted(rs
+					.getTimestamp(DataflowInvocationTable.invocationStarted
+							.name()));
+			dataflowInvocation.setOutputsDataBindingId(rs
+					.getString(DataflowInvocationTable.outputsDataBinding
+							.name()));
+			dataflowInvocation
+					.setParentProcessorEnactmentId(rs
+							.getString(DataflowInvocationTable.parentProcessorEnactmentId
+									.name()));
+			dataflowInvocation.setWorkflowId(rs
+					.getString(DataflowInvocationTable.workflowId.name()));
+			dataflowInvocation.setWorkflowRunId(rs
+					.getString(DataflowInvocationTable.workflowRunId.name()));
+			dataflowInvocation.setCompleted(rs
+					.getBoolean(DataflowInvocationTable.completed.name()));
+
+			if (rs.next()) {
+				logger.error("Found more than one DataflowInvocation for processorEnactmentId="
+						+ processorEnactment.getProcessEnactmentId());
+				return null;
+			}
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return dataflowInvocation;
+	}
+
+	public List<DataflowInvocation> getDataflowInvocations(String workflowRunId) {
+		String query = "SELECT " + DataflowInvocationTable.dataflowInvocationId
+				+ "," + DataflowInvocationTable.inputsDataBinding + ","
+				+ DataflowInvocationTable.invocationEnded + ","
+				+ DataflowInvocationTable.invocationStarted + ","
+				+ DataflowInvocationTable.outputsDataBinding + ","
+				+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+				+ DataflowInvocationTable.workflowId + ","
+				+ DataflowInvocationTable.workflowRunId + ","
+				+ DataflowInvocationTable.completed + " FROM "
+				+ DataflowInvocationTable.DataflowInvocation + " WHERE "
+				+ DataflowInvocationTable.workflowRunId + "=?";
+		List<DataflowInvocation> invocations = new ArrayList<>();
+		try (Connection connection = getConnection();
+				PreparedStatement statement = connection
+						.prepareStatement(query)) {
+			statement.setString(1, workflowRunId);
+			ResultSet rs = statement.executeQuery();
+			if (! rs.next()) {
+				logger.warn("Could not find DataflowInvocation for workflowRunId=" + workflowRunId);
+				return null;
+			}
+			DataflowInvocation dataflowInvocation = new DataflowInvocation();
+			dataflowInvocation.setDataflowInvocationId(rs.getString(DataflowInvocationTable.dataflowInvocationId.name()));
+			dataflowInvocation.setInputsDataBindingId(rs.getString(DataflowInvocationTable.inputsDataBinding.name()));
+			dataflowInvocation.setInvocationEnded(rs.getTimestamp(DataflowInvocationTable.invocationEnded.name()));
+			dataflowInvocation.setInvocationStarted(rs.getTimestamp(DataflowInvocationTable.invocationStarted.name()));
+			dataflowInvocation.setOutputsDataBindingId(rs.getString(DataflowInvocationTable.outputsDataBinding.name()));
+			dataflowInvocation.setParentProcessorEnactmentId(rs.getString(DataflowInvocationTable.parentProcessorEnactmentId.name()));
+			dataflowInvocation.setWorkflowId(rs.getString(DataflowInvocationTable.workflowId.name()));
+			dataflowInvocation.setWorkflowRunId(rs.getString(DataflowInvocationTable.workflowRunId.name()));
+			dataflowInvocation.setCompleted(rs.getBoolean(DataflowInvocationTable.completed.name()));
+			invocations.add(dataflowInvocation);
+		} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			logger.warn("Could not execute query " + query, e);
+		}
+		return invocations;
+	}
+
+	public List<Collection> getCollectionsForRun(String wfInstanceID) {
+		ArrayList<Collection> result = new ArrayList<>();
+		String sql = "SELECT * FROM Collection C WHERE workflowRunId = ?";
+		try (Connection c = getConnection();
+				PreparedStatement ps = c.prepareStatement(sql)) {
+			ps.setString(1, wfInstanceID);
+			ResultSet rs = ps.executeQuery();
+			while (rs.next()) {
+				Collection coll = new Collection();
+				coll.setCollId(rs.getString(CollectionTable.collID.name()));
+				coll.setParentIdentifier(rs
+						.getString(CollectionTable.parentCollIDRef.name()));
+				coll.setWorkflowRunIdentifier(rs
+						.getString(CollectionTable.workflowRunId.name()));
+				coll.setProcessorName(rs
+						.getString(CollectionTable.processorNameRef.name()));
+				coll.setPortName(rs.getString(CollectionTable.portName.name()));
+				coll.setIteration(rs.getString(CollectionTable.iteration.name()));
+				result.add(coll);
+			}
+		} catch (Exception e) {
+			logger.warn("Could not execute query", e);
+		}
+		return result;
+	}
+}