You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/30 15:47:18 UTC

[06/12] incubator-taverna-engine git commit: some provenance refactoring

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
new file mode 100644
index 0000000..071e9c8
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
@@ -0,0 +1,66 @@
+/**
+ * 
+ */
+package org.apache.taverna.provenance.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.taverna.provenance.lineageservice.Dependencies;
+import org.apache.taverna.provenance.lineageservice.utils.QueryPort;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+
+/**
+ * Java bean used to encapsulate the results of a provenance query. <br/>
+ * This takes the form of a nested map, see {@link #getAnswer} for details on
+ * its structure.
+ * 
+ * @author Paolo Missier
+ */
+public class NativeAnswer {
+	Map<QueryPort, Map<String, List<Dependencies>>> answer;
+
+	/**
+	 * @return a Map of the form: {@link QueryPort} --> ( &lt;path> --> [
+	 *         {@link Dependencies} ]) where
+	 *         <ul>
+	 *         <li> {@link QueryPort} denotes one of the ports in the &lt;select>
+	 *         element of the input query, for example:
+	 *         <em>converter:atlasSlice</em>
+	 *         <li>&lt;path&gt; is the index within the value associated to the
+	 *         port, for instance [1,2,3] or []. The inner Map structure
+	 *         accounts for multiple paths, so for example if the query asked
+	 *         for the provenance of elements [1,2] and [2,3] of the value bound
+	 *         to <em>converter:atlasSlice</em>, then the inner Map structure
+	 *         contains two entries, one for each of the two paths.
+	 *         <li>for each such path, the corresponding [{@link Dependencies}]
+	 *         is a list of {@link Dependencies}, each associated with one
+	 *         <i>target processor and port</i> mentioned in the input query.
+	 *         For example, for path [1,2] of value bound to
+	 *         <em>converter:atlasSlice</em>, you may see the following list of
+	 *         Dependencies:
+	 *         <ul>
+	 *         <li>converter:atlasSlice:[2]
+	 *         <li>slicer:atlasAverage[2]
+	 *         <li>slicer:atlasAverage[0]
+	 *         </ul>
+	 *         etc. <br/>
+	 *         Each of these elements is described by a Java bean,
+	 *         {@link LineageQueryResultRecord}, which represents a single
+	 *         provenance data record. This means that the particular value
+	 *         depends on each of these other values that are mentioned in the
+	 *         Dependencies list.
+	 */
+	public Map<QueryPort, Map<String, List<Dependencies>>> getAnswer() {
+		return answer;
+	}
+
+	/**
+	 * @param answer
+	 *            the query answer, in the format described in
+	 *            {@link #getAnswer()}
+	 */
+	public void setAnswer(Map<QueryPort, Map<String, List<Dependencies>>> answer) {
+		this.answer = answer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
new file mode 100644
index 0000000..2445ca0
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
@@ -0,0 +1,725 @@
+/*******************************************************************************
+ * Copyright (C) 2009 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ *
+ ******************************************************************************/
+package org.apache.taverna.provenance.api;
+
+import static java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;
+import static javax.naming.Context.INITIAL_CONTEXT_FACTORY;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.taverna.invocation.InvocationContext;
+import org.apache.taverna.invocation.impl.InvocationContextImpl;
+import org.apache.taverna.provenance.Provenance;
+import org.apache.taverna.provenance.ProvenanceConnectorFactory;
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector;
+import org.apache.taverna.provenance.lineageservice.Dependencies;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+import org.apache.taverna.provenance.lineageservice.ProvenanceAnalysis;
+import org.apache.taverna.provenance.lineageservice.ProvenanceQuery;
+import org.apache.taverna.provenance.lineageservice.ProvenanceWriter;
+import org.apache.taverna.provenance.lineageservice.utils.Collection;
+import org.apache.taverna.provenance.lineageservice.utils.DataLink;
+import org.apache.taverna.provenance.lineageservice.utils.DataflowInvocation;
+import org.apache.taverna.provenance.lineageservice.utils.PortBinding;
+import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment;
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.Port;
+import org.apache.taverna.provenance.lineageservice.utils.Workflow;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowRun;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowTree;
+import org.apache.taverna.reference.ReferenceService;
+import org.apache.taverna.reference.T2Reference;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.log4j.Logger;
+
+/**
+ * This API is the single access point into the Taverna provenance database. Its
+ * main functionality is to let clients query the content of the DB, either
+ * using dedicated methods that retrieve specific entity values from the DB, or
+ * through a more general XML-based query language. Examples of XML provenance
+ * queries can be found in the external package
+ * {@link net.sf.taverna.t2.provenance.apic.client.resources}. Class
+ * {@link net.sf.taverna.t2.provenance.api.client.ProvenanceAPISampleClient}
+ * provides an example of API client that third parties would use to interact
+ * with this API.
+ * <p/>
+ * The XML schema for the XML query language is {@code pquery.xsd} in
+ * {@link net.sf.taverna.t2.provenance.apic.client.resources}
+ * 
+ * @author Paolo Missier
+ * @author Stuart Owen
+ */
+public class ProvenanceAccess implements Provenance {
+	private static Logger logger = Logger.getLogger(ProvenanceAccess.class);
+
+	private AbstractProvenanceConnector connector = null;
+	private ProvenanceAnalysis analyser = null;
+	private ProvenanceQuery querier;
+	private ProvenanceWriter writer;
+	
+	private String connectorType;
+	private final List<ProvenanceConnectorFactory> provenanceConnectorFactories;
+
+	public ProvenanceAccess(String connectorType,
+			List<ProvenanceConnectorFactory> provenanceConnectorFactories) {
+		this.connectorType = connectorType;
+		this.provenanceConnectorFactories = provenanceConnectorFactories;
+		init();
+	}
+
+	public ProvenanceAccess(String connectorType, InvocationContext context,
+			List<ProvenanceConnectorFactory> provenanceConnectorFactories) {
+		this.connectorType = connectorType;
+		this.provenanceConnectorFactories = provenanceConnectorFactories;
+		init(context);
+	}
+
+	/**
+	 * The recommended data source initialisation method, where only a driver
+	 * name and jdbc url are required.<p/>
+	 * If the driver supports multiple connections, then a pool will be created
+	 * of 10 min idle, 50 max idle, and 50 max active connections.
+	 * 
+	 * @param driverClassName
+	 * @param jdbcUrl
+	 */
+	public static void initDataSource(String driverClassName, String jdbcUrl) {
+		initDataSource(driverClassName, jdbcUrl, null, null, 10, 50, 50);
+	}
+
+	/**
+	 * Initialises a named JNDI DataSource if not already set up externally. The
+	 * DataSource is named jdbc/taverna
+	 * 
+	 * @param driverClassName
+	 *            - the classname for the driver to be used.
+	 * @param jdbcUrl
+	 *            - the jdbc connection url
+	 * @param username
+	 *            - the username, if required (otherwise null)
+	 * @param password
+	 *            - the password, if required (otherwise null)
+	 * @param minIdle
+	 *            - if the driver supports multiple connections, then the
+	 *            minimum number of idle connections in the pool
+	 * @param maxIdle
+	 *            - if the driver supports multiple connections, then the
+	 *            maximum number of idle connections in the pool
+	 * @param maxActive
+	 *            - if the driver supports multiple connections, then the
+	 *            maximum number of active connections in the pool
+	 */
+	public static void initDataSource(String driverClassName, String jdbcUrl,
+			String username, String password, int minIdle, int maxIdle,
+			int maxActive) {
+		System.setProperty(INITIAL_CONTEXT_FACTORY,
+				"org.osjava.sj.memory.MemoryContextFactory");
+		System.setProperty("org.osjava.sj.jndi.shared", "true");
+
+		BasicDataSource ds = new BasicDataSource();
+		ds.setDriverClassName(driverClassName);
+		ds.setDefaultTransactionIsolation(TRANSACTION_READ_UNCOMMITTED);
+		ds.setMaxActive(maxActive);
+		ds.setMinIdle(minIdle);
+		ds.setMaxIdle(maxIdle);
+		ds.setDefaultAutoCommit(true);
+		if (username != null)
+			ds.setUsername(username);
+		if (password != null)
+			ds.setPassword(password);
+		ds.setUrl(jdbcUrl);
+
+		try {
+			new InitialContext().rebind("jdbc/taverna", ds);
+		} catch (NamingException ex) {
+			logger.error("Problem rebinding the jdbc context", ex);
+		}
+	}
+
+	/**
+	 * Initialises a default Reference Service for storing data and their associated references.
+	 * This creates a reference service using the named JNDI Data Source 'jdbc/taverna'.<br/>
+	 * the new Reference Service is associated to the {@link AbstractProvenanceConnector}, enabling data references to be resolved
+	 */
+	@Override
+	public InvocationContext initDefaultReferenceService() {
+		// FIXME
+		return initReferenceService("hibernateReferenceServiceContext.xml");
+	}
+
+	/**
+	 * Initialises the Reference Service for a given hibernate context definition.
+	 * This mapping file must be available in the root of the classpath.
+	 * @see #initDefaultReferenceService()
+	 * @param hibernateContext
+	 */
+	@Override
+	public InvocationContext initReferenceService(String hibernateContext) {
+		// FIXME
+		return new InvocationContextImpl(refService, connector);
+	}
+
+	private ReferenceService refService;
+	/**
+	 * Set the Reference Service for the connector of this ProvenanceAccess
+	 * if you do not 'like' the default one created when ProvenanceAccess is created.
+	 */
+	public void setReferenceService(ReferenceService refService) {
+		this.refService = refService;
+		if (connector != null)
+			connector.setReferenceService(refService);
+	}
+
+	@Override
+	public void init() {
+		init(initDefaultReferenceService());
+	}
+
+	@Override
+	public void init(InvocationContext context) {
+		for (ProvenanceConnectorFactory factory : provenanceConnectorFactories)
+			if (connectorType.equalsIgnoreCase(factory.getConnectorType()))
+				connector = factory.getProvenanceConnector();
+		logger.info("Provenance being captured using: " + connector);
+
+		//slight change, the init is outside but it also means that the init call has to ensure that the dbURL
+		//is set correctly
+		connector.init();
+
+		connector.setReferenceService(context.getReferenceService()); // CHECK context.getReferenceService());
+		connector.setInvocationContext(context);
+
+		analyser = connector.getProvenanceAnalysis();
+		analyser.setInvocationContext(context);
+
+		querier = connector.getQuery();
+		writer = connector.getWriter();
+		writer.setQuery(querier);
+
+		logger.info("using writer of type: " + writer.getClass());
+	}
+
+	/*
+	 * main provenance query methods
+	 */
+
+	/**
+	 * Executes a provenance query. Please see separate doc. for the XML query language schema.
+	 * @throws SQLException
+	 */
+	@Override
+	public QueryAnswer executeQuery(Query pq) throws SQLException {
+		return analyser.lineageQuery(pq.getTargetPorts(), pq.getRunIDList().get(0),
+				pq.getSelectedProcessors());
+	}
+
+	/**
+	 * Returns individual records from the provenance DB in response to a query
+	 * that specifies specific elements within values associated with a
+	 * processor port, in the context of a specific run of a workflow. <br/>
+	 * This is used in the workbench to retrieve the "intermediate results" at
+	 * various points during workflow execution, as opposed to a set of
+	 * dependencies in response to a full-fledged provenance query.
+	 * 
+	 * @param workflowRunId
+	 *            lineage scope -- a specific instance
+	 * @param processorName
+	 *            for a specific processor [required]
+	 * @param portName
+	 *            a specific (input or output) port [optional]
+	 * @param iteration
+	 *            and a specific iteration [optional]
+	 * @return a list of {@link LineageQueryResultRecord}, encapsulated in a
+	 *         {@link Dependencies} object
+	 * @throws SQLException
+	 */
+	@Override
+	public Dependencies fetchPortData(String workflowRunId, String workflowId,
+			String processorName, String portName, String iteration) {
+		logger.info("running fetchPortData on instance " + workflowRunId
+				+ " workflow " + workflowId + " processor " + processorName
+				+ " port " + portName + " iteration " + iteration);
+		// TODO add context workflowID to query
+		try {
+			return analyser.fetchIntermediateResult(workflowRunId, workflowId,
+					processorName, portName, iteration);
+		} catch (SQLException e) {
+			logger.error("Problem with fetching intermediate results", e);
+			return null;
+		}
+	}
+
+	/**
+	 * @param record a record representing a single value -- possibly within a list hierarchy
+	 * @return the URI for topmost containing collection when the input record is within a list hierarchy, or null otherwise
+	 */
+	@Override
+	public String getContainingCollection(LineageQueryResultRecord record)  {
+		return querier.getContainingCollection(record);
+	}
+
+	/*
+	 * manage instances
+	 */
+
+	/**
+	 * @param workflowId
+	 *            defines the scope of the query - if null then the query runs
+	 *            on all available workflows
+	 * @param conditions
+	 *            additional conditions to be defined. This is a placeholder as
+	 *            conditions are currently ignored
+	 * @return a list of workflowRunId, each representing one run of the input
+	 *         workflowID
+	 */
+	@Override
+	public List<WorkflowRun> listRuns(String workflowId,
+			Map<String, String> conditions) {
+		try {
+			return querier.getRuns(workflowId, conditions);
+		} catch (SQLException e) {
+			logger.error("Problem with listing runs", e);
+			return null;
+		}
+	}
+
+	@Override
+	public boolean isTopLevelDataflow(String workflowId) {
+		return querier.isTopLevelDataflow(workflowId);
+	}
+
+	@Override
+	public boolean isTopLevelDataflow(String workflowId, String workflowRunId) {
+		return querier.isTopLevelDataflow(workflowId, workflowRunId);
+	}
+
+	@Override
+	public String getLatestRunID() throws SQLException {
+		return querier.getLatestRunID();
+	}
+
+	/**
+	 * Removes all records that pertain to a specific run (but not the static
+	 * specification of the workflow run)
+	 * 
+	 * @param runID
+	 *            the internal ID of a run. This can be obtained using
+	 *            {@link #listRuns(String, Map)}
+	 * @return the set of data references that pertain to the deleted run. This
+	 *         can be used by the Data Manager to ensure that no dangling
+	 *         references are left in the main Taverna data repository
+	 */
+	@Override
+	public Set<String> removeRun(String runID) {
+		// implement using clearDynamic() method or a variation. Collect references and forward
+		try {
+			Set<String> danglingDataRefs = writer.clearDBDynamic(runID);
+
+			if (logger.isDebugEnabled())
+				logger.debug("references collected during removeRun: " + danglingDataRefs);
+
+			// TODO send the list of dangling refs to the Data manager for removal of the corresponding data values
+			return danglingDataRefs;
+		} catch (SQLException e) {
+			logger.error("Problem while removing run : " + runID, e);
+			return null;
+		}
+	}
+
+	/**
+	 * removes all records pertaining to the static structure of a workflow.
+	 * 
+	 * @param workflowId
+	 *            the ID (not the external name) of the workflow whose static
+	 *            structure is to be deleted from the DB
+	 */
+	@Override
+	public void removeWorkflow(String workflowId) {
+		try {
+			writer.clearDBStatic(workflowId);
+		} catch (SQLException e) {
+			logger.error("Problem with removing static workflow: " + workflowId, e);
+		}
+	}
+
+	/**
+	 * returns a set of workflowIDs for a given runID. The set is a singleton if
+	 * the workflow has no nesting, but in general the list contains one
+	 * workflowID for each nested workflow involved in the run
+	 * 
+	 * @param runID
+	 *            the internal ID for a specific workflow run
+	 * @return a list of workflow IDs, one for each nested workflow involved in
+	 *         the input run
+	 */
+	@Override
+	public List<String> getWorkflowID(String runID) {
+		try {
+			return querier.getWorkflowIdsForRun(runID);
+		} catch (SQLException e) {
+			logger.error("Problem getting workflow ID: " + runID, e);
+			return null;
+		}
+	}
+
+	/**
+	 * @param runID
+	 *            the internal ID for a specific workflow run
+	 * @return the ID of the top-level workflow that executed during the input
+	 *         run
+	 */
+	@Override
+	public String getTopLevelWorkflowID(String runID) {
+		try {
+			return querier.getTopLevelWorkflowIdForRun(runID);
+		} catch (SQLException e) {
+			logger.error("Problem getting top level workflow: " + runID, e);
+			return null;
+		}
+	}
+
+	@Override
+	public List<Workflow> getWorkflowsForRun(String runID) {
+		try {
+			return querier.getWorkflowsForRun(runID);
+		} catch (SQLException e) {
+			logger.error("Problem getting workflows for run:" + runID, e);
+			return null;
+		}
+	}
+
+	/**
+	 * @return a list of {@link WorkflowRun} beans, each representing the
+	 *         complete description of a workflow run (note that this is not
+	 *         just the ID of the run)
+	 */
+	@Override
+	public List<WorkflowRun> getAllWorkflowIDs() {
+		try {
+			return querier.getRuns(null, null);
+		} catch (SQLException e) {
+			logger.error("Problem getting all workflow IDs", e);
+			return null;
+		}
+	}
+
+//	/ access static workflow structure
+
+	/**
+	 * @param workflowID
+	 * @return a Map: workflowID -> [ {@link ProvenanceProcessor} ] Each entry
+	 *         in the list pertains to one composing sub-workflow (if no nesting
+	 *         then this contains only one workflow, namely the top level one)
+	 */
+	@Override
+	public Map<String, List<ProvenanceProcessor>> getProcessorsInWorkflow(
+			String workflowID) {
+		return querier.getProcessorsDeep(null, workflowID);
+	}
+
+	@Override
+	public List<Collection> getCollectionsForRun(String wfInstanceID) {
+		return querier.getCollectionsForRun(wfInstanceID);
+	}
+
+	@Override
+	public List<PortBinding> getPortBindings(Map<String, String> constraints)
+			throws SQLException {
+		return querier.getPortBindings(constraints);
+	}
+
+	/**
+	 * lists all ports for a workflow
+	 * 
+	 * @param workflowID
+	 * @return a list of {@link Port} beans, each representing an input or
+	 *         output port for the workflow
+	 */
+	@Override
+	public List<Port> getPortsForDataflow(String workflowID) {
+		return querier.getPortsForDataflow(workflowID);
+	}
+
+	/**
+	 * lists all ports for a workflow
+	 * 
+	 * @param workflowID
+	 * @return a list of {@link Port} beans, each representing an input or
+	 *         output port for the workflow or a processor in the workflow
+	 */
+	@Override
+	public List<Port> getAllPortsInDataflow(String workflowID) {
+		return querier.getAllPortsInDataflow(workflowID);
+	}
+
+	/**
+	 * list all ports for a specific processor within a workflow
+	 * 
+	 * @param workflowID
+	 * @param processorName
+	 * @return a list of {@link Port} beans, each representing an input or
+	 *         output port for the input processor
+	 */
+	@Override
+	public List<Port> getPortsForProcessor(String workflowID,
+			String processorName) {
+		return querier.getPortsForProcessor(workflowID, processorName);
+	}
+
+	// PM added 5/2010
+	@Override
+	public String getWorkflowNameByWorkflowID(String workflowID) {
+		return querier.getWorkflow(workflowID).getExternalName();
+	}
+
+	@Override
+	public WorkflowTree getWorkflowNestingStructure(String workflowID)
+			throws SQLException {
+		return querier.getWorkflowNestingStructure(workflowID);
+	}
+
+//	public List<ProvenanceProcessor> getSuccessors(String workflowID, String processorName, String portName) {
+//	return null; // TODO
+//	}
+
+//	public List<String>   getActivities(String workflowID, String processorName) {
+//	return null; // TODO
+//	}
+
+//	/ configure provenance query functionality
+
+	/**
+	 * include values of output ports in the query result? input port values are
+	 * always included<br>
+	 * default is FALSE
+	 */
+	@Override
+	public void toggleIncludeProcessorOutputs(boolean active) {
+		analyser.setReturnOutputs(active);
+	}
+
+	@Override
+	public boolean isIncludeProcessorOutputs() {
+		return analyser.isReturnOutputs();
+	}
+
+	/**
+	 * @return an instance of {@link InvocationContext} that can be used by a
+	 *         client to deref a Taverna data reference
+	 */
+	@Override
+	public InvocationContext getInvocationContext() {
+		return getProvenanceConnector().getInvocationContext();
+	}
+
+//	/ OPM management
+
+	/**
+	 * should an OPM graph be generated in response to a query?<br>
+	 * default is TRUE
+	 */
+	@Override
+	public void toggleOPMGeneration(boolean active) {
+		analyser.setGenerateOPMGraph(active);
+	}
+
+	/**
+	 * 
+	 * @return true if OPM is set to be generated in response to a query
+	 */
+	@Override
+	public boolean isOPMGenerationActive() {
+		return analyser.isGenerateOPMGraph();
+	}
+
+	/**
+	 * should actual artifact values be attached to OPM artifact nodes?<br>
+	 * default is FALSE<br/>
+	 * THIS IS CURRENTLY UNSUPPORTED -- DEFAULTS TO FALSE
+	 * 
+	 * @param active
+	 */
+	@Override
+	public void toggleAttachOPMArtifactValues(boolean active) {
+		analyser.setRecordArtifactValues(active);
+	}
+
+	/**
+	 * @return true if the OPM graph artifacts are annotated with actual values
+	 */
+	@Override
+	public boolean isAttachOPMArtifactValues() {
+		return analyser.isRecordArtifactValues();
+	}
+
+	/**
+	 * @deprecated as workflow 'names' are not globally unique, this method
+	 *             should not be used!
+	 * @param workflowName
+	 * @return
+	 */
+	@Override
+	public String getWorkflowIDForExternalName(String workflowName) {
+		return querier.getWorkflowIdForExternalName(workflowName);
+	}
+
+	@Override
+	public List<ProvenanceProcessor> getProcessorsForWorkflowID(
+			String workflowID) {
+		return querier.getProcessorsForWorkflow(workflowID);
+	}
+
+	/**
+	 * @return the singleton {@link AbstractProvenanceConnector} used by the API
+	 *         to operate on the DB. Currently we support MySQL
+	 *         {@link MySQLProvenanceConnector} and Derby
+	 *         {@link DerbyProvenanceConnector} connectors. The set of supported
+	 *         connectors is extensible. The available connectors are discovered
+	 *         automatically by the API upon startup, and it includes all the
+	 *         connectors that are mentioned in the &lt;dependencies> section of
+	 *         pom.xml for Maven module
+	 *         {@code net.sf.taverna.t2.core.provenanceconnector}
+	 */
+	@Override
+	public AbstractProvenanceConnector getProvenanceConnector() {
+		return connector;
+	}
+
+	/**
+	 * @param provenanceConnector
+	 *            a specific provenanceConnector used by the API
+	 */
+	public void setProvenanceConnector(
+			AbstractProvenanceConnector provenanceConnector) {
+		this.connector = provenanceConnector;
+	}
+
+	/**
+	 * @return
+	 */
+	@Override
+	public ProvenanceAnalysis getAnalysis() {
+		return analyser;
+	}
+
+	/**
+	 * @param pa
+	 *            the pa to set
+	 */
+	public void setPa(ProvenanceAnalysis pa) {
+		this.analyser = pa;
+	}
+
+	/**
+	 * @return the pq
+	 */
+	@Override
+	public ProvenanceQuery getQuery() {
+		return querier;
+	}
+
+	/**
+	 * @param pq
+	 *            the pq to set
+	 */
+	public void setPq(ProvenanceQuery pq) {
+		this.querier = pq;
+	}
+
+	@Override
+	public List<ProcessorEnactment> getProcessorEnactments(
+			String workflowRunId, String... processorPath) {
+		return querier.getProcessorEnactments(workflowRunId, processorPath);
+	}
+
+	@Override
+	public ProcessorEnactment getProcessorEnactmentByProcessId(
+			String workflowRunId, String processIdentifier, String iteration) {
+		return querier.getProcessorEnactmentByProcessId(workflowRunId,
+				processIdentifier, iteration);
+	}
+
+	@Override
+	public ProcessorEnactment getProcessorEnactment(String processorEnactmentId) {
+		return querier.getProcessorEnactment(processorEnactmentId);
+	}
+
+	@Override
+	public ProvenanceProcessor getProvenanceProcessor(String workflowId,
+			String processorNameRef) {
+		return querier.getProvenanceProcessorByName(workflowId, processorNameRef);
+	}
+
+	@Override
+	public ProvenanceProcessor getProvenanceProcessor(String processorId) {
+		return querier.getProvenanceProcessorById(processorId);
+	}
+
+	@Override
+	public Map<Port, T2Reference> getDataBindings(String dataBindingId) {
+		Map<Port, T2Reference> references = new HashMap<>();
+		for (Entry<Port, String> entry : querier.getDataBindings(dataBindingId)
+				.entrySet())
+			references.put(entry.getKey(), getProvenanceConnector()
+					.getReferenceService()
+					.referenceFromString(entry.getValue()));
+		return references;
+	}
+
+	@Override
+	public DataflowInvocation getDataflowInvocation(String workflowRunId) {
+		return querier.getDataflowInvocation(workflowRunId);
+	}
+
+	@Override
+	public DataflowInvocation getDataflowInvocation(
+			ProcessorEnactment processorEnactment) {
+		return querier.getDataflowInvocation(processorEnactment);
+	}
+
+	@Override
+	public List<DataflowInvocation> getDataflowInvocations(String workflowRunId) {
+		return querier.getDataflowInvocations(workflowRunId);
+	}
+
+	@Override
+	public List<DataLink> getDataLinks(String workflowId) {
+		try {
+			Map<String, String> queryConstraints = new HashMap<>();
+			queryConstraints.put("workflowId", workflowId);
+			return querier.getDataLinks(queryConstraints);
+		} catch (SQLException e) {
+			logger.error(
+					"Problem getting datalinks for workflow:" + workflowId, e);
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
new file mode 100644
index 0000000..4fb61dc
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
@@ -0,0 +1,16 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.taverna.provenance.api;
+
+/**
+ * Defines names for the common Provenance Connector types
+ * 
+ * @author Stuart Owen
+ */
+public class ProvenanceConnectorType {
+	public static final String MYSQL = "mysql";
+	public static final String DERBY = "derby";
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
new file mode 100644
index 0000000..1c3227a
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
@@ -0,0 +1,105 @@
+/**
+ * 
+ */
+package org.apache.taverna.provenance.api;
+
+import java.util.List;
+
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.QueryPort;
+
+/**
+ * Bean encapsulating one provenance query, consisting of the following
+ * elements:
+ * <ul>
+ * <li>static scope: the (single) name of the workflow whose run(s) are queried
+ * <li>dynamic scope: a list of workflow run IDs.
+ * <li>a list of &lt;select> variables, encoded as List&lt;{@link QueryPort}>
+ * <li>a list of &lt;target> processors, encoded as List&lt;
+ * {@link ProvenanceProcessor}>
+ * </ul>
+ * 
+ * @author Paolo Missier
+ */
+public class Query {
+	private String workflowName;
+	private List<QueryPort> targetPorts;
+	private List<String> runIDList;
+	private List<ProvenanceProcessor> selectedProcessors;
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("\n **** QUERY SCOPE: ****\n").append("\tworkflow name: ")
+				.append(getWorkflowName()).append("\n\truns: ");
+		for (String r : getRunIDList())
+			sb.append("\n\t").append(r);
+		sb.append("\n**** TARGET PORTS: ****\n");
+		for (QueryPort v : getTargetPorts())
+			sb.append("\n\t").append(v);
+		sb.append("\n\n**** SELECTED PROCESSORS: **** ");
+		for (ProvenanceProcessor pp : getSelectedProcessors())
+			sb.append("\n\t").append(pp);
+		return sb.toString();
+	}
+
+	/**
+	 * @return the targetVars
+	 */
+	public List<QueryPort> getTargetPorts() {
+		return targetPorts;
+	}
+
+	/**
+	 * @param targetVars
+	 *            the targetVars to set
+	 */
+	public void setTargetPorts(List<QueryPort> targetVars) {
+		this.targetPorts = targetVars;
+	}
+
+	/**
+	 * @return the selectedProcessors
+	 */
+	public List<ProvenanceProcessor> getSelectedProcessors() {
+		return selectedProcessors;
+	}
+
+	/**
+	 * @param selectedProcessors
+	 *            the selectedProcessors to set
+	 */
+	public void setFocus(List<ProvenanceProcessor> selectedProcessors) {
+		this.selectedProcessors = selectedProcessors;
+	}
+
+	/**
+	 * @return the runIDList
+	 */
+	public List<String> getRunIDList() {
+		return runIDList;
+	}
+
+	/**
+	 * @param runIDList
+	 *            the runIDList to set
+	 */
+	public void setRunIDList(List<String> runIDList) {
+		this.runIDList = runIDList;
+	}
+
+	/**
+	 * @return the workflowName
+	 */
+	public String getWorkflowName() {
+		return workflowName;
+	}
+
+	/**
+	 * @param workflowName
+	 *            the workflowName to set
+	 */
+	public void setWorkflowName(String workflowName) {
+		this.workflowName = workflowName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
new file mode 100644
index 0000000..e57b0b9
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
@@ -0,0 +1,47 @@
+/**
+ * 
+ */
+package org.apache.taverna.provenance.api;
+
+/**
+ * Encapsulates a native Java data structure as well as a String that holds
+ * the OPM graph that represents the query answer
+ * 
+ * @author Paolo Missier
+ * 
+ */
+public class QueryAnswer {
+	private NativeAnswer nativeAnswer;
+	private String _OPMAnswer_AsRDF;
+
+	/**
+	 * @return the native Java part of the query answer
+	 */
+	public NativeAnswer getNativeAnswer() {
+		return nativeAnswer;
+	}
+
+	/**
+	 * @param a
+	 *            the query answer to set
+	 */
+	public void setNativeAnswer(NativeAnswer a) {
+		this.nativeAnswer = a;
+	}
+
+	/**
+	 * @return the OPM graph as RDF/XML string, or null if OPM was inhibited
+	 *         {@see OPM.computeGraph in APIClient.properties}
+	 */
+	public String getOPMAnswer_AsRDF() {
+		return _OPMAnswer_AsRDF;
+	}
+
+	/**
+	 * @param asRDF
+	 *            the OPM graph as RDF/XML string
+	 */
+	public void setOPMAnswer_AsRDF(String asRDF) {
+		_OPMAnswer_AsRDF = asRDF;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
new file mode 100644
index 0000000..dd6b488
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
@@ -0,0 +1,646 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ *
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.connector;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.taverna.invocation.InvocationContext;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import org.apache.taverna.provenance.lineageservice.EventProcessor;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+import org.apache.taverna.provenance.lineageservice.Provenance;
+import org.apache.taverna.provenance.lineageservice.ProvenanceAnalysis;
+import org.apache.taverna.provenance.lineageservice.ProvenanceQuery;
+import org.apache.taverna.provenance.lineageservice.ProvenanceWriter;
+import org.apache.taverna.provenance.lineageservice.WorkflowDataProcessor;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import org.apache.taverna.reference.ReferenceService;
+
+import org.apache.log4j.Logger;
+
+import org.apache.taverna.configuration.database.DatabaseManager;
+
+/**
+ * Collects {@link ProvenanceItem}s as it travels up and down the dispatch stack
+ * inside the InvocationContext
+ *
+ * @author Ian Dunlop
+ * @author Stuart Owen
+ *
+ */
+public abstract class AbstractProvenanceConnector implements ProvenanceReporter {
+
+	public static enum ActivityTable {
+		Activity, activityId, activityDefinition, workflowId;
+
+		public static String getCreateTable() {
+			return "CREATE TABLE " + Activity + "(\n"
+			+ activityId + " varchar(36) NOT NULL,\n"
+			+ activityDefinition + " blob NOT NULL,\n"
+			+ workflowId + " varchar(100) NOT NULL, \n"
+			+ "PRIMARY KEY (" + activityId + ")\n" + ")";
+		}
+	}
+
+	public static enum CollectionTable {
+		Collection, collID, parentCollIDRef, workflowRunId, processorNameRef, portName, iteration;
+		public static String getCreateTable() {
+			return "CREATE TABLE " + Collection + " (\n"
+				+ collID + " varchar(100) NOT NULL,\n"
+				+ parentCollIDRef + " varchar(100) NOT NULL ,\n"
+				+ workflowRunId + " varchar(36) NOT NULL,\n"
+				+ processorNameRef + " varchar(100) NOT NULL,\n"
+				+ portName + " varchar(100) NOT NULL,\n"
+				+ iteration + " varchar(2000) NOT NULL default '',\n"
+				+ " PRIMARY KEY (\n"
+				+ collID + "," + workflowRunId + "," + processorNameRef
+				+ "," + portName + "," + parentCollIDRef + "," + iteration
+				+ "))";
+		}
+	}
+
+	public static enum DataBindingTable {
+		DataBinding, dataBindingId, portId, t2Reference, workflowRunId;
+
+		public static String getCreateTable() {
+			return "CREATE TABLE " + DataBinding + "(\n"
+			+ dataBindingId + " varchar(36) NOT NULL,\n"
+			+ portId + " varchar(36) NOT NULL,\n"
+			+ t2Reference + " varchar(100) NOT NULL,\n"
+			+ workflowRunId + " varchar(100) NOT NULL, \n"
+			+ "PRIMARY KEY (" + dataBindingId + "," + portId + ")\n" + ")";
+		}
+	}
+
+	public static enum DataflowInvocationTable {
+		DataflowInvocation, dataflowInvocationId,
+		workflowId,
+		invocationStarted, invocationEnded,
+		inputsDataBinding, outputsDataBinding,
+		parentProcessorEnactmentId, workflowRunId, completed;
+
+		public static String getCreateTable() {
+			return "CREATE TABLE " + DataflowInvocation + "(\n"
+			+ dataflowInvocationId + " varchar(36) NOT NULL,\n"
+			+ workflowId + " varchar(100) NOT NULL, \n"
+			+ invocationStarted + " timestamp, \n"
+			+ invocationEnded + " timestamp, \n"
+			+ inputsDataBinding + " varchar(36),\n"
+			+ outputsDataBinding + " varchar(36),\n"
+			+ parentProcessorEnactmentId + " varchar(36), \n"
+			+ workflowRunId + " varchar(100) NOT NULL, \n"
+			+ completed + " smallint NOT NULL,\n"
+			+ "PRIMARY KEY (" + dataflowInvocationId+ ")\n" + ")";
+		}
+	}
+
+	public static enum DataLinkTable {
+		Datalink, sourcePortName, sourcePortId, destinationPortId,
+		destinationPortName, sourceProcessorName, destinationProcessorName, workflowId;
+		public static String getCreateTable() {
+			return "CREATE TABLE " + Datalink + " (\n"
+					+ sourcePortName + " varchar(100) NOT NULL ,\n"
+					+ sourcePortId + " varchar(36) NOT NULL ,\n"
+					+ destinationPortId + " varchar(36) NOT NULL ,\n"
+					+ destinationPortName + " varchar(100) NOT NULL,\n"
+					+ sourceProcessorName + " varchar(100) NOT NULL,\n"
+					+ destinationProcessorName + " varchar(100) NOT NULL,\n"
+					+ workflowId + " varchar(36) NOT NULL,"
+					+ " PRIMARY KEY  ("
+					+ sourcePortId + "," + destinationPortId + "," + workflowId
+					+ "))";
+		}
+	}
+
+	public static enum PortBindingTable {
+		PortBinding, portName, workflowRunId, value, collIDRef, positionInColl, processorNameRef, valueType, ref, iteration, workflowId;
+		public static String getCreateTable() {
+			return  "CREATE TABLE " + PortBinding + " (\n"
+			+ portName + " varchar(100) NOT NULL,\n"
+			+ workflowRunId + " varchar(100) NOT NULL,\n"
+			+ value + " varchar(100) default NULL,\n"
+			+ collIDRef + " varchar(100),\n"
+			+ positionInColl + " int NOT NULL,\n"
+			+ processorNameRef + " varchar(100) NOT NULL,\n"
+			+ valueType + " varchar(50) default NULL,\n"
+			+ ref + " varchar(100) default NULL,\n"
+			+ iteration + " varchar(2000) NOT NULL,\n"
+			+ workflowId + " varchar(36),\n"
+			+ "PRIMARY KEY (\n"
+			+ portName + "," + workflowRunId + ","
+			+ processorNameRef + "," + iteration + ", " + workflowId
+			+ "))";
+		}
+	}
+
+	public static enum PortTable {
+		Port, portId, processorId, portName, isInputPort, processorName,
+		workflowId, depth, resolvedDepth, iterationStrategyOrder;
+		public static String getCreateTable() {
+			return  "CREATE TABLE " + Port + " (\n"
+			+ portId + " varchar(36) NOT NULL,\n"
+			+ processorId + " varchar(36),\n"
+			+ portName + " varchar(100) NOT NULL,\n"
+			+ isInputPort + " smallint NOT NULL ,\n"
+			+ processorName + " varchar(100) NOT NULL,\n"
+			+ workflowId + " varchar(36) NOT NULL,\n"
+			+ depth + " int,\n"
+			+ resolvedDepth + " int,\n"
+			+ iterationStrategyOrder + " smallint, \n"
+			+ "PRIMARY KEY (" + "portId" + "),\n"
+			+ "CONSTRAINT port_constraint UNIQUE (\n"
+			+ portName + "," + isInputPort + "," + processorName + "," + workflowId + "\n"
+			+ "))";
+		}
+	}
+
+	public static enum ProcessorEnactmentTable {
+		ProcessorEnactment, processEnactmentId, workflowRunId, processorId,
+		processIdentifier, iteration, parentProcessorEnactmentId,
+		enactmentStarted, enactmentEnded, initialInputsDataBindingId,
+		finalOutputsDataBindingId;
+
+		public static String getCreateTable() {
+			return "CREATE TABLE " + ProcessorEnactment + " (\n"
+			+ processEnactmentId + " varchar(36) NOT NULL, \n"
+			+ workflowRunId + " varchar(100) NOT NULL, \n"
+			+ processorId + " varchar(36) NOT NULL, \n"
+			+ processIdentifier + " varchar(2047) NOT NULL, \n"
+			+ iteration + " varchar(100) NOT NULL, \n"
+			+ parentProcessorEnactmentId + " varchar(36), \n"
+			+ enactmentStarted + " timestamp, \n"
+			+ enactmentEnded + " timestamp, \n"
+			+ initialInputsDataBindingId + " varchar(36), \n"
+			+ finalOutputsDataBindingId + " varchar(36), \n"
+			+ " PRIMARY KEY (" + processEnactmentId + ")" + ")";
+		}
+	}
+
+	public static enum ProcessorTable {
+		Processor,processorId, processorName,workflowId,firstActivityClass,isTopLevel ;
+		public static String getCreateTable() {
+			return  "CREATE TABLE "+ Processor +" (\n"
+			+ processorId + " varchar(36) NOT NULL,\n"
+			+ processorName + " varchar(100) NOT NULL,\n"
+			+ workflowId + " varchar(36) NOT NULL ,\n\n"
+			+ firstActivityClass + " varchar(100) default NULL,\n"
+			+ isTopLevel + " smallint, \n"
+			+ "PRIMARY KEY (" + processorId+ "),\n"
+			+ "CONSTRAINT processor_constraint UNIQUE (\n"
+			+	processorName + "," + workflowId + "))";
+		}
+	}
+
+	public static enum ServiceInvocationTable {
+		ServiceInvocation, processorEnactmentId, workflowRunId,
+		invocationNumber, invocationStarted, invocationEnded,
+		inputsDataBinding, outputsDataBinding, failureT2Reference,
+		activityId, initiatingDispatchLayer, finalDispatchLayer;
+
+		public static String getCreateTable() {
+			return "CREATE TABLE " + ServiceInvocation + "(\n"
+			+ processorEnactmentId + " varchar(36) NOT NULL,\n"
+			+ workflowRunId + " varchar(100) NOT NULL, \n"
+			+ invocationNumber + " bigint NOT NULL,\n"
+			+ invocationStarted + " timestamp, \n"
+			+ invocationEnded + " timestamp, \n"
+			+ inputsDataBinding + " varchar(36),\n"
+			+ outputsDataBinding + " varchar(36),\n"
+			+ failureT2Reference + " varchar(100) default NULL,\n"
+			+ activityId + " varchar(36),\n"
+			+ initiatingDispatchLayer + " varchar(250) NOT NULL,\n"
+			+ finalDispatchLayer + " varchar(250) NOT NULL,\n"
+			+ "PRIMARY KEY (" + processorEnactmentId + ", "
+			+ invocationNumber + "))";
+		}
+	}
+
+	public static enum WorkflowRunTable {
+		WorkflowRun, workflowRunId, workflowId, timestamp;
+		public static String getCreateTable() {
+			return  "CREATE TABLE " + WorkflowRun + " (\n"
+			+ workflowRunId + " varchar(36) NOT NULL,\n"
+			+ workflowId + " varchar(36) NOT NULL,\n"
+			+ timestamp + " timestamp NOT NULL default CURRENT_TIMESTAMP,\n"
+			+ " PRIMARY KEY (" + workflowRunId + ", " + workflowId + "))";
+		}
+	}
+
+	public static enum WorkflowTable {
+		WorkflowTable, workflowId, parentWorkflowId, externalName, dataflow;
+		public static String getCreateTable() {
+			return "CREATE TABLE " + "Workflow (\n" +
+					workflowId	+ " varchar(36) NOT NULL,\n"
+					+ parentWorkflowId + " varchar(100),\n"
+					+ externalName + " varchar(100),\n"
+					+ dataflow + " blob, \n"
+					+ "PRIMARY KEY  (" + workflowId	+ "))";
+		}
+	}
+
+	private static Logger logger = Logger.getLogger(AbstractProvenanceConnector.class);
+	private String saveEvents;
+	private ProvenanceAnalysis provenanceAnalysis;
+	private ExecutorService executor = Executors.newSingleThreadExecutor();
+	private boolean finished = false;
+	private String sessionID;
+	private InvocationContext invocationContext;
+	private ReferenceService referenceService;
+
+	private Provenance provenance;
+	private ProvenanceWriter writer;
+	private ProvenanceQuery query;
+	private WorkflowDataProcessor wfdp;
+	private EventProcessor eventProcessor;
+	private final DatabaseManager databaseManager;
+
+	public AbstractProvenanceConnector(DatabaseManager databaseManager) {
+		this.databaseManager = databaseManager;
+	}
+
+	/**
+	 * Set up the the {@link EventProcessor}, {@link ProvenanceWriter} &
+	 * {@link ProvenanceQuery}. Since it is an SPI you don't want any code
+	 * cluttering the default constructor. Call this method after instantiation
+	 * and after the dbURL has been set.
+	 */
+	public void init() {
+        createDatabase();
+		try {
+			setWfdp(new WorkflowDataProcessor());
+			getWfdp().setPq(getQuery());
+			getWfdp().setPw(getWriter());
+
+			setEventProcessor(new EventProcessor());
+			getEventProcessor().setPw(getWriter());
+			getEventProcessor().setPq(getQuery());
+			getEventProcessor().setWfdp(getWfdp());
+
+			setProvenanceAnalysis(new ProvenanceAnalysis(getQuery()));
+			setProvenance(new Provenance(getEventProcessor()));
+		} catch (InstantiationException | IllegalAccessException
+				| ClassNotFoundException | SQLException e) {
+			logger.error("Problem with provenance initialisation: ", e);
+		}
+	}
+
+	/**
+	 * @return the invocationContext
+	 */
+	@Override
+	public InvocationContext getInvocationContext() {
+		return invocationContext;
+	}
+
+	/**
+	 * @param invocationContext
+	 *            the invocationContext to set
+	 */
+	@Override
+	public void setInvocationContext(InvocationContext invocationContext) {
+		this.invocationContext = invocationContext;
+	}
+
+	/**
+	 * @return the referenceService
+	 */
+	@Override
+	public ReferenceService getReferenceService() {
+		return referenceService;
+	}
+
+	/**
+	 * @param referenceService
+	 *            the referenceService to set
+	 */
+	@Override
+	public void setReferenceService(ReferenceService referenceService) {
+		this.referenceService = referenceService;
+	}
+
+	/**
+	 * Uses a {@link ScheduledThreadPoolExecutor} to process events in a Thread
+	 * safe manner
+	 */
+	@Override
+	public synchronized void addProvenanceItem(
+			final ProvenanceItem provenanceItem) {
+
+//		Runnable runnable = new Runnable() {
+//
+//			public void run() {
+				try {
+
+					getProvenance().acceptRawProvenanceEvent(
+							provenanceItem.getEventType(), provenanceItem);
+
+				} catch (SQLException e) {
+					logger.warn("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+				} catch (IOException e) {
+					logger.error("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+				} catch (RuntimeException e) {
+					logger.error("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+				}
+//
+//			}
+//		};
+//		getExecutor().execute(runnable);
+
+	}
+
+	protected Connection getConnection() throws SQLException {
+		return databaseManager.getConnection();
+	}
+
+	/**
+	 * Used by database backed provenance stores. Ask the implementation to
+	 * create the database. Requires each database type to create all its own
+	 * tables
+	 */
+	public abstract void createDatabase();
+
+
+	public void clearDatabase() { clearDatabase(true); }
+
+	/**
+	 * Clear all the values in the database but keep the db there
+	 */
+	public void clearDatabase(boolean isClearDB) {
+		if (isClearDB) {
+			logger.info("clearing DB");
+			try {
+				getWriter().clearDBStatic();
+
+				Set<String> danglingDataRefs = getWriter().clearDBDynamic();
+
+				logger.info("references collected during removeRun:");
+				for (String s : danglingDataRefs)
+					logger.info(s);
+			} catch (SQLException e) {
+				logger.error("Problem clearing database", e);
+			}
+		} else {
+			logger.error("clearDB is FALSE: not clearing");
+		}
+
+//		String q = null;
+//		Connection connection = null;
+
+//		Statement stmt = null;
+//		try {
+//		connection = getConnection();
+//		stmt = connection.createStatement();
+//		} catch (SQLException e) {
+//		logger.warn("Could not create database statement :" + e);
+//		} catch (InstantiationException e) {
+//		logger.warn("Could not create database statement :" + e);
+//		} catch (IllegalAccessException e) {
+//		logger.warn("Could not create database statement :" + e);
+//		} catch (ClassNotFoundException e) {
+//		logger.warn("Could not create database statement :" + e);
+//		}
+
+//		q = "DELETE FROM Workflow";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM Processor";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM Datalink";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM Port";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM WorkflowRun";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM ProcBinding";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM PortBinding";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+//		q = "DELETE FROM Collection";
+//		try {
+//		stmt.executeUpdate(q);
+//		} catch (SQLException e) {
+//		logger.warn("Could not execute statement " + q + " :" + e);
+//		}
+
+
+//		if (connection!=null) try {
+//		connection.close();
+//		} catch (SQLException ex) {
+//		logger.error("Error closing connection",ex);
+//		}
+	}
+
+	/**
+	 * The name for this type of provenance connector. Is used by the workbench
+	 * to ensure it adds the correct one to the InvocationContext
+	 *
+	 * @return
+	 */
+	public abstract String getName();
+
+	/**
+	 * A unique identifier for this run of provenance, should correspond to the
+	 * initial {@link WorkflowProvenanceItem} identifier that gets sent through
+	 *
+	 * @param identifier
+	 */
+	@Override
+	public void setSessionID(String sessionID) {
+		this.sessionID = sessionID;
+	}
+
+	/**
+	 * What is the unique identifier used by this connector
+	 *
+	 * @return
+	 */
+	@Override
+	public String getSessionID() {
+		return sessionID;
+	}
+
+
+	public List<LineageQueryResultRecord> computeLineage(String workflowRun,
+			String port, String proc, String path, Set<String> selectedProcessors) {
+		return null;
+	}
+
+	public String getDataflowInstance(String dataflowId) {
+		try {
+			return (getProvenance()).getPq().getRuns(dataflowId, null).get(0)
+					.getWorkflowRunId();
+		} catch (SQLException e) {
+			logger.error("Error finding the dataflow instance", e);
+			return null;
+		}
+	}
+
+	/**
+	 * @return the saveEvents
+	 */
+	public String getSaveEvents() {
+		return saveEvents;
+	}
+
+	/**
+	 * @param saveEvents
+	 *            the saveEvents to set
+	 */
+	public void setSaveEvents(String saveEvents) {
+		this.saveEvents = saveEvents;
+	}
+
+	public void setProvenance(Provenance provenance) {
+		this.provenance = provenance;
+	}
+
+	public Provenance getProvenance() {
+		return provenance;
+	}
+
+	public void setFinished(boolean finished) {
+		this.finished = finished;
+	}
+
+	public boolean isFinished() {
+		return finished;
+	}
+
+	public void setExecutor(ExecutorService executor) {
+		this.executor = executor;
+	}
+
+	public synchronized ExecutorService getExecutor() {
+		return executor;
+	}
+
+	public void setProvenanceAnalysis(ProvenanceAnalysis provenanceAnalysis) {
+		this.provenanceAnalysis = provenanceAnalysis;
+	}
+
+	/**
+	 * Use this {@link ProvenanceAnalysis} to carry out lineage queries on the
+	 * provenance
+	 *
+	 * @return
+	 */
+	public ProvenanceAnalysis getProvenanceAnalysis() {
+		return provenanceAnalysis;
+	}
+
+	/**
+	 * @return the writer
+	 */
+	public ProvenanceWriter getWriter() {
+		return writer;
+	}
+
+	/**
+	 * @param writer the writer to set
+	 */
+	protected void setWriter(ProvenanceWriter writer) {
+		this.writer = writer;
+	}
+
+	/**
+	 * @return the query
+	 */
+	public ProvenanceQuery getQuery() {
+		return query;
+	}
+
+	/**
+	 * @param query the query to set
+	 */
+	protected void setQuery(ProvenanceQuery query) {
+		this.query = query;
+	}
+
+	/**
+	 * @return the wfdp
+	 */
+	public WorkflowDataProcessor getWfdp() {
+		return wfdp;
+	}
+
+	/**
+	 * @param wfdp the wfdp to set
+	 */
+	public void setWfdp(WorkflowDataProcessor wfdp) {
+		this.wfdp = wfdp;
+	}
+
+	/**
+	 * @return the eventProcessor
+	 */
+	public EventProcessor getEventProcessor() {
+		return eventProcessor;
+	}
+
+	/**
+	 * @param eventProcessor the eventProcessor to set
+	 */
+	public void setEventProcessor(EventProcessor eventProcessor) {
+		this.eventProcessor = eventProcessor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
new file mode 100644
index 0000000..6e78c0d
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
@@ -0,0 +1,5 @@
+package org.apache.taverna.provenance.connector;
+
+public class ProvenanceSQL {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
new file mode 100644
index 0000000..193d0e0
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+
+/**
+ * @author Paolo Missier
+ * 
+ */
+public class AnnotationsLoader {
+	private static Logger logger = Logger.getLogger(AnnotationsLoader.class);
+
+	/**
+	 * @param annotationFile
+	 *            by convention we use <workflow file name>+"annotations"
+	 * @return a map pname -> annotation so that the lineage query alg can use
+	 *         the annotation when processing pname
+	 */
+	@SuppressWarnings("unchecked")
+	public Map<String,List<String>>  getAnnotations(String annotationFile)  {
+		Map<String, List<String>> procAnnotations = new HashMap<>();
+
+		// load XML file as doc
+//		parse the event into DOM
+		SAXBuilder b = new SAXBuilder();
+
+		try {
+			Document d = b.build(new FileReader(annotationFile));
+			if (d == null)
+				return null;
+
+			// look for all processor elements
+			for (Element el : (List<Element>) d.getRootElement().getChildren()) {
+				String pName = el.getAttributeValue("name");
+				logger.info("processor name: " + pName);
+
+				List<String> annotations = new ArrayList<>();
+				// extract all annotations for this pname
+
+				for (Element annotElement : (List<Element>) el.getChildren()) {
+					String annot = annotElement.getAttributeValue("type");
+					logger.info("annotation: " + annot);
+
+					// add this annotation
+					annotations.add(annot);
+				}
+
+				procAnnotations.put(pName, annotations);
+			}
+		} catch (JDOMException | IOException e) {
+			logger.error("Problem getting annotations from: " + annotationFile,
+					e);
+		}
+		return procAnnotations;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
new file mode 100644
index 0000000..f06de95
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester   
+ * 
+ *  Modifications to the initial code base are copyright of their
+ *  respective authors, or their employers as appropriate.
+ * 
+ *  This program is free software; you can redistribute it and/or
+ *  modify it under the terms of the GNU Lesser General Public License
+ *  as published by the Free Software Foundation; either version 2.1 of
+ *  the License, or (at your option) any later version.
+ *    
+ *  This program is distributed in the hope that it will be useful, but
+ *  WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ *  Lesser General Public License for more details.
+ *    
+ *  You should have received a copy of the GNU Lesser General Public
+ *  License along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * Java bean to hold a list of {@link LineageQueryResultRecord}, representing
+ * one record in the provenance DB at the finest possible level of granularity,
+ * i.e., a single value possibly within a collection, bound to a processor port
+ * and associated to a specific run of a specific workflow.
+ * 
+ * @author Paolo Missier
+ * @see LineageQueryResultRecord
+ */
+public class Dependencies {
+	final public static String COLL_TYPE = "referenceSetCollection";
+	final public static String ATOM_TYPE = "referenceSet";
+	
+	boolean printResolvedValue;
+
+	private List<LineageQueryResultRecord> records = new ArrayList<>();
+
+	public ListIterator<LineageQueryResultRecord> iterator() {
+		return getRecords().listIterator();
+	}
+
+	/**
+	 * adds a single record to the list of dependencies
+	 * 
+	 * @param workflowId
+	 * @param pname
+	 * @param vname
+	 * @param workflowRun
+	 * @param iteration
+	 * @param collIdRef
+	 * @param parentCollIDRef
+	 * @param value
+	 * @param resolvedValue
+	 * @param type
+	 * @param isInput
+	 * @param isCollection
+	 */
+	public void addLineageQueryResultRecord(String workflowId, String pname,
+			String vname, String workflowRun, String iteration,
+			String collIdRef, String parentCollIDRef, String value,
+			String resolvedValue, String type, boolean isInput,
+			boolean isCollection) {
+
+		LineageQueryResultRecord record = new LineageQueryResultRecord();
+
+		record.setWorkflowId(workflowId);
+		record.setWorkflowRunId(workflowRun);
+		record.setProcessorName(pname);
+		record.setValue(value);
+		record.setPortName(vname);
+		record.setIteration(iteration);
+		record.setResolvedValue(resolvedValue);
+		record.setIsInputPort(isInput);
+		record.setCollectionT2Reference(collIdRef);
+		record.setParentCollIDRef(parentCollIDRef);
+		record.setCollection(isCollection);
+
+		getRecords().add(record);
+	}
+
+	/**
+	 * populates the bean with an entire list of {@link LineageQueryResultRecord} elements
+	 * @param records
+	 */
+	public void setRecords(List<LineageQueryResultRecord> records) {
+		this.records = records;
+	}
+
+	/**
+	 * @return the entire set of records
+	 */
+	public List<LineageQueryResultRecord> getRecords() {
+		return records;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		for (LineageQueryResultRecord record : getRecords()) {
+			record.setPrintResolvedValue(printResolvedValue);
+			sb.append("***  record: ****\n"+record.toString());
+		}		
+		return sb.toString();
+	}
+
+	/**
+	 * @return true if the records contain the actual values, in addition to the URI references to the values
+	 * <br/>NOT YET SUPPORTED. This switch is currently ignored and no values are returned in the current version 
+	 */
+	public boolean isPrintResolvedValue() {
+		return printResolvedValue;
+	}
+
+	/**
+	 * @param printResolvedValue toggles insertion of values in addition to references to values in the records
+	 * <br/>NOT YET SUPPORTED. This switch is currently ignored and no values are returned in the current version 
+	 */
+	public void setPrintResolvedValue(boolean printResolvedValue) {
+		this.printResolvedValue = printResolvedValue;
+	}
+}