You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by re...@apache.org on 2015/03/30 15:47:18 UTC
[06/12] incubator-taverna-engine git commit: some provenance
refactoring
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
new file mode 100644
index 0000000..071e9c8
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/NativeAnswer.java
@@ -0,0 +1,66 @@
+/**
+ *
+ */
+package org.apache.taverna.provenance.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.taverna.provenance.lineageservice.Dependencies;
+import org.apache.taverna.provenance.lineageservice.utils.QueryPort;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+
+/**
+ * Java bean used to encapsulate the results of a provenance query. <br/>
+ * This takes the form of a nested map, see {@link #getAnswer} for details on
+ * its structure.
+ *
+ * @author Paolo Missier
+ */
+public class NativeAnswer {
+ Map<QueryPort, Map<String, List<Dependencies>>> answer;
+
+ /**
+ * @return a Map of the form: {@link QueryPort} --> ( <path> --> [
+ * {@link Dependencies} ]) where
+ * <ul>
+ * <li> {@link QueryPort} denotes one of the ports in the <select>
+ * element of the input query, for example:
+ * <em>converter:atlasSlice</em>
+ * <li><path> is the index within the value associated to the
+ * port, for instance [1,2,3] or []. The inner Map structure
+ * accounts for multiple paths, so for example if the query asked
+ * for the provenance of elements [1,2] and [2,3] of the value bound
+ * to <em>converter:atlasSlice</em>, then the inner Map structure
+ * contains two entries, one for each of the two paths.
+ * <li>for each such path, the corresponding [{@link Dependencies}]
+ * is a list of {@link Dependencies}, each associated with one
+ * <i>target processor and port</i> mentioned in the input query.
+ * For example, for path [1,2] of value bound to
+ * <em>converter:atlasSlice</em>, you may see the following list of
+ * Dependencies:
+ * <ul>
+ * <li>converter:atlasSlice:[2]
+ * <li>slicer:atlasAverage[2]
+ * <li>slicer:atlasAverage[0]
+ * </ul>
+ * etc. <br/>
+ * Each of these elements is described by a Java bean,
+ * {@link LineageQueryResultRecord}, which represents a single
+ * provenance data record. This means that the particular value
+ * depends on each of these other values that are mentioned in the
+ * Dependencies list.
+ */
+ public Map<QueryPort, Map<String, List<Dependencies>>> getAnswer() {
+ return answer;
+ }
+
+ /**
+ * @param answer
+ * the query answer, in the format described in
+ * {@link #getAnswer()}
+ */
+ public void setAnswer(Map<QueryPort, Map<String, List<Dependencies>>> answer) {
+ this.answer = answer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
new file mode 100644
index 0000000..2445ca0
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceAccess.java
@@ -0,0 +1,725 @@
+/*******************************************************************************
+ * Copyright (C) 2009 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ *
+ ******************************************************************************/
+package org.apache.taverna.provenance.api;
+
+import static java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;
+import static javax.naming.Context.INITIAL_CONTEXT_FACTORY;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.taverna.invocation.InvocationContext;
+import org.apache.taverna.invocation.impl.InvocationContextImpl;
+import org.apache.taverna.provenance.Provenance;
+import org.apache.taverna.provenance.ProvenanceConnectorFactory;
+import org.apache.taverna.provenance.connector.AbstractProvenanceConnector;
+import org.apache.taverna.provenance.lineageservice.Dependencies;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+import org.apache.taverna.provenance.lineageservice.ProvenanceAnalysis;
+import org.apache.taverna.provenance.lineageservice.ProvenanceQuery;
+import org.apache.taverna.provenance.lineageservice.ProvenanceWriter;
+import org.apache.taverna.provenance.lineageservice.utils.Collection;
+import org.apache.taverna.provenance.lineageservice.utils.DataLink;
+import org.apache.taverna.provenance.lineageservice.utils.DataflowInvocation;
+import org.apache.taverna.provenance.lineageservice.utils.PortBinding;
+import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment;
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.Port;
+import org.apache.taverna.provenance.lineageservice.utils.Workflow;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowRun;
+import org.apache.taverna.provenance.lineageservice.utils.WorkflowTree;
+import org.apache.taverna.reference.ReferenceService;
+import org.apache.taverna.reference.T2Reference;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.log4j.Logger;
+
+/**
+ * This API is the single access point into the Taverna provenance database. Its
+ * main functionality is to let clients query the content of the DB, either
+ * using dedicated methods that retrieve specific entity values from the DB, or
+ * through a more general XML-based query language. Examples of XML provenance
+ * queries can be found in the external package
+ * {@link net.sf.taverna.t2.provenance.apic.client.resources}. Class
+ * {@link net.sf.taverna.t2.provenance.api.client.ProvenanceAPISampleClient}
+ * provides an example of API client that third parties would use to interact
+ * with this API.
+ * <p/>
+ * The XML schema for the XML query language is {@code pquery.xsd} in
+ * {@link net.sf.taverna.t2.provenance.apic.client.resources}
+ *
+ * @author Paolo Missier
+ * @author Stuart Owen
+ */
+public class ProvenanceAccess implements Provenance {
+ private static Logger logger = Logger.getLogger(ProvenanceAccess.class);
+
+ private AbstractProvenanceConnector connector = null;
+ private ProvenanceAnalysis analyser = null;
+ private ProvenanceQuery querier;
+ private ProvenanceWriter writer;
+
+ private String connectorType;
+ private final List<ProvenanceConnectorFactory> provenanceConnectorFactories;
+
+ public ProvenanceAccess(String connectorType,
+ List<ProvenanceConnectorFactory> provenanceConnectorFactories) {
+ this.connectorType = connectorType;
+ this.provenanceConnectorFactories = provenanceConnectorFactories;
+ init();
+ }
+
+ public ProvenanceAccess(String connectorType, InvocationContext context,
+ List<ProvenanceConnectorFactory> provenanceConnectorFactories) {
+ this.connectorType = connectorType;
+ this.provenanceConnectorFactories = provenanceConnectorFactories;
+ init(context);
+ }
+
+ /**
+ * The recommended data source initialisation method, where only a driver
+ * name and jdbc url are required.<p/>
+ * If the driver supports multiple connections, then a pool will be created
+ * of 10 min idle, 50 max idle, and 50 max active connections.
+ *
+ * @param driverClassName
+ * @param jdbcUrl
+ */
+ public static void initDataSource(String driverClassName, String jdbcUrl) {
+ initDataSource(driverClassName, jdbcUrl, null, null, 10, 50, 50);
+ }
+
+ /**
+ * Initialises a named JNDI DataSource if not already set up externally. The
+ * DataSource is named jdbc/taverna
+ *
+ * @param driverClassName
+ * - the classname for the driver to be used.
+ * @param jdbcUrl
+ * - the jdbc connection url
+ * @param username
+ * - the username, if required (otherwise null)
+ * @param password
+ * - the password, if required (otherwise null)
+ * @param minIdle
+ * - if the driver supports multiple connections, then the
+ * minimum number of idle connections in the pool
+ * @param maxIdle
+ * - if the driver supports multiple connections, then the
+ * maximum number of idle connections in the pool
+ * @param maxActive
+ * - if the driver supports multiple connections, then the
+ * maximum number of active connections in the pool
+ */
+ public static void initDataSource(String driverClassName, String jdbcUrl,
+ String username, String password, int minIdle, int maxIdle,
+ int maxActive) {
+ System.setProperty(INITIAL_CONTEXT_FACTORY,
+ "org.osjava.sj.memory.MemoryContextFactory");
+ System.setProperty("org.osjava.sj.jndi.shared", "true");
+
+ BasicDataSource ds = new BasicDataSource();
+ ds.setDriverClassName(driverClassName);
+ ds.setDefaultTransactionIsolation(TRANSACTION_READ_UNCOMMITTED);
+ ds.setMaxActive(maxActive);
+ ds.setMinIdle(minIdle);
+ ds.setMaxIdle(maxIdle);
+ ds.setDefaultAutoCommit(true);
+ if (username != null)
+ ds.setUsername(username);
+ if (password != null)
+ ds.setPassword(password);
+ ds.setUrl(jdbcUrl);
+
+ try {
+ new InitialContext().rebind("jdbc/taverna", ds);
+ } catch (NamingException ex) {
+ logger.error("Problem rebinding the jdbc context", ex);
+ }
+ }
+
+ /**
+ * Initialises a default Reference Service for storing data and their associated references.
+ * This creates a reference service using the named JNDI Data Source 'jdbc/taverna'.<br/>
+ * the new Reference Service is associated to the {@link AbstractProvenanceConnector}, enabling data references to be resolved
+ */
+ @Override
+ public InvocationContext initDefaultReferenceService() {
+ // FIXME
+ return initReferenceService("hibernateReferenceServiceContext.xml");
+ }
+
+ /**
+ * Initialises the Reference Service for a given hibernate context definition.
+ * This mapping file must be available in the root of the classpath.
+ * @see #initDefaultReferenceService()
+ * @param hibernateContext
+ */
+ @Override
+ public InvocationContext initReferenceService(String hibernateContext) {
+ // FIXME
+ return new InvocationContextImpl(refService, connector);
+ }
+
+ private ReferenceService refService;
+ /**
+ * Set the Reference Service for the connector of this ProvenanceAccess
+ * if you do not 'like' the default one created when ProvenanceAccess is created.
+ */
+ public void setReferenceService(ReferenceService refService) {
+ this.refService = refService;
+ if (connector != null)
+ connector.setReferenceService(refService);
+ }
+
+ @Override
+ public void init() {
+ init(initDefaultReferenceService());
+ }
+
+ @Override
+ public void init(InvocationContext context) {
+ for (ProvenanceConnectorFactory factory : provenanceConnectorFactories)
+ if (connectorType.equalsIgnoreCase(factory.getConnectorType()))
+ connector = factory.getProvenanceConnector();
+ logger.info("Provenance being captured using: " + connector);
+
+ //slight change, the init is outside but it also means that the init call has to ensure that the dbURL
+ //is set correctly
+ connector.init();
+
+ connector.setReferenceService(context.getReferenceService()); // CHECK context.getReferenceService());
+ connector.setInvocationContext(context);
+
+ analyser = connector.getProvenanceAnalysis();
+ analyser.setInvocationContext(context);
+
+ querier = connector.getQuery();
+ writer = connector.getWriter();
+ writer.setQuery(querier);
+
+ logger.info("using writer of type: " + writer.getClass());
+ }
+
+ /*
+ * main provenance query methods
+ */
+
+ /**
+ * Executes a provenance query. Please see separate doc. for the XML query language schema.
+ * @throws SQLException
+ */
+ @Override
+ public QueryAnswer executeQuery(Query pq) throws SQLException {
+ return analyser.lineageQuery(pq.getTargetPorts(), pq.getRunIDList().get(0),
+ pq.getSelectedProcessors());
+ }
+
+ /**
+ * Returns individual records from the provenance DB in response to a query
+ * that specifies specific elements within values associated with a
+ * processor port, in the context of a specific run of a workflow. <br/>
+ * This is used in the workbench to retrieve the "intermediate results" at
+ * various points during workflow execution, as opposed to a set of
+ * dependencies in response to a full-fledged provenance query.
+ *
+ * @param workflowRunId
+ * lineage scope -- a specific instance
+ * @param processorName
+ * for a specific processor [required]
+ * @param portName
+ * a specific (input or output) port [optional]
+ * @param iteration
+ * and a specific iteration [optional]
+ * @return a list of {@link LineageQueryResultRecord}, encapsulated in a
+ * {@link Dependencies} object, or null if an SQLException occurred
+ * (the exception is logged, not thrown)
+ */
+ @Override
+ public Dependencies fetchPortData(String workflowRunId, String workflowId,
+ String processorName, String portName, String iteration) {
+ logger.info("running fetchPortData on instance " + workflowRunId
+ + " workflow " + workflowId + " processor " + processorName
+ + " port " + portName + " iteration " + iteration);
+ // TODO add context workflowID to query
+ try {
+ return analyser.fetchIntermediateResult(workflowRunId, workflowId,
+ processorName, portName, iteration);
+ } catch (SQLException e) {
+ logger.error("Problem with fetching intermediate results", e);
+ return null;
+ }
+ }
+
+ /**
+ * @param record a record representing a single value -- possibly within a list hierarchy
+ * @return the URI for topmost containing collection when the input record is within a list hierarchy, or null otherwise
+ */
+ @Override
+ public String getContainingCollection(LineageQueryResultRecord record) {
+ return querier.getContainingCollection(record);
+ }
+
+ /*
+ * manage instances
+ */
+
+ /**
+ * @param workflowId
+ * defines the scope of the query - if null then the query runs
+ * on all available workflows
+ * @param conditions
+ * additional conditions to be defined. This is a placeholder as
+ * conditions are currently ignored
+ * @return a list of workflowRunId, each representing one run of the input
+ * workflowID
+ */
+ @Override
+ public List<WorkflowRun> listRuns(String workflowId,
+ Map<String, String> conditions) {
+ try {
+ return querier.getRuns(workflowId, conditions);
+ } catch (SQLException e) {
+ logger.error("Problem with listing runs", e);
+ return null;
+ }
+ }
+
+ @Override
+ public boolean isTopLevelDataflow(String workflowId) {
+ return querier.isTopLevelDataflow(workflowId);
+ }
+
+ @Override
+ public boolean isTopLevelDataflow(String workflowId, String workflowRunId) {
+ return querier.isTopLevelDataflow(workflowId, workflowRunId);
+ }
+
+ @Override
+ public String getLatestRunID() throws SQLException {
+ return querier.getLatestRunID();
+ }
+
+ /**
+ * Removes all records that pertain to a specific run (but not the static
+ * specification of the workflow run)
+ *
+ * @param runID
+ * the internal ID of a run. This can be obtained using
+ * {@link #listRuns(String, Map)}
+ * @return the set of data references that pertain to the deleted run. This
+ * can be used by the Data Manager to ensure that no dangling
+ * references are left in the main Taverna data repository
+ */
+ @Override
+ public Set<String> removeRun(String runID) {
+ // implement using clearDynamic() method or a variation. Collect references and forward
+ try {
+ Set<String> danglingDataRefs = writer.clearDBDynamic(runID);
+
+ if (logger.isDebugEnabled())
+ logger.debug("references collected during removeRun: " + danglingDataRefs);
+
+ // TODO send the list of dangling refs to the Data manager for removal of the corresponding data values
+ return danglingDataRefs;
+ } catch (SQLException e) {
+ logger.error("Problem while removing run : " + runID, e);
+ return null;
+ }
+ }
+
+ /**
+ * removes all records pertaining to the static structure of a workflow.
+ *
+ * @param workflowId
+ * the ID (not the external name) of the workflow whose static
+ * structure is to be deleted from the DB
+ */
+ @Override
+ public void removeWorkflow(String workflowId) {
+ try {
+ writer.clearDBStatic(workflowId);
+ } catch (SQLException e) {
+ logger.error("Problem with removing static workflow: " + workflowId, e);
+ }
+ }
+
+ /**
+ * returns a list of workflowIDs for a given runID. The list has a single
+ * element if the workflow has no nesting, but in general it contains one
+ * workflowID for each nested workflow involved in the run
+ *
+ * @param runID
+ * the internal ID for a specific workflow run
+ * @return a list of workflow IDs, one for each nested workflow involved in
+ * the input run
+ */
+ @Override
+ public List<String> getWorkflowID(String runID) {
+ try {
+ return querier.getWorkflowIdsForRun(runID);
+ } catch (SQLException e) {
+ logger.error("Problem getting workflow ID: " + runID, e);
+ return null;
+ }
+ }
+
+ /**
+ * @param runID
+ * the internal ID for a specific workflow run
+ * @return the ID of the top-level workflow that executed during the input
+ * run
+ */
+ @Override
+ public String getTopLevelWorkflowID(String runID) {
+ try {
+ return querier.getTopLevelWorkflowIdForRun(runID);
+ } catch (SQLException e) {
+ logger.error("Problem getting top level workflow: " + runID, e);
+ return null;
+ }
+ }
+
+ @Override
+ public List<Workflow> getWorkflowsForRun(String runID) {
+ try {
+ return querier.getWorkflowsForRun(runID);
+ } catch (SQLException e) {
+ logger.error("Problem getting workflows for run:" + runID, e);
+ return null;
+ }
+ }
+
+ /**
+ * @return a list of {@link WorkflowRun} beans, each representing the
+ * complete description of a workflow run (note that this is not
+ * just the ID of the run)
+ */
+ @Override
+ public List<WorkflowRun> getAllWorkflowIDs() {
+ try {
+ return querier.getRuns(null, null);
+ } catch (SQLException e) {
+ logger.error("Problem getting all workflow IDs", e);
+ return null;
+ }
+ }
+
+// / access static workflow structure
+
+ /**
+ * @param workflowID
+ * @return a Map: workflowID -> [ {@link ProvenanceProcessor} ]. Each entry
+ * in the list pertains to one composing sub-workflow (if no nesting
+ * then this contains only one workflow, namely the top level one)
+ */
+ @Override
+ public Map<String, List<ProvenanceProcessor>> getProcessorsInWorkflow(
+ String workflowID) {
+ return querier.getProcessorsDeep(null, workflowID);
+ }
+
+ @Override
+ public List<Collection> getCollectionsForRun(String wfInstanceID) {
+ return querier.getCollectionsForRun(wfInstanceID);
+ }
+
+ @Override
+ public List<PortBinding> getPortBindings(Map<String, String> constraints)
+ throws SQLException {
+ return querier.getPortBindings(constraints);
+ }
+
+ /**
+ * lists all ports for a workflow
+ *
+ * @param workflowID
+ * @return a list of {@link Port} beans, each representing an input or
+ * output port for the workflow
+ */
+ @Override
+ public List<Port> getPortsForDataflow(String workflowID) {
+ return querier.getPortsForDataflow(workflowID);
+ }
+
+ /**
+ * lists all ports in a workflow, including the ports of its processors
+ *
+ * @param workflowID
+ * @return a list of {@link Port} beans, each representing an input or
+ * output port for the workflow or a processor in the workflow
+ */
+ @Override
+ public List<Port> getAllPortsInDataflow(String workflowID) {
+ return querier.getAllPortsInDataflow(workflowID);
+ }
+
+ /**
+ * list all ports for a specific processor within a workflow
+ *
+ * @param workflowID
+ * @param processorName
+ * @return a list of {@link Port} beans, each representing an input or
+ * output port for the input processor
+ */
+ @Override
+ public List<Port> getPortsForProcessor(String workflowID,
+ String processorName) {
+ return querier.getPortsForProcessor(workflowID, processorName);
+ }
+
+ // PM added 5/2010
+ @Override
+ public String getWorkflowNameByWorkflowID(String workflowID) {
+ return querier.getWorkflow(workflowID).getExternalName();
+ }
+
+ @Override
+ public WorkflowTree getWorkflowNestingStructure(String workflowID)
+ throws SQLException {
+ return querier.getWorkflowNestingStructure(workflowID);
+ }
+
+// public List<ProvenanceProcessor> getSuccessors(String workflowID, String processorName, String portName) {
+// return null; // TODO
+// }
+
+// public List<String> getActivities(String workflowID, String processorName) {
+// return null; // TODO
+// }
+
+// / configure provenance query functionality
+
+ /**
+ * include values of output ports in the query result? input port values are
+ * always included<br>
+ * default is FALSE
+ */
+ @Override
+ public void toggleIncludeProcessorOutputs(boolean active) {
+ analyser.setReturnOutputs(active);
+ }
+
+ @Override
+ public boolean isIncludeProcessorOutputs() {
+ return analyser.isReturnOutputs();
+ }
+
+ /**
+ * @return an instance of {@link InvocationContext} that can be used by a
+ * client to deref a Taverna data reference
+ */
+ @Override
+ public InvocationContext getInvocationContext() {
+ return getProvenanceConnector().getInvocationContext();
+ }
+
+// / OPM management
+
+ /**
+ * should an OPM graph be generated in response to a query?<br>
+ * default is TRUE
+ */
+ @Override
+ public void toggleOPMGeneration(boolean active) {
+ analyser.setGenerateOPMGraph(active);
+ }
+
+ /**
+ *
+ * @return true if OPM is set to be generated in response to a query
+ */
+ @Override
+ public boolean isOPMGenerationActive() {
+ return analyser.isGenerateOPMGraph();
+ }
+
+ /**
+ * should actual artifact values be attached to OPM artifact nodes?<br>
+ * default is FALSE<br/>
+ * THIS IS CURRENTLY UNSUPPORTED -- DEFAULTS TO FALSE
+ *
+ * @param active
+ */
+ @Override
+ public void toggleAttachOPMArtifactValues(boolean active) {
+ analyser.setRecordArtifactValues(active);
+ }
+
+ /**
+ * @return true if the OPM graph artifacts are annotated with actual values
+ */
+ @Override
+ public boolean isAttachOPMArtifactValues() {
+ return analyser.isRecordArtifactValues();
+ }
+
+ /**
+ * @deprecated as workflow 'names' are not globally unique, this method
+ * should not be used!
+ * @param workflowName
+ * @return
+ */
+ @Override
+ public String getWorkflowIDForExternalName(String workflowName) {
+ return querier.getWorkflowIdForExternalName(workflowName);
+ }
+
+ @Override
+ public List<ProvenanceProcessor> getProcessorsForWorkflowID(
+ String workflowID) {
+ return querier.getProcessorsForWorkflow(workflowID);
+ }
+
+ /**
+ * @return the singleton {@link AbstractProvenanceConnector} used by the API
+ * to operate on the DB. Currently we support MySQL
+ * {@link MySQLProvenanceConnector} and Derby
+ * {@link DerbyProvenanceConnector} connectors. The set of supported
+ * connectors is extensible. The available connectors are discovered
+ * automatically by the API upon startup, and it includes all the
+ * connectors that are mentioned in the <dependencies> section of
+ * pom.xml for Maven module
+ * {@code net.sf.taverna.t2.core.provenanceconnector}
+ */
+ @Override
+ public AbstractProvenanceConnector getProvenanceConnector() {
+ return connector;
+ }
+
+ /**
+ * @param provenanceConnector
+ * a specific provenanceConnector used by the API
+ */
+ public void setProvenanceConnector(
+ AbstractProvenanceConnector provenanceConnector) {
+ this.connector = provenanceConnector;
+ }
+
+ /**
+ * @return
+ */
+ @Override
+ public ProvenanceAnalysis getAnalysis() {
+ return analyser;
+ }
+
+ /**
+ * @param pa
+ * the pa to set
+ */
+ public void setPa(ProvenanceAnalysis pa) {
+ this.analyser = pa;
+ }
+
+ /**
+ * @return the pq
+ */
+ @Override
+ public ProvenanceQuery getQuery() {
+ return querier;
+ }
+
+ /**
+ * @param pq
+ * the pq to set
+ */
+ public void setPq(ProvenanceQuery pq) {
+ this.querier = pq;
+ }
+
+ @Override
+ public List<ProcessorEnactment> getProcessorEnactments(
+ String workflowRunId, String... processorPath) {
+ return querier.getProcessorEnactments(workflowRunId, processorPath);
+ }
+
+ @Override
+ public ProcessorEnactment getProcessorEnactmentByProcessId(
+ String workflowRunId, String processIdentifier, String iteration) {
+ return querier.getProcessorEnactmentByProcessId(workflowRunId,
+ processIdentifier, iteration);
+ }
+
+ @Override
+ public ProcessorEnactment getProcessorEnactment(String processorEnactmentId) {
+ return querier.getProcessorEnactment(processorEnactmentId);
+ }
+
+ @Override
+ public ProvenanceProcessor getProvenanceProcessor(String workflowId,
+ String processorNameRef) {
+ return querier.getProvenanceProcessorByName(workflowId, processorNameRef);
+ }
+
+ @Override
+ public ProvenanceProcessor getProvenanceProcessor(String processorId) {
+ return querier.getProvenanceProcessorById(processorId);
+ }
+
+ @Override
+ public Map<Port, T2Reference> getDataBindings(String dataBindingId) {
+ Map<Port, T2Reference> references = new HashMap<>();
+ for (Entry<Port, String> entry : querier.getDataBindings(dataBindingId)
+ .entrySet())
+ references.put(entry.getKey(), getProvenanceConnector()
+ .getReferenceService()
+ .referenceFromString(entry.getValue()));
+ return references;
+ }
+
+ @Override
+ public DataflowInvocation getDataflowInvocation(String workflowRunId) {
+ return querier.getDataflowInvocation(workflowRunId);
+ }
+
+ @Override
+ public DataflowInvocation getDataflowInvocation(
+ ProcessorEnactment processorEnactment) {
+ return querier.getDataflowInvocation(processorEnactment);
+ }
+
+ @Override
+ public List<DataflowInvocation> getDataflowInvocations(String workflowRunId) {
+ return querier.getDataflowInvocations(workflowRunId);
+ }
+
+ @Override
+ public List<DataLink> getDataLinks(String workflowId) {
+ try {
+ Map<String, String> queryConstraints = new HashMap<>();
+ queryConstraints.put("workflowId", workflowId);
+ return querier.getDataLinks(queryConstraints);
+ } catch (SQLException e) {
+ logger.error(
+ "Problem getting datalinks for workflow:" + workflowId, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
new file mode 100644
index 0000000..4fb61dc
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/ProvenanceConnectorType.java
@@ -0,0 +1,16 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.taverna.provenance.api;
+
+/**
+ * Defines names for the common Provenance Connector types
+ *
+ * @author Stuart Owen
+ */
+public class ProvenanceConnectorType {
+ public static final String MYSQL = "mysql";
+ public static final String DERBY = "derby";
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
new file mode 100644
index 0000000..1c3227a
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/Query.java
@@ -0,0 +1,105 @@
+/**
+ *
+ */
+package org.apache.taverna.provenance.api;
+
+import java.util.List;
+
+import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
+import org.apache.taverna.provenance.lineageservice.utils.QueryPort;
+
+/**
+ * Bean encapsulating one provenance query, consisting of the following
+ * elements:
+ * <ul>
+ * <li>static scope: the (single) name of the workflow whose run(s) are queried
+ * <li>dynamic scope: a list of workflow run IDs.
+ * <li>a list of <select> variables, encoded as List<{@link QueryPort}>
+ * <li>a list of <target> processors, encoded as List<
+ * {@link ProvenanceProcessor}>
+ * </ul>
+ *
+ * @author Paolo Missier
+ */
+public class Query {
+ private String workflowName;
+ private List<QueryPort> targetPorts;
+ private List<String> runIDList;
+ private List<ProvenanceProcessor> selectedProcessors;
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n **** QUERY SCOPE: ****\n").append("\tworkflow name: ")
+ .append(getWorkflowName()).append("\n\truns: ");
+ for (String r : getRunIDList())
+ sb.append("\n\t").append(r);
+ sb.append("\n**** TARGET PORTS: ****\n");
+ for (QueryPort v : getTargetPorts())
+ sb.append("\n\t").append(v);
+ sb.append("\n\n**** SELECTED PROCESSORS: **** ");
+ for (ProvenanceProcessor pp : getSelectedProcessors())
+ sb.append("\n\t").append(pp);
+ return sb.toString();
+ }
+
+ /**
+ * @return the targetPorts
+ */
+ public List<QueryPort> getTargetPorts() {
+ return targetPorts;
+ }
+
+ /**
+ * @param targetVars
+ * the targetVars to set
+ */
+ public void setTargetPorts(List<QueryPort> targetVars) {
+ this.targetPorts = targetVars;
+ }
+
+ /**
+ * @return the selectedProcessors
+ */
+ public List<ProvenanceProcessor> getSelectedProcessors() {
+ return selectedProcessors;
+ }
+
+ /**
+ * @param selectedProcessors
+ * the selectedProcessors to set
+ */
+ public void setFocus(List<ProvenanceProcessor> selectedProcessors) {
+ this.selectedProcessors = selectedProcessors;
+ }
+
+ /**
+ * @return the runIDList
+ */
+ public List<String> getRunIDList() {
+ return runIDList;
+ }
+
+ /**
+ * @param runIDList
+ * the runIDList to set
+ */
+ public void setRunIDList(List<String> runIDList) {
+ this.runIDList = runIDList;
+ }
+
+ /**
+ * @return the workflowName
+ */
+ public String getWorkflowName() {
+ return workflowName;
+ }
+
+ /**
+ * @param workflowName
+ * the workflowName to set
+ */
+ public void setWorkflowName(String workflowName) {
+ this.workflowName = workflowName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
new file mode 100644
index 0000000..e57b0b9
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/api/QueryAnswer.java
@@ -0,0 +1,47 @@
+/**
+ *
+ */
+package org.apache.taverna.provenance.api;
+
+/**
+ * Encapsulates a native Java data structure as well as a String that holds
+ * the OPM graph that represents the query answer
+ *
+ * @author Paolo Missier
+ *
+ */
+public class QueryAnswer {
+ private NativeAnswer nativeAnswer;
+ private String _OPMAnswer_AsRDF;
+
+ /**
+ * @return the native Java part of the query answer
+ */
+ public NativeAnswer getNativeAnswer() {
+ return nativeAnswer;
+ }
+
+ /**
+ * @param a
+ * the native Java part of the query answer
+ */
+ public void setNativeAnswer(NativeAnswer a) {
+ this.nativeAnswer = a;
+ }
+
+ /**
+ * @return the OPM graph as RDF/XML string, or null if OPM was inhibited
+ * (see OPM.computeGraph in APIClient.properties)
+ */
+ public String getOPMAnswer_AsRDF() {
+ return _OPMAnswer_AsRDF;
+ }
+
+ /**
+ * @param asRDF
+ * the OPM graph as RDF/XML string
+ */
+ public void setOPMAnswer_AsRDF(String asRDF) {
+ _OPMAnswer_AsRDF = asRDF;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
new file mode 100644
index 0000000..dd6b488
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/AbstractProvenanceConnector.java
@@ -0,0 +1,646 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.connector;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import org.apache.taverna.invocation.InvocationContext;
+import net.sf.taverna.t2.provenance.item.ProvenanceItem;
+import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
+import org.apache.taverna.provenance.lineageservice.EventProcessor;
+import org.apache.taverna.provenance.lineageservice.LineageQueryResultRecord;
+import org.apache.taverna.provenance.lineageservice.Provenance;
+import org.apache.taverna.provenance.lineageservice.ProvenanceAnalysis;
+import org.apache.taverna.provenance.lineageservice.ProvenanceQuery;
+import org.apache.taverna.provenance.lineageservice.ProvenanceWriter;
+import org.apache.taverna.provenance.lineageservice.WorkflowDataProcessor;
+import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
+import org.apache.taverna.reference.ReferenceService;
+
+import org.apache.log4j.Logger;
+
+import org.apache.taverna.configuration.database.DatabaseManager;
+
+/**
+ * Collects {@link ProvenanceItem}s as it travels up and down the dispatch stack
+ * inside the InvocationContext
+ *
+ * @author Ian Dunlop
+ * @author Stuart Owen
+ *
+ */
+public abstract class AbstractProvenanceConnector implements ProvenanceReporter {
+
+ public static enum ActivityTable { // first constant names the table, the rest name its columns
+ Activity, activityId, activityDefinition, workflowId;
+
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + Activity + "(\n"
+ + activityId + " varchar(36) NOT NULL,\n"
+ + activityDefinition + " blob NOT NULL,\n"
+ + workflowId + " varchar(100) NOT NULL, \n"
+ + "PRIMARY KEY (" + activityId + ")\n" + ")";
+ }
+ }
+
+ public static enum CollectionTable { // first constant names the table, the rest name its columns
+ Collection, collID, parentCollIDRef, workflowRunId, processorNameRef, portName, iteration;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + Collection + " (\n"
+ + collID + " varchar(100) NOT NULL,\n"
+ + parentCollIDRef + " varchar(100) NOT NULL ,\n"
+ + workflowRunId + " varchar(36) NOT NULL,\n"
+ + processorNameRef + " varchar(100) NOT NULL,\n"
+ + portName + " varchar(100) NOT NULL,\n"
+ + iteration + " varchar(2000) NOT NULL default '',\n"
+ + " PRIMARY KEY (\n" // composite key over all six columns
+ + collID + "," + workflowRunId + "," + processorNameRef
+ + "," + portName + "," + parentCollIDRef + "," + iteration
+ + "))";
+ }
+ }
+
+ public static enum DataBindingTable { // first constant names the table, the rest name its columns
+ DataBinding, dataBindingId, portId, t2Reference, workflowRunId;
+
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + DataBinding + "(\n"
+ + dataBindingId + " varchar(36) NOT NULL,\n"
+ + portId + " varchar(36) NOT NULL,\n"
+ + t2Reference + " varchar(100) NOT NULL,\n"
+ + workflowRunId + " varchar(100) NOT NULL, \n"
+ + "PRIMARY KEY (" + dataBindingId + "," + portId + ")\n" + ")"; // key: binding id + port id
+ }
+ }
+
+ public static enum DataflowInvocationTable { // first constant names the table, the rest name its columns
+ DataflowInvocation, dataflowInvocationId,
+ workflowId,
+ invocationStarted, invocationEnded,
+ inputsDataBinding, outputsDataBinding,
+ parentProcessorEnactmentId, workflowRunId, completed;
+
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + DataflowInvocation + "(\n"
+ + dataflowInvocationId + " varchar(36) NOT NULL,\n"
+ + workflowId + " varchar(100) NOT NULL, \n"
+ + invocationStarted + " timestamp, \n"
+ + invocationEnded + " timestamp, \n"
+ + inputsDataBinding + " varchar(36),\n"
+ + outputsDataBinding + " varchar(36),\n"
+ + parentProcessorEnactmentId + " varchar(36), \n"
+ + workflowRunId + " varchar(100) NOT NULL, \n"
+ + completed + " smallint NOT NULL,\n"
+ + "PRIMARY KEY (" + dataflowInvocationId+ ")\n" + ")";
+ }
+ }
+
+ public static enum DataLinkTable { // first constant (Datalink) names the table, the rest name its columns
+ Datalink, sourcePortName, sourcePortId, destinationPortId,
+ destinationPortName, sourceProcessorName, destinationProcessorName, workflowId;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + Datalink + " (\n"
+ + sourcePortName + " varchar(100) NOT NULL ,\n"
+ + sourcePortId + " varchar(36) NOT NULL ,\n"
+ + destinationPortId + " varchar(36) NOT NULL ,\n"
+ + destinationPortName + " varchar(100) NOT NULL,\n"
+ + sourceProcessorName + " varchar(100) NOT NULL,\n"
+ + destinationProcessorName + " varchar(100) NOT NULL,\n"
+ + workflowId + " varchar(36) NOT NULL,"
+ + " PRIMARY KEY (" // key: source port id + destination port id + workflow id
+ + sourcePortId + "," + destinationPortId + "," + workflowId
+ + "))";
+ }
+ }
+
+ public static enum PortBindingTable { // first constant names the table, the rest name its columns
+ PortBinding, portName, workflowRunId, value, collIDRef, positionInColl, processorNameRef, valueType, ref, iteration, workflowId;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + PortBinding + " (\n"
+ + portName + " varchar(100) NOT NULL,\n"
+ + workflowRunId + " varchar(100) NOT NULL,\n"
+ + value + " varchar(100) default NULL,\n"
+ + collIDRef + " varchar(100),\n"
+ + positionInColl + " int NOT NULL,\n"
+ + processorNameRef + " varchar(100) NOT NULL,\n"
+ + valueType + " varchar(50) default NULL,\n"
+ + ref + " varchar(100) default NULL,\n"
+ + iteration + " varchar(2000) NOT NULL,\n"
+ + workflowId + " varchar(36),\n"
+ + "PRIMARY KEY (\n" // key: port, run, processor, iteration and workflow id
+ + portName + "," + workflowRunId + ","
+ + processorNameRef + "," + iteration + ", " + workflowId
+ + "))";
+ }
+ }
+
+ public static enum PortTable { // first constant names the table, the rest name its columns
+ Port, portId, processorId, portName, isInputPort, processorName,
+ workflowId, depth, resolvedDepth, iterationStrategyOrder;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + Port + " (\n"
+ + portId + " varchar(36) NOT NULL,\n"
+ + processorId + " varchar(36),\n"
+ + portName + " varchar(100) NOT NULL,\n"
+ + isInputPort + " smallint NOT NULL ,\n"
+ + processorName + " varchar(100) NOT NULL,\n"
+ + workflowId + " varchar(36) NOT NULL,\n"
+ + depth + " int,\n"
+ + resolvedDepth + " int,\n"
+ + iterationStrategyOrder + " smallint, \n"
+ + "PRIMARY KEY (" + portId + "),\n" // use the constant (same text as before) so key and column cannot drift apart
+ + "CONSTRAINT port_constraint UNIQUE (\n"
+ + portName + "," + isInputPort + "," + processorName + "," + workflowId + "\n"
+ + "))";
+ }
+ }
+
+ public static enum ProcessorEnactmentTable { // first constant names the table, the rest name its columns
+ ProcessorEnactment, processEnactmentId, workflowRunId, processorId,
+ processIdentifier, iteration, parentProcessorEnactmentId,
+ enactmentStarted, enactmentEnded, initialInputsDataBindingId,
+ finalOutputsDataBindingId;
+
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + ProcessorEnactment + " (\n"
+ + processEnactmentId + " varchar(36) NOT NULL, \n"
+ + workflowRunId + " varchar(100) NOT NULL, \n"
+ + processorId + " varchar(36) NOT NULL, \n"
+ + processIdentifier + " varchar(2047) NOT NULL, \n"
+ + iteration + " varchar(100) NOT NULL, \n"
+ + parentProcessorEnactmentId + " varchar(36), \n"
+ + enactmentStarted + " timestamp, \n"
+ + enactmentEnded + " timestamp, \n"
+ + initialInputsDataBindingId + " varchar(36), \n"
+ + finalOutputsDataBindingId + " varchar(36), \n"
+ + " PRIMARY KEY (" + processEnactmentId + ")" + ")";
+ }
+ }
+
+ public static enum ProcessorTable { // first constant names the table, the rest name its columns
+ Processor,processorId, processorName,workflowId,firstActivityClass,isTopLevel ;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE "+ Processor +" (\n"
+ + processorId + " varchar(36) NOT NULL,\n"
+ + processorName + " varchar(100) NOT NULL,\n"
+ + workflowId + " varchar(36) NOT NULL ,\n\n"
+ + firstActivityClass + " varchar(100) default NULL,\n"
+ + isTopLevel + " smallint, \n"
+ + "PRIMARY KEY (" + processorId+ "),\n"
+ + "CONSTRAINT processor_constraint UNIQUE (\n" // processor name is unique within a workflow id
+ + processorName + "," + workflowId + "))";
+ }
+ }
+
+ public static enum ServiceInvocationTable { // first constant names the table, the rest name its columns
+ ServiceInvocation, processorEnactmentId, workflowRunId,
+ invocationNumber, invocationStarted, invocationEnded,
+ inputsDataBinding, outputsDataBinding, failureT2Reference,
+ activityId, initiatingDispatchLayer, finalDispatchLayer;
+
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + ServiceInvocation + "(\n"
+ + processorEnactmentId + " varchar(36) NOT NULL,\n"
+ + workflowRunId + " varchar(100) NOT NULL, \n"
+ + invocationNumber + " bigint NOT NULL,\n"
+ + invocationStarted + " timestamp, \n"
+ + invocationEnded + " timestamp, \n"
+ + inputsDataBinding + " varchar(36),\n"
+ + outputsDataBinding + " varchar(36),\n"
+ + failureT2Reference + " varchar(100) default NULL,\n"
+ + activityId + " varchar(36),\n"
+ + initiatingDispatchLayer + " varchar(250) NOT NULL,\n"
+ + finalDispatchLayer + " varchar(250) NOT NULL,\n"
+ + "PRIMARY KEY (" + processorEnactmentId + ", " // key: enactment id + invocation number
+ + invocationNumber + "))";
+ }
+ }
+
+ public static enum WorkflowRunTable { // first constant names the table, the rest name its columns
+ WorkflowRun, workflowRunId, workflowId, timestamp;
+ public static String getCreateTable() { // CREATE TABLE statement built from the constant names
+ return "CREATE TABLE " + WorkflowRun + " (\n"
+ + workflowRunId + " varchar(36) NOT NULL,\n"
+ + workflowId + " varchar(36) NOT NULL,\n"
+ + timestamp + " timestamp NOT NULL default CURRENT_TIMESTAMP,\n"
+ + " PRIMARY KEY (" + workflowRunId + ", " + workflowId + "))"; // key: run id + workflow id
+ }
+ }
+
+ public static enum WorkflowTable { // NOTE: unlike the sibling enums, the first constant is not the table name
+ WorkflowTable, workflowId, parentWorkflowId, externalName, dataflow;
+ public static String getCreateTable() { // CREATE TABLE statement; column names come from the constants
+ return "CREATE TABLE " + "Workflow (\n" + // table name is the literal "Workflow", not the WorkflowTable constant
+ workflowId + " varchar(36) NOT NULL,\n"
+ + parentWorkflowId + " varchar(100),\n"
+ + externalName + " varchar(100),\n"
+ + dataflow + " blob, \n"
+ + "PRIMARY KEY (" + workflowId + "))";
+ }
+ }
+
+ private static Logger logger = Logger.getLogger(AbstractProvenanceConnector.class);
+ private String saveEvents;
+ private ProvenanceAnalysis provenanceAnalysis;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private boolean finished = false;
+ private String sessionID;
+ private InvocationContext invocationContext;
+ private ReferenceService referenceService;
+
+ private Provenance provenance;
+ private ProvenanceWriter writer;
+ private ProvenanceQuery query;
+ private WorkflowDataProcessor wfdp;
+ private EventProcessor eventProcessor;
+ private final DatabaseManager databaseManager;
+
+ public AbstractProvenanceConnector(DatabaseManager databaseManager) { // manager that getConnection() obtains connections from
+ this.databaseManager = databaseManager;
+ }
+
+ /**
+ * Creates the database, then sets up the {@link WorkflowDataProcessor},
+ * {@link EventProcessor}, {@link ProvenanceAnalysis} and {@link Provenance}
+ * using the current {@link #getWriter()} and {@link #getQuery()}. Call this
+ * method after instantiation and after the dbURL has been set.
+ */
+ public void init() {
+ createDatabase(); // implementation-specific; runs before any wiring below
+ try {
+ setWfdp(new WorkflowDataProcessor());
+ getWfdp().setPq(getQuery());
+ getWfdp().setPw(getWriter());
+
+ setEventProcessor(new EventProcessor());
+ getEventProcessor().setPw(getWriter());
+ getEventProcessor().setPq(getQuery());
+ getEventProcessor().setWfdp(getWfdp());
+
+ setProvenanceAnalysis(new ProvenanceAnalysis(getQuery()));
+ setProvenance(new Provenance(getEventProcessor()));
+ } catch (InstantiationException | IllegalAccessException
+ | ClassNotFoundException | SQLException e) {
+ logger.error("Problem with provenance initialisation: ", e); // logged only, not rethrown
+ }
+ }
+
+ /**
+ * @return the invocationContext, or null if none has been set
+ */
+ @Override
+ public InvocationContext getInvocationContext() {
+ return invocationContext;
+ }
+
+ /**
+ * @param invocationContext
+ * the invocationContext to set (replaces any previous value)
+ */
+ @Override
+ public void setInvocationContext(InvocationContext invocationContext) {
+ this.invocationContext = invocationContext;
+ }
+
+ /**
+ * @return the referenceService, or null if none has been set
+ */
+ @Override
+ public ReferenceService getReferenceService() {
+ return referenceService;
+ }
+
+ /**
+ * @param referenceService
+ * the referenceService to set (replaces any previous value)
+ */
+ @Override
+ public void setReferenceService(ReferenceService referenceService) {
+ this.referenceService = referenceService;
+ }
+
+ /**
+ * Processes the event synchronously on the calling thread; the executor-based
+ * dispatch below is commented out. Failures are logged, never rethrown.
+ */
+ @Override
+ public synchronized void addProvenanceItem(
+ final ProvenanceItem provenanceItem) {
+
+// Runnable runnable = new Runnable() {
+//
+// public void run() {
+ try {
+
+ getProvenance().acceptRawProvenanceEvent(
+ provenanceItem.getEventType(), provenanceItem);
+
+ } catch (SQLException e) {
+ logger.warn("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+ } catch (IOException e) {
+ logger.error("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+ } catch (RuntimeException e) {
+ logger.error("Could not add provenance for " + provenanceItem.getEventType() + " " + provenanceItem.getIdentifier(), e);
+ }
+//
+// }
+// };
+// getExecutor().execute(runnable);
+
+ }
+
+ protected Connection getConnection() throws SQLException { // delegates to the DatabaseManager given to the constructor
+ return databaseManager.getConnection();
+ }
+
+ /**
+ * Used by database backed provenance stores. Ask the implementation to
+ * create the database. Requires each database type to create all its own
+ * tables. Called first by {@link #init()}.
+ */
+ public abstract void createDatabase();
+
+
+ public void clearDatabase() { clearDatabase(true); } // convenience overload: always clears
+
+ /**
+ * Clear all the values in the database but keep the db there; only logs when isClearDB is false
+ */
+ public void clearDatabase(boolean isClearDB) {
+ if (isClearDB) {
+ logger.info("clearing DB");
+ try {
+ getWriter().clearDBStatic();
+
+ Set<String> danglingDataRefs = getWriter().clearDBDynamic();
+
+ logger.info("references collected during removeRun:");
+ for (String s : danglingDataRefs)
+ logger.info(s);
+ } catch (SQLException e) {
+ logger.error("Problem clearing database", e); // logged only, not rethrown
+ }
+ } else {
+ logger.error("clearDB is FALSE: not clearing"); // NOTE(review): error level although nothing failed
+ }
+
+// String q = null;
+// Connection connection = null;
+
+// Statement stmt = null;
+// try {
+// connection = getConnection();
+// stmt = connection.createStatement();
+// } catch (SQLException e) {
+// logger.warn("Could not create database statement :" + e);
+// } catch (InstantiationException e) {
+// logger.warn("Could not create database statement :" + e);
+// } catch (IllegalAccessException e) {
+// logger.warn("Could not create database statement :" + e);
+// } catch (ClassNotFoundException e) {
+// logger.warn("Could not create database statement :" + e);
+// }
+
+// q = "DELETE FROM Workflow";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM Processor";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM Datalink";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM Port";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM WorkflowRun";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM ProcBinding";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM PortBinding";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+// q = "DELETE FROM Collection";
+// try {
+// stmt.executeUpdate(q);
+// } catch (SQLException e) {
+// logger.warn("Could not execute statement " + q + " :" + e);
+// }
+
+
+// if (connection!=null) try {
+// connection.close();
+// } catch (SQLException ex) {
+// logger.error("Error closing connection",ex);
+// }
+ }
+
+ /**
+ * The name for this type of provenance connector. Is used by the workbench
+ * to ensure it adds the correct one to the InvocationContext
+ *
+ * @return the name of this connector type, as supplied by the implementation
+ */
+ public abstract String getName();
+
+ /**
+ * A unique identifier for this run of provenance, should correspond to the
+ * initial {@link WorkflowProvenanceItem} identifier that gets sent through
+ *
+ * @param sessionID the identifier to store
+ */
+ @Override
+ public void setSessionID(String sessionID) {
+ this.sessionID = sessionID;
+ }
+
+ /**
+ * What is the unique identifier used by this connector
+ *
+ * @return the sessionID, or null if none has been set
+ */
+ @Override
+ public String getSessionID() {
+ return sessionID;
+ }
+
+
+ public List<LineageQueryResultRecord> computeLineage(String workflowRun, // stub: ignores all arguments
+ String port, String proc, String path, Set<String> selectedProcessors) {
+ return null; // always null in this base class
+ }
+
+ public String getDataflowInstance(String dataflowId) { // workflow run id of the first run returned for dataflowId
+ try {
+ return (getProvenance()).getPq().getRuns(dataflowId, null).get(0) // NOTE(review): get(0) presumably throws if no runs are returned - confirm
+ .getWorkflowRunId();
+ } catch (SQLException e) {
+ logger.error("Error finding the dataflow instance", e);
+ return null; // SQL failure is reported to the caller as null
+ }
+ }
+
+ /**
+ * @return the saveEvents value, or null if none has been set
+ */
+ public String getSaveEvents() {
+ return saveEvents;
+ }
+
+ /**
+ * @param saveEvents
+ * the saveEvents value to set (replaces any previous value)
+ */
+ public void setSaveEvents(String saveEvents) {
+ this.saveEvents = saveEvents;
+ }
+
+ public void setProvenance(Provenance provenance) { // also called by init() with a freshly created instance
+ this.provenance = provenance;
+ }
+
+ public Provenance getProvenance() { // null until init() or setProvenance() has set one
+ return provenance;
+ }
+
+ public void setFinished(boolean finished) { // plain flag setter; this class never changes the flag itself
+ this.finished = finished;
+ }
+
+ public boolean isFinished() { // false until setFinished(true) is called
+ return finished;
+ }
+
+ public void setExecutor(ExecutorService executor) { // NOTE(review): not synchronized, unlike getExecutor()
+ this.executor = executor;
+ }
+
+ public synchronized ExecutorService getExecutor() { // defaults to a single-thread executor; only commented-out code in this class uses it
+ return executor;
+ }
+
+ public void setProvenanceAnalysis(ProvenanceAnalysis provenanceAnalysis) { // also called by init()
+ this.provenanceAnalysis = provenanceAnalysis;
+ }
+
+ /**
+ * Use this {@link ProvenanceAnalysis} to carry out lineage queries on the
+ * provenance
+ *
+ * @return the current analysis; null until init() or setProvenanceAnalysis() has set one
+ */
+ public ProvenanceAnalysis getProvenanceAnalysis() {
+ return provenanceAnalysis;
+ }
+
+ /**
+ * @return the writer; null until {@link #setWriter} has been called
+ */
+ public ProvenanceWriter getWriter() {
+ return writer;
+ }
+
+ /**
+ * @param writer the writer to set; init() passes it to the processors it creates
+ */
+ protected void setWriter(ProvenanceWriter writer) {
+ this.writer = writer;
+ }
+
+ /**
+ * @return the query; null until {@link #setQuery} has been called
+ */
+ public ProvenanceQuery getQuery() {
+ return query;
+ }
+
+ /**
+ * @param query the query to set; init() passes it to the objects it creates
+ */
+ protected void setQuery(ProvenanceQuery query) {
+ this.query = query;
+ }
+
+ /**
+ * @return the workflow data processor; null until init() or setWfdp() has set one
+ */
+ public WorkflowDataProcessor getWfdp() {
+ return wfdp;
+ }
+
+ /**
+ * @param wfdp the workflow data processor to set (also called by init())
+ */
+ public void setWfdp(WorkflowDataProcessor wfdp) {
+ this.wfdp = wfdp;
+ }
+
+ /**
+ * @return the eventProcessor; null until init() or setEventProcessor() has set one
+ */
+ public EventProcessor getEventProcessor() {
+ return eventProcessor;
+ }
+
+ /**
+ * @param eventProcessor the eventProcessor to set (also called by init())
+ */
+ public void setEventProcessor(EventProcessor eventProcessor) {
+ this.eventProcessor = eventProcessor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
new file mode 100644
index 0000000..6e78c0d
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/connector/ProvenanceSQL.java
@@ -0,0 +1,5 @@
+package org.apache.taverna.provenance.connector;
+
+public class ProvenanceSQL { // empty class: declares no members in this commit
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
new file mode 100644
index 0000000..193d0e0
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/AnnotationsLoader.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+
+/**
+ * Loads per-processor annotations from an XML annotation file.
+ * @author Paolo Missier
+ */
+public class AnnotationsLoader {
+ private static Logger logger = Logger.getLogger(AnnotationsLoader.class);
+
+ /**
+ * @param annotationFile
+ * path of the XML file; by convention <workflow file name>+"annotations"
+ * @return map from processor name to the "type" values of its child elements;
+ * on a parse or I/O error the error is logged and the partial map returned
+ */
+ @SuppressWarnings("unchecked")
+ public Map<String,List<String>> getAnnotations(String annotationFile) {
+ Map<String, List<String>> procAnnotations = new HashMap<>();
+
+ // JDOM builder used to parse the annotation file into a Document
+// (the reader is closed by try-with-resources, even when parsing fails)
+ SAXBuilder b = new SAXBuilder();
+
+ try (FileReader reader = new FileReader(annotationFile)) {
+ Document d = b.build(reader);
+ if (d == null)
+ return null; // kept from the original: null (not an empty map) when no document is built
+
+ // look for all processor elements
+ for (Element el : (List<Element>) d.getRootElement().getChildren()) {
+ String pName = el.getAttributeValue("name");
+ logger.info("processor name: " + pName);
+
+ List<String> annotations = new ArrayList<>();
+ // extract all annotations for this pname
+
+ for (Element annotElement : (List<Element>) el.getChildren()) {
+ String annot = annotElement.getAttributeValue("type");
+ logger.info("annotation: " + annot);
+
+ // add this annotation
+ annotations.add(annot);
+ }
+
+ procAnnotations.put(pName, annotations);
+ }
+ } catch (JDOMException | IOException e) {
+ logger.error("Problem getting annotations from: " + annotationFile,
+ e);
+ }
+ return procAnnotations;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/550b5992/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
new file mode 100644
index 0000000..f06de95
--- /dev/null
+++ b/taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/Dependencies.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Copyright (C) 2007 The University of Manchester
+ *
+ * Modifications to the initial code base are copyright of their
+ * respective authors, or their employers as appropriate.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ ******************************************************************************/
+package org.apache.taverna.provenance.lineageservice;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * Java bean wrapping a list of {@link LineageQueryResultRecord}, each one
+ * representing a record in the provenance DB at the finest possible level of
+ * granularity, i.e., a single value possibly within a collection, bound to a
+ * processor port and associated to a specific run of a specific workflow.
+ *
+ * @author Paolo Missier
+ * @see LineageQueryResultRecord
+ */
+public class Dependencies {
+ public static final String COLL_TYPE = "referenceSetCollection";
+ public static final String ATOM_TYPE = "referenceSet";
+
+ boolean printResolvedValue;
+
+ private List<LineageQueryResultRecord> records = new ArrayList<>();
+
+ public ListIterator<LineageQueryResultRecord> iterator() { // list iterator over getRecords()
+ return getRecords().listIterator();
+ }
+
+ /**
+ * builds one record from the arguments and appends it to the list
+ *
+ * @param workflowId stored as the record's workflow id
+ * @param pname stored as the record's processor name
+ * @param vname stored as the record's port name
+ * @param workflowRun stored as the record's workflow run id
+ * @param iteration stored as the record's iteration
+ * @param collIdRef stored as the record's collection T2 reference
+ * @param parentCollIDRef stored as the record's parent collection id reference
+ * @param value stored as the record's value
+ * @param resolvedValue stored as the record's resolved value
+ * @param type accepted but currently not copied into the record
+ * @param isInput stored as the record's input-port flag
+ * @param isCollection stored as the record's collection flag
+ */
+ public void addLineageQueryResultRecord(String workflowId, String pname,
+ String vname, String workflowRun, String iteration,
+ String collIdRef, String parentCollIDRef, String value,
+ String resolvedValue, String type, boolean isInput,
+ boolean isCollection) {
+
+ LineageQueryResultRecord rec = new LineageQueryResultRecord();
+
+ rec.setWorkflowId(workflowId);
+ rec.setWorkflowRunId(workflowRun);
+ rec.setProcessorName(pname);
+ rec.setValue(value);
+ rec.setPortName(vname);
+ rec.setIteration(iteration);
+ rec.setResolvedValue(resolvedValue);
+ rec.setIsInputPort(isInput);
+ rec.setCollectionT2Reference(collIdRef);
+ rec.setParentCollIDRef(parentCollIDRef);
+ rec.setCollection(isCollection);
+
+ getRecords().add(rec);
+ }
+
+ /**
+ * replaces the bean's content with an entire list of {@link LineageQueryResultRecord} elements
+ * @param records the new list, kept by reference
+ */
+ public void setRecords(List<LineageQueryResultRecord> records) {
+ this.records = records;
+ }
+
+ /**
+ * @return the entire list of records (the stored list itself, not a copy)
+ */
+ public List<LineageQueryResultRecord> getRecords() {
+ return records;
+ }
+
+ @Override
+ public String toString() { // side effect: copies printResolvedValue into every record before rendering it
+ StringBuilder out = new StringBuilder();
+ for (LineageQueryResultRecord rec : getRecords()) {
+ rec.setPrintResolvedValue(printResolvedValue);
+ out.append("*** record: ****\n").append(rec.toString());
+ }
+ return out.toString();
+ }
+
+ /**
+ * @return true if the records contain the actual values, in addition to the URI references to the values
+ * <br/>NOT YET SUPPORTED. This switch is currently ignored and no values are returned in the current version
+ */
+ public boolean isPrintResolvedValue() {
+ return printResolvedValue;
+ }
+
+ /**
+ * @param printResolvedValue toggles insertion of values in addition to references to values in the records
+ * <br/>NOT YET SUPPORTED. This switch is currently ignored and no values are returned in the current version
+ */
+ public void setPrintResolvedValue(boolean printResolvedValue) {
+ this.printResolvedValue = printResolvedValue;
+ }
+}