You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@taverna.apache.org by st...@apache.org on 2015/02/17 21:43:18 UTC
[14/51] [partial] incubator-taverna-engine git commit: temporarily
empty repository
http://git-wip-us.apache.org/repos/asf/incubator-taverna-engine/blob/3ecb1291/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/ProvenanceAnalysis.java
----------------------------------------------------------------------
diff --git a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/ProvenanceAnalysis.java b/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/ProvenanceAnalysis.java
deleted file mode 100644
index 990ca30..0000000
--- a/taverna-provenanceconnector/src/main/java/net/sf/taverna/t2/provenance/lineageservice/ProvenanceAnalysis.java
+++ /dev/null
@@ -1,1200 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2007 The University of Manchester
- *
- * Modifications to the initial code base are copyright of their
- * respective authors, or their employers as appropriate.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- ******************************************************************************/
-package net.sf.taverna.t2.provenance.lineageservice;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import net.sf.taverna.t2.invocation.InvocationContext;
-import net.sf.taverna.t2.provenance.api.NativeAnswer;
-import net.sf.taverna.t2.provenance.api.QueryAnswer;
-import net.sf.taverna.t2.provenance.lineageservice.utils.DataLink;
-import net.sf.taverna.t2.provenance.lineageservice.utils.Port;
-import net.sf.taverna.t2.provenance.lineageservice.utils.PortBinding;
-import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
-import net.sf.taverna.t2.provenance.lineageservice.utils.QueryPort;
-import net.sf.taverna.t2.provenance.lineageservice.utils.WorkflowRun;
-import net.sf.taverna.t2.provenance.opm.OPMManager;
-import net.sf.taverna.t2.reference.T2Reference;
-
-import org.apache.log4j.Logger;
-import org.tupeloproject.kernel.OperatorException;
-import org.tupeloproject.provenance.ProvenanceArtifact;
-import org.tupeloproject.provenance.ProvenanceException;
-import org.tupeloproject.provenance.ProvenanceRole;
-
-/**
- * The main class for querying the lineage DB.
- * Assumes a provenance DB ready to be queried.
- * @author paolo
- */
-public class ProvenanceAnalysis {
-
- /** Logger shared by all instances; never reassigned, hence final. */
- private static final Logger logger = Logger.getLogger(ProvenanceAnalysis.class);
-
- /** Processor name that xformStep handles the same way as a dataflow. */
- private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
-
- /** Path value that makes computeLineageSingleVar process every binding of the port. */
- public static final String ALL_PATHS_KEYWORD = "ALL";
-
- /** Query engine used for all provenance DB access. */
- private ProvenanceQuery pq = null;
-
- /** Loader used by setAnnotationFile. */
- private AnnotationsLoader al = new AnnotationsLoader(); // FIXME singleton
-
- /** Graph paths collected by the lineage search, to be used by the naive provenance query. */
- private Map<ProvenanceProcessor, List<List<String>>> validPaths = new HashMap<>();
-
- /** Processors visited on the branch of the graph search currently being explored. */
- private List<String> currentPath;
-
- /** User-made annotations to processors, keyed by processor; null until loaded. */
- private Map<String,List<String>> annotations = null;
-
- /** Set to true as soon as init succeeds, meaning this object can answer queries. */
- private boolean ready = false;
-
- /** By default only input bindings are returned. */
- private boolean returnOutputs = false;
-
- /** Forces the lineage queries to return de-referenced data values. */
- private boolean includeDataValue = false;
-
- private boolean generateOPMGraph = true;
-
- // TODO extract this to prefs -- selects which OPMManager is to be used to export to OPM
- private String OPMManagerClass = "net.sf.taverna.t2.provenance.lineageservice.ext.pc3.PANSTARRSOPMManager";
-
- /** OPM manager created by initGraph; null until then. */
- private OPMManager aOPMManager = null;
-
- /** When true, xformStep de-references artifact values before adding them to the OPM graph. */
- private boolean recordArtifactValues = false;
-
- /** Invocation context used to de-reference T2References. */
- private InvocationContext ic = null;
-
- /**
-  * Creates an analysis object without a query engine. All fields keep
-  * their declared defaults, so the object is not ready to answer queries.
-  */
- public ProvenanceAnalysis() {
-     // intentionally empty
- }
-
- /**
-  * Creates an analysis object bound to the given query engine and
-  * immediately attempts initialisation. The outcome of that attempt is
-  * recorded through setReady.
-  *
-  * @param pq
-  *            the query engine used for all DB access
-  * @throws SQLException
-  *             if the initial look-up of workflow runs fails
-  */
- public ProvenanceAnalysis(ProvenanceQuery pq)
-         throws InstantiationException, IllegalAccessException,
-         ClassNotFoundException, SQLException {
-     this.pq = pq;
-     boolean initialised = tryInit();
-     setReady(initialised);
- }
-
- /**
-  * Attempts initialisation: the OPM graph is set up only when at least
-  * one workflow run can be retrieved.
-  *
-  * @return true if initGraph was invoked, false when no runs are available
-  * @throws SQLException
-  *             if the workflow runs cannot be fetched
-  */
- private boolean tryInit() throws SQLException {
-     List<?> runs = getWorkflowRunIds();
-     if (runs == null || runs.isEmpty()) {
-         return false;
-     }
-     initGraph(); // init OPM provenance graph
-     return true;
- }
-
- /**
- * Call to create the opm graph and annotation loader. this may fail due to
- * queries being issued before DB is populated, minimally with workflowRunId
- */
- public void initGraph() {
- // OPM management
- try {
- aOPMManager = (OPMManager) Class.forName(OPMManagerClass).newInstance();
- } catch (InstantiationException e1) {
- logger.error("Problem initialising opm graph: ", e1);
- } catch (IllegalAccessException e1) {
- logger.error("Problem initialising opm graph: ", e1);
- } catch (ClassNotFoundException e1) {
- logger.info("chosen OPMmanager: "+OPMManagerClass+" not available, reverting to default");
- aOPMManager = new OPMManager();
- }
-
- try {
- aOPMManager.createAccount(getWorkflowRunIds().get(0).getWorkflowRunId());
- } catch (SQLException e) {
- logger.error("Could not initialise OPM Manager: ", e);
- } catch (ProvenanceException e) {
- logger.warn("Could not add account", e);
- }
- }
-
-
- /**
- * asks the OPM manager to convert its current RDF OPMGraph to XML
- * @return the filename of the OPM XML file
- * @throws OperatorException
- * @throws IOException
- * @throws JAXBException
- */
- // public String OPMRdf2Xml() throws OperatorException, IOException, JAXBException {
- // if (isReady()) {
- // return aOPMManager.Rdf2Xml();
- // }
- // return null;
- // }
-
- /**
- * asks the OPM manager to create a dot file representing its current RDF OPMGraph<br/>
- * needs fixing
- * @return
- * @throws IOException
- * @throws OperatorException
- */
- /*
- public String OPMRdf2Dot() throws OperatorException, IOException {
- if (aOPMManager != null && aOPMManager.isActive() && isReady()) {
- return aOPMManager.Rdf2Dot();
- }
- return null;
- }
- */
-
- /**
-  * Loads processor annotations from the given file through the
-  * annotations loader and logs every annotation that was found. A warning
-  * is logged instead when the loader returns nothing.
-  *
-  * @param annotationFile
-  *            the file handed to the annotations loader
-  */
- public void setAnnotationFile(String annotationFile) {
-     annotations = al.getAnnotations(annotationFile);
-
-     if (annotations != null) {
-         logger.info("processor annotations for lineage refinement: ");
-         for (Map.Entry<String, List<String>> perProcessor : annotations.entrySet()) {
-             logger.info("annotations for proc " + perProcessor.getKey());
-             List<String> processorAnnotations = perProcessor.getValue();
-             for (String annotation : processorAnnotations) {
-                 logger.info(annotation);
-             }
-         }
-     } else {
-         logger.warn("no annotations have been loaded");
-     }
- }
-
- /**
-  * Returns all available workflow runs across all workflows.
-  *
-  * @return whatever the query engine's getRuns returns when both of its
-  *         arguments are null
-  * @throws SQLException
-  *             if the underlying query fails
-  */
- public List<WorkflowRun> getWorkflowRunIds() throws SQLException {
-     // both arguments null: no restriction on the runs returned
-     return getPq().getRuns(null, null);
- }
-
- /**
-  * Returns all available workflow runs of one workflow.
-  *
-  * @param workflowId
-  *            the workflow whose runs are requested
-  * @return whatever the query engine's getRuns returns for that workflow
-  * @throws SQLException
-  *             if the underlying query fails
-  */
- public List<WorkflowRun> getWorkflowRunsForWorkflow(String workflowId)
-         throws SQLException {
-     // second argument null: only the workflow id restricts the result
-     return getPq().getRuns(workflowId, null);
- }
-
- /**
- * @param workflowRun
- * lineage scope -- a specific instance
- * @param pname
- * for a specific processor [required]
- * @param a
- * specific (input or output) variable [optional]
- * @param iteration
- * and a specific iteration [optional]
- * @param workflowId
- * @return a lineage query ready to be executed, or null if we cannot return
- * an answer because we are not ready (for instance the DB is not
- * yet populated)
- * @throws SQLException
- */
- public Dependencies fetchIntermediateResult(String workflowRun,
- String workflowId, String pname, String vname, String iteration)
- throws SQLException {
- if (!isReady()) {
- setReady(tryInit());
- if (!isReady())
- return null;
- }
-
- LineageSQLQuery lq = getPq().simpleLineageQuery(workflowRun,
- workflowId, pname, vname, iteration);
-
- return getPq().runLineageQuery(lq, isIncludeDataValue());
- }
-
- /**
-  * Runs one lineage computation per query port and packages the results.
-  * The native answer maps every query port to the result of
-  * computeLineageSingleVar for it. When an active OPM manager exists, the
-  * OPM graph is written out and the value returned by writeGraph is
-  * attached as the RDF answer.
-  *
-  * @param qvList
-  *            the ports whose lineage is requested
-  * @param workflowRun
-  *            the run that scopes the query
-  * @param selectedProcessors
-  *            the processors for which lineage is to be reported
-  * @return the complete answer (native part, plus RDF part when available)
-  * @throws SQLException
-  *             if a DB query fails
-  */
- public QueryAnswer lineageQuery(List<QueryPort> qvList, String workflowRun,
-         List<ProvenanceProcessor> selectedProcessors) throws SQLException {
-     QueryAnswer fullAnswer = new QueryAnswer();
-     NativeAnswer nativePart = new NativeAnswer();
-
-     Map<QueryPort, Map<String, List<Dependencies>>> lineagePerPort = new HashMap<>();
-
-     // one lineage query per target port
-     for (QueryPort target : qvList) {
-         logger.info(String.format(
-                 "************\n lineage query: [instance, workflow, proc, port, path] = "
-                         + "[%s,%s,%s,%s,[%s]]\n***********", workflowRun,
-                 target.getWorkflowId(), target.getProcessorName(),
-                 target.getPortName(), target.getPath()));
-
-         // the OPM manager builds an OPM graph behind the scenes as a side-effect
-         Map<String, List<Dependencies>> lineage = computeLineageSingleVar(
-                 workflowRun, target.getWorkflowId(), target.getPortName(),
-                 target.getProcessorName(), target.getPath(), selectedProcessors);
-         lineagePerPort.put(target, lineage);
-     }
-
-     nativePart.setAnswer(lineagePerPort);
-     fullAnswer.setNativeAnswer(nativePart);
-
-     boolean opmAvailable = aOPMManager != null && aOPMManager.isActive();
-     if (opmAvailable) {
-         // only the RDF export is active; the XML export (Rdf2Xml) is disabled
-         String rdfFile = aOPMManager.writeGraph();
-         fullAnswer.setOPMAnswer_AsRDF(rdfFile);
-     }
-     return fullAnswer;
- }
-
- /**
-  * Facade for computeLineageSingleBinding. If path equals
-  * ALL_PATHS_KEYWORD, all port bindings for (proc, var) in the run are
-  * retrieved, ignoring path (i.e., all values within the collection bound
-  * to var), and computeLineageSingleBinding is invoked once per binding.
-  * Otherwise the request is passed on unchanged and the result map
-  * contains exactly one entry.
-  *
-  * @param workflowRun
-  *            dynamic scope
-  * @param workflowId
-  *            static scope
-  * @param var
-  *            target var
-  * @param proc
-  *            the processor that var belongs to
-  * @param path
-  *            possibly empty when no collections or no granular lineage
-  *            is required; a null path is passed on as is
-  * @param selectedProcessors
-  *            the processors for which lineage is to be reported
-  * @return a map from path to list of results, one entry for each path.
-  *         For ALL_PATHS_KEYWORD the keys are the iterations exactly as
-  *         stored in the bindings. Returns null when initialisation has
-  *         not succeeded yet.
-  * @throws SQLException
-  *             if a DB query fails
-  */
- public Map<String, List<Dependencies>> computeLineageSingleVar(
-         String workflowRun, // dynamic scope
-         String workflowId, // static scope
-         String var, // target var
-         String proc, // qualified with its processor name
-         String path, // possibly empty when no collections or no granular lineage required
-         List<ProvenanceProcessor> selectedProcessors) throws SQLException {
-     if (!isReady()) {
-         setReady(tryInit());
-         if (!isReady())
-             return null;
-     }
-
-     // are we returning all outputs in addition to the inputs?
-     logger.debug("return outputs: " + isReturnOutputs());
-
-     Map<String, List<Dependencies>> qa = new HashMap<>();
-
-     // constant-first comparison: a null path no longer throws here
-     if (!ALL_PATHS_KEYWORD.equals(path)) {
-         qa.put(path,
-                 computeLineageSingleBinding(workflowRun, workflowId, var,
-                         proc, path, selectedProcessors));
-         return qa;
-     }
-
-     // run a query for each binding of the port
-     Map<String, String> vbConstraints = new HashMap<>();
-     vbConstraints.put("VB.processorNameRef", proc);
-     vbConstraints.put("VB.portName", var);
-     vbConstraints.put("VB.workflowRunId", workflowRun);
-
-     List<PortBinding> vbList = getPq().getPortBindings(vbConstraints); // DB
-
-     if (vbList == null || vbList.isEmpty()) {
-         logger.warn(ALL_PATHS_KEYWORD
-                 + " specified for paths but no varBindings found. nothing to compute");
-         return qa;
-     }
-
-     for (PortBinding vb : vbList) {
-         String iteration = vb.getIteration();
-
-         // the iteration is expected as [x,y..] and is needed as x,y...
-         // strip the brackets only when they are really there, so that an
-         // unbracketed or empty iteration is passed on intact
-         String bindingPath = iteration;
-         if (iteration != null && iteration.length() >= 2
-                 && iteration.startsWith("[") && iteration.endsWith("]"))
-             bindingPath = iteration.substring(1, iteration.length() - 1);
-
-         List<Dependencies> result = computeLineageSingleBinding(
-                 workflowRun, workflowId, var, proc, bindingPath,
-                 selectedProcessors);
-         qa.put(iteration, result);
-     }
-     return qa;
- }
-
- /**
-  * Main lineage query method. Queries the provenance DB with a single
-  * originating proc/var/path and a set of selected processors: first the
-  * dataflow graph is searched to generate the lineage queries, then those
-  * queries are executed. Both phases are timed and the timings are logged
-  * at debug level.
-  *
-  * @param workflowRunId
-  *            dynamic scope
-  * @param workflowId
-  *            static scope
-  * @param var
-  *            target var
-  * @param proc
-  *            the processor that var belongs to
-  * @param path
-  *            possibly empty when no collections or no granular lineage
-  *            is required
-  * @param selectedProcessors
-  *            the processors for which lineage is to be reported
-  * @return a list of bindings. Each binding involves an input var for one
-  *         of the selectedProcessors. Note each var can contribute
-  *         multiple bindings, i.e., when all elements in a collection
-  *         bound to the var are retrieved. Output bindings are returned
-  *         as well when the query is configured with returnOutputs = true
-  *         {@link ProvenanceAnalysis#isReturnOutputs() }. An empty list
-  *         is returned when the graph search produced no query list
-  *         (the originating port was not found).
-  * @throws SQLException
-  *             if a DB query fails
-  */
- public List<Dependencies> computeLineageSingleBinding(
-         String workflowRunId, // dynamic scope
-         String workflowId, // static scope
-         String var, // target var
-         String proc, // qualified with its processor name
-         String path, // possibly empty when no collections or no granular lineage required
-         List<ProvenanceProcessor> selectedProcessors) throws SQLException {
-     long start = System.currentTimeMillis();
-     List<LineageSQLQuery> lqList = searchDataflowGraph(workflowRunId,
-             workflowId, var, proc, path, selectedProcessors);
-     long stop = System.currentTimeMillis();
-
-     long gst = stop - start; // graph search time
-
-     // searchDataflowGraph returns null when the originating port is
-     // unknown: there is nothing to execute, so do not hand null to the
-     // query engine
-     if (lqList == null) {
-         logger.debug("search time: " + gst + "ms, no lineage queries generated");
-         return new ArrayList<>();
-     }
-
-     // execute queries in the LineageSQLQuery list
-     logger.debug("\n**************** executing lineage queries: (includeDataValue is "
-             + isIncludeDataValue() + "**************\n");
-
-     start = System.currentTimeMillis();
-     List<Dependencies> results = getPq().runLineageQueries(lqList,
-             isIncludeDataValue());
-     stop = System.currentTimeMillis();
-
-     long qrt = stop - start; // query response time
-     logger.debug("search time: " + gst
-             + "ms\nlineage query response time: " + qrt + " ms");
-     logger.debug("total exec time " + (gst + qrt) + "ms");
-
-     return results;
- }
-
- /**
-  * Computes lineage queries using path projections, by searching the
-  * dataflow graph backwards from the port (proc, var).
-  * Added 2/9: a list of graph paths is collected in the process, to be
-  * used by the naive query. In practice this is the graph search phase
-  * that the naive query needs anyway.
-  *
-  * @param workflowRunId
-  *            the (single) run that defines the scope of a query
-  * @param workflowId
-  *            static scope
-  * @param var
-  *            target var
-  * @param proc
-  *            the processor that var belongs to
-  * @param path
-  *            path within var (can be empty but not null)
-  * @param selectedProcessors
-  *            only report lineage when any of these processors is reached
-  * @return the generated lineage queries, or null when the port (proc,
-  *         var) is not found in the workflow
-  * @throws SQLException
-  *             if a DB query fails
-  */
- public List<LineageSQLQuery> searchDataflowGraph(
-         String workflowRunId, // dynamic scope
-         String workflowId, // static scope
-         String var, // target var
-         String proc, // qualified with its processor name
-         String path, // can be empty but not null
-         List<ProvenanceProcessor> selectedProcessors) throws SQLException {
-     List<LineageSQLQuery> collectedQueries = new ArrayList<>();
-
-     // TODO we are ignoring the wfId context information in the list of selected processors!!
-
-     // reset path accumulation. here "path" is a path in the graph, not within a collection!
-     // every selected processor starts with an empty list of paths
-     for (ProvenanceProcessor selected : selectedProcessors) {
-         validPaths.put(selected, new ArrayList<List<String>>());
-     }
-     currentPath = new ArrayList<>();
-
-     // look up (var, proc) to find out whether it is an input or an output
-     Map<String, String> portConstraints = new HashMap<>();
-     portConstraints.put("V.processorName", proc);
-     portConstraints.put("V.portName", var);
-     portConstraints.put("V.workflowId", workflowId);
-
-     List<Port> matchingPorts = getPq().getPorts(portConstraints);
-
-     if (matchingPorts.isEmpty()) {
-         logger.info("variable ("+var+","+proc+") not found, lineage query terminated, constraints: " + portConstraints);
-         return null;
-     }
-
-     logger.info("Found " + matchingPorts);
-
-     // expect exactly one record
-     // CHECK there can be multiple (pname, portName) pairs, i.e., in case of nested workflows
-     // here we pick the first that turns up -- we would need to let users choose, or process all of them...
-     Port startPort = matchingPorts.get(0);
-
-     // start with xfer or xform depending on whether the initial var is input or output;
-     // either step recursively accumulates SQL queries into collectedQueries
-     boolean startWithTransfer = startPort.isInputPort()
-             || startPort.getProcessorId() == null;
-     if (startWithTransfer) {
-         xferStep(workflowRunId, workflowId, startPort, path,
-                 selectedProcessors, collectedQueries);
-     } else {
-         xformStep(workflowRunId, workflowId, startPort, proc, path,
-                 selectedProcessors, collectedQueries);
-     }
-
-     return collectedQueries;
- } // end searchDataflowGraph
-
- /**
- * Accounts for an inverse transformation from one output of a processor to
- * all of its inputs.
- *
- * In order, the method: (1) determines the input ports of proc, or reuses
- * outputVar itself when proc is a dataflow or the output container;
- * (2) projects path onto each input port; (3) pushes proc onto the current
- * graph path and, if proc is a selected processor, stores a copy of that
- * path; (4) when proc is selected or no processors are selected at all,
- * generates lineage queries, updates the OPM graph if an active OPM manager
- * is present, and recurses through xferStep for every input port;
- * (5) pops proc from the current graph path.
- *
- * @param workflowRunId
- * the run that scopes the query
- * @param workflowId
- * the workflow that proc is matched against in selectedProcessors
- * @param outputVar
- * the output var
- * @param proc
- * the processor
- * @param selectedProcessors
- * the processors for which we are interested in producing
- * lineage
- * @param path
- * iteration vector within a PortBinding collection; may be null
- * @param lqList
- * partial list of spot lineage queries, to be added to
- * @throws SQLException
- * if a DB query fails
- */
- @SuppressWarnings("deprecation")
- private void xformStep(
- String workflowRunId,
- String workflowId,
- Port outputVar, // we need the dnl from this output var
- String proc, String path,
- List<ProvenanceProcessor> selectedProcessors,
- List<LineageSQLQuery> lqList) throws SQLException {
- // constraints used to retrieve the input vars of the current processor
- Map<String, String> varsQueryConstraints = new HashMap<>();
-
- List<Port> inputVars = null;
-
- /*
- * here we fetch the input vars for the current proc. however, it may be
- * the case that we are looking at a dataflow port (for the entire
- * dataflow or for a subdataflow) rather than a real processor. in this
- * case we treat this as a special processor that does nothing -- so the
- * "input var" in this case is a copy of the port, and we are ready to
- * go for the next xfer step. in this way we can seamlessly traverse the
- * graph over intermediate I/O that are part of nested dataflows
- */
-
- if (getPq().isDataflow(proc)) { // if we are looking at the output of an entire dataflow
- // force the "input vars" for this step to be the output var itself
- // this causes the following xfer step to trace back to the next processor _within_ proc
- inputVars = new ArrayList<>();
- inputVars.add(outputVar);
- } else if (proc.equals(OUTPUT_CONTAINER_PROCESSOR)) { // same action as prev case, but may change in the future
- inputVars = new ArrayList<>();
- inputVars.add(outputVar);
- } else {
- varsQueryConstraints.put("W.workflowId", workflowId);
- varsQueryConstraints.put("processorName", proc);
- varsQueryConstraints.put("isInputPort", "1");
-
- inputVars = getPq().getPorts(varsQueryConstraints);
- }
-
- ///////////
- /// path projections
- ///////////
- // maps each var to its projected path
- Map<Port,String> var2Path = new HashMap<>();
- Map<Port,Integer> var2delta = new HashMap<>(); // per input var: resolved depth minus declared depth
-
- if (path == null) { // nothing to split
- for (Port inputVar : inputVars)
- var2Path.put(inputVar, null);
- } else {
- int minPathLength = 0; // if input path is shorter than this we give up granularity altogether
- for (Port inputVar : inputVars) {
- int resolvedDepth = 0; // a missing resolved depth counts as 0
- if (inputVar.getResolvedDepth() != null)
- resolvedDepth = inputVar.getResolvedDepth();
- int delta = resolvedDepth - inputVar.getDepth();
- var2delta.put(inputVar, delta);
- minPathLength += delta; // sum of all deltas
- }
-
- String iterationVector[] = path.split(",");
-
- if (iterationVector.length < minPathLength) { // no path is propagated
- for (Port inputVar: inputVars)
- var2Path.put(inputVar, null);
- } else { // compute projected paths
- String[] projectedPath;
-
- int start = 0; // index into iterationVector of the first element not yet consumed
- for (Port inputVar: inputVars) {
- // 24/7/08 get DNL (declared nesting level) and ANL (actual nesting level) from VAR
- // TODO account for empty paths
- int projectedPathLength = var2delta.get(inputVar); // this is delta
-
- if (projectedPathLength == 0) {
- // associate empty path to this var
- var2Path.put(inputVar, null);
- continue;
- }
-
- // this var is involved in iteration
- projectedPath = new String[projectedPathLength];
- for (int i = 0; i < projectedPathLength; i++)
- projectedPath[i] = iterationVector[start + i];
- start += projectedPathLength;
-
- StringBuilder iterationFragment = new StringBuilder();
- for (String s : projectedPath)
- iterationFragment.append(s + ",");
- iterationFragment
- .deleteCharAt(iterationFragment.length() - 1); // drop the trailing comma
-
- var2Path.put(inputVar, iterationFragment.toString());
- }
- }
- }
-
- // accumulate this proc to current path
- currentPath.add(proc);
-
- /*
- * if this is a selected processor, add a copy of the current path to
- * the list of paths for the processor
- */
-
- // is <workflowId, proc> in selectedProcessors?
- boolean isSelected = false;
- for (ProvenanceProcessor pp : selectedProcessors)
- if (pp.getWorkflowId().equals(workflowId)
- && pp.getProcessorName().equals(proc)) {
- List<List<String>> paths = validPaths.get(pp);
-
- // copy the path since the original will change
- // also remove spurious dataflow processors at this point
- List<String> pathCopy = new ArrayList<>();
- for (String s : currentPath)
- if (!getPq().isDataflow(s))
- pathCopy.add(s);
- paths.add(pathCopy);
- isSelected = true;
- break; // only the first matching selected processor is considered
- }
-
- ///////////
- /// generate SQL if necessary -- for all input vars, based on the current path
- /// the projected paths are required to determine the level in the collection at which
- /// we look at the value assignment
- ///////////
-
- Map<String, ProvenanceArtifact> var2Artifact = new HashMap<>(); // populated below but not read within this method
- Map<String, ProvenanceRole> var2ArtifactRole = new HashMap<>(); // populated below but not read within this method
-
- // if this transformation is important to the user, produce an output and also an OPM graph fragment
- if (selectedProcessors.isEmpty() || isSelected) {
- List<LineageSQLQuery> newLqList = getPq().lineageQueryGen(
- workflowRunId, proc, var2Path, outputVar, path,
- isReturnOutputs() || var2Path.isEmpty());
- lqList.addAll(newLqList);
-
- // BEGIN OPM update section
- //
- // create OPM artifact and role for the output var of this xform
- //
- boolean doOPM = (aOPMManager != null && aOPMManager.isActive()); // any problem below will set this to false
-
- if (doOPM) {
- // fetch value for this variable and assert it as an Artifact in the OPM graph
- Map<String, String> vbConstraints = new HashMap<>();
- vbConstraints.put("VB.processorNameRef",
- outputVar.getProcessorName());
- vbConstraints.put("VB.portName", outputVar.getPortName());
- vbConstraints.put("VB.workflowRunId", workflowRunId);
-
- if (path != null) {
- /*
- * account for x,y,.. format as well as [x,y,...] depending
- * on where the request is coming from
- */
- // TODO this is just irritating must be removed
- if (path.startsWith("["))
- vbConstraints.put("VB.iteration", path);
- else
- vbConstraints.put("VB.iteration", "[" + path + "]");
- }
-
- List<PortBinding> vbList = getPq().getPortBindings(vbConstraints); // DB
-
- /*
- * use only the first result (expect only one) -- in this method
- * we assume path is not null
- * NOTE(review): the guard above nevertheless allows a null path,
- * in which case no iteration constraint is set -- confirm intended
- */
-
- // map the resulting varBinding to an Artifact
- if (vbList == null || vbList.size() == 0) {
- logger.debug("no entry corresponding to conditions: proc="
- + outputVar.getProcessorName() + " var = "
- + outputVar.getPortName() + " iteration = " + path);
- doOPM = false;
- } else {
- PortBinding vb = vbList.get(0);
-
- if (aOPMManager != null && !pq.isDataflow(proc)) { // NOTE(review): reads field pq directly, unlike the getPq() calls elsewhere in this method
- if (isRecordArtifactValues()) {
- T2Reference ref = getInvocationContext()
- .getReferenceService().referenceFromString(
- vb.getValue());
-
- Object data = ic.getReferenceService() // NOTE(review): uses field ic directly while the statement above goes through getInvocationContext()
- .renderIdentifier(ref, Object.class, ic);
-
- // ReferenceSetImpl o = (ReferenceSetImpl) ic.getReferenceService().resolveIdentifier(ref, null, ic);
- logger.debug("deref value for ref: " + ref + " "
- + data + " of class "
- + data.getClass().getName()); // NOTE(review): throws NullPointerException if data is null
-
- try {
- aOPMManager.addArtifact(vb.getValue(), data);
- } catch (ProvenanceException e) {
- logger.warn("Could not add artifact", e);
- }
- } else {
- try {
- aOPMManager.addArtifact(vb.getValue());
- } catch (ProvenanceException e) {
- logger.warn("Could not add artifact", e);
- }
- }
- aOPMManager.createRole(vb.getWorkflowRunId(),
- vb.getWorkflowId(), vb.getProcessorName(),
- vb.getIteration());
- }
-
- /*
- * assert proc as Process -- include iteration vector to
- * separate different activations of the same process
- */
- try {
- aOPMManager.addProcess(proc, vb.getIteration(),
- workflowId, vb.getWorkflowRunId());
- } catch (ProvenanceException e) {
- logger.warn("Could not add process", e);
- }
-
- /*
- * create OPM generatedBy property between output value and
- * this process node avoid the pathological case where a
- * dataflow generates its own inputs
- */
- try {
- aOPMManager.assertGeneratedBy(
- aOPMManager.getCurrentArtifact(),
- aOPMManager.getCurrentProcess(),
- aOPMManager.getCurrentRole(),
- aOPMManager.getCurrentAccount(), true);
- } catch (ProvenanceException e) {
- logger.warn("Could not add assertion", e);
- }
- }
- }
- //
- // create OPM process for this xform
- //
- for (LineageSQLQuery lq : newLqList) {
- // execute the query so we get the value we need for the Artifact node; NOTE(review): it is executed even when doOPM is false and the result is then unused
- Dependencies inputs = getPq().runLineageQuery(lq,
- isIncludeDataValue());
-
- if (doOPM && inputs.getRecords().size() > 0) { // && !pq.isDataflow(proc)) {
- // update OPM graph with inputs and used properties
- for (LineageQueryResultRecord resultRecord: inputs.getRecords()) {
- // process inputs only
- if (!resultRecord.isInputPort())
- continue;
-
- // map each input var in the resultRecord to an Artifact
- // create new Resource for the resultRecord
- // use the value as URI for the Artifact, and resolvedValue as the actual value
-
- //
- // create OPM artifact and role for the input var obtained by path projection
- //
- if (resultRecord.isCollection()) {
- try {
- aOPMManager.addArtifact(resultRecord
- .getCollectionT2Reference());
- } catch (ProvenanceException e) {
- logger.warn("Could not add artifact", e);
- }
- } else if (isRecordArtifactValues()) {
- T2Reference ref = getInvocationContext()
- .getReferenceService().referenceFromString(
- resultRecord.getValue());
- Object data = ic.getReferenceService()
- .renderIdentifier(ref, Object.class, ic);
- logger.debug("deref value for ref: " + ref + " "
- + data + " of class "
- + data.getClass().getName()); // NOTE(review): throws NullPointerException if data is null
- try {
- aOPMManager.addArtifact(
- resultRecord.getValue(), data);
- } catch (ProvenanceException e) {
- logger.warn("Could not add artifact", e);
- }
- } else { // NOTE(review): the role creation and assertUsed below sit inside this else branch, so they are skipped for collections and when artifact values are recorded -- confirm intended
- try {
- aOPMManager
- .addArtifact(resultRecord.getValue());
- } catch (ProvenanceException e) {
- logger.warn("Could not add artifact", e);
- }
- var2Artifact.put(resultRecord.getPortName(),
- aOPMManager.getCurrentArtifact());
-
- aOPMManager.createRole(
- resultRecord.getWorkflowRunId(),
- resultRecord.getworkflowId(),
- resultRecord.getProcessorName(),
- resultRecord.getIteration());
- var2ArtifactRole.put(resultRecord.getPortName(),
- aOPMManager.getCurrentRole());
-
- //
- // create OPM used property between process and the input var obtained by path projection
- //
- // avoid output variables, it would assert that P used one of its outputs!
-
- try {
- aOPMManager.assertUsed(
- aOPMManager.getCurrentArtifact(),
- aOPMManager.getCurrentProcess(),
- aOPMManager.getCurrentRole(),
- aOPMManager.getCurrentAccount(), true);
- } catch (ProvenanceException e) { // NOTE(review): the message below says "artifact" although the failing call is assertUsed
- logger.warn("Could not add artifact", e);
- }
-
- // true -> prevent duplicates CHECK
- }
- }
- }
- // END OPM update section
- }
-
- // recursion -- xfer path is next up; NOTE(review): this loop is inside the if (selectedProcessors.isEmpty() || isSelected) block, so the search does not continue past an unselected processor -- confirm intended
- for (Port inputVar : inputVars)
- xferStep(workflowRunId, workflowId, inputVar,
- var2Path.get(inputVar), selectedProcessors, lqList);
- } // end if (selectedProcessors.isEmpty() || isSelected)
- currentPath.remove(currentPath.size()-1); // pops the proc added to currentPath above -- CHECK
- } // end xformStep
-
- /**
-  * Accounts for an inverse transfer along a datalink: finds the datalink
-  * that ends at the given port, looks up the port at its source and
-  * continues the search with an xformStep on the source processor. The
-  * search along this branch stops silently when no datalink ends at the
-  * port, and with a warning when the source port record cannot be found.
-  *
-  * @param workflowRunId
-  *            the run that scopes the query
-  * @param workflowId
-  *            static scope, passed on unchanged
-  * @param port
-  *            the destination port of the transfer
-  * @param path
-  *            iteration vector, passed on unchanged to the xform step
-  * @param selectedProcessors
-  *            the processors for which lineage is to be reported
-  * @param lqList
-  *            partial list of spot lineage queries, to be added to
-  * @throws SQLException
-  *             if a DB query fails
-  */
- private void xferStep(String workflowRunId, String workflowId, Port port,
-         String path, List<ProvenanceProcessor> selectedProcessors,
-         List<LineageSQLQuery> lqList) throws SQLException {
-
-     // retrieve all Datalinks ending with (var,proc) -- ideally there is exactly one
-     // (because multiple incoming datalinks are disallowed)
-     Map<String, String> datalinksQueryConstraints = new HashMap<>();
-     datalinksQueryConstraints
-             .put("destinationPortId", port.getIdentifier());
-     List<DataLink> datalinks = getPq().getDataLinks(
-             datalinksQueryConstraints);
-
-     if (datalinks == null || datalinks.isEmpty())
-         return; // CHECK -- nothing feeds this port
-
-     DataLink a = datalinks.get(0);
-
-     // get source node
-     String sourceProcName = a.getSourceProcessorName();
-
-     // CHECK transfer same path with only exception: when anl(sink) > anl(source)
-     // in this case set path to null
-
-     // retrieve the full record for the port at the source of the datalink
-     Map<String, String> varsQueryConstraints = new HashMap<>();
-     varsQueryConstraints.put("portId", a.getSourcePortId());
-     List<Port> varList = getPq().getPorts(varsQueryConstraints);
-
-     // without this guard a missing source port record made get(0) throw
-     if (varList == null || varList.isEmpty()) {
-         logger.warn("no port found for source port id " + a.getSourcePortId()
-                 + " of the datalink into port " + port.getIdentifier()
-                 + ", lineage search stops on this branch");
-         return;
-     }
-
-     Port outputVar = varList.get(0);
-
-     // recurse on xform
-     xformStep(workflowRunId, workflowId, outputVar, sourceProcName, path,
-             selectedProcessors, lqList);
- } // end xferStep
-
- /**
- * this class represents the annotation (single or sequence, to be
- * determined) that are produced upon visiting the graph structure and that
- * drive the generation of a pinpoint lineage query<br/>
- * this is still a placeholder
- */
- class LineageAnnotation {
- private List<String> path = new ArrayList<>(); // steps appended by addStep(), in insertion order
-
- private boolean isXform = true; // toString() prints " xform: " when true, " xfer: " when false
-
- private String iteration = ""; // the iteration projected on a single variable; used for upward propagation. Default (empty string) means no iteration.
- private String iterationVector = ""; // iteration vector accounts for cross-products. Used to be matched exactly in queries.
- private int iic = 0; // index in collection -- default is 0
- private int collectionNesting = 0; // n indicates granularity is n levels from leaf.
- // This quantifies loss of lineage precision when working with collections
- private String collectionRef = null;
- private String proc; // printed as PROC by toString(); presumably the processor name -- TODO confirm
- private String var; // printed as VAR by toString(); presumably the port/variable name -- TODO confirm
- private String varType = null; // string, XML,... see Taverna type system
-
- private int DNL = 0; // declared nesting level -- copied from VAR
- private int ANL = 0; // actual nesting level -- copied from Port
-
- private String workflowRun; // TODO generalize to list / time interval?
-
- @Override
- public String toString() { // one-line summary; path, collectionRef, DNL, ANL and workflowRun are not included
- StringBuilder sb = new StringBuilder();
- if (isXform)
- sb.append(" xform: ");
- else
- sb.append(" xfer: ");
- sb.append("<PROC/VAR/VARTYPE, IT, IIC, ITVECTOR, COLLNESTING> = "
- + proc + "/" + var + "/" + varType + "," + "[" + iteration
- + "]" + "," + iic + ", [" + iterationVector + "]" + ","
- + collectionNesting);
- return sb.toString();
- }
-
- public void addStep(String step) { // appends step to the end of path
- path.add(step);
- }
-
- public void removeLastStep() { // drops the most recently added step
- path.remove(path.size() - 1); // index is -1 when path is empty, so this throws IndexOutOfBoundsException
- }
-
- /**
- * @return the list of steps held by this annotation (the internal list itself, not a copy)
- */
- public List<String> getPath() {
- return path;
- }
-
- /**
- * @param path
- * the list to use as the path from now on; stored by reference, not copied
- */
- public void setPath(List<String> path) {
- this.path = path;
- }
-
- /**
- * @return the iteration projected on a single variable (empty string by default)
- */
- public String getIteration() {
- return iteration;
- }
-
- /**
- * @param iteration
- * the iteration to set; stored as given, no validation
- */
- public void setIteration(String iteration) {
- this.iteration = iteration;
- }
-
- /**
- * @return the iic, i.e. the index in collection (0 by default)
- */
- public int getIic() {
- return iic;
- }
-
- /**
- * @param iic
- * the index in collection to set
- */
- public void setIic(int iic) {
- this.iic = iic;
- }
-
- /**
- * @return the collectionRef (null by default)
- */
- public String getCollectionRef() {
- return collectionRef;
- }
-
- /**
- * @param collectionRef
- * the collectionRef to set; stored as given
- */
- public void setCollectionRef(String collectionRef) {
- this.collectionRef = collectionRef;
- }
-
- /**
- * @return the proc (null until setProc is called)
- */
- public String getProc() {
- return proc;
- }
-
- /**
- * @param proc
- * the proc to set; stored as given
- */
- public void setProc(String proc) {
- this.proc = proc;
- }
-
- /**
- * @return the var (null until setVar is called)
- */
- public String getVar() {
- return var;
- }
-
- /**
- * @param var
- * the var to set; stored as given
- */
- public void setVar(String var) {
- this.var = var;
- }
-
- /**
- * @return the varType (null by default); see Taverna type system
- */
- public String getVarType() {
- return varType;
- }
-
- /**
- * @param varType
- * the varType to set; stored as given
- */
- public void setVarType(String varType) {
- this.varType = varType;
- }
-
- /**
- * @return the workflowRun (null until setWorkflowRun is called)
- */
- public String getWorkflowRun() {
- return workflowRun;
- }
-
- /**
- * @param workflowRun
- * the workflowRun to set; stored as given
- */
- public void setWorkflowRun(String workflowRun) {
- this.workflowRun = workflowRun;
- }
-
- /**
- * @return the isXform flag (true by default)
- */
- public boolean isXform() {
- return isXform;
- }
-
- /**
- * @param isXform
- * the isXform flag to set; selects " xform: " vs " xfer: " in toString()
- */
- public void setXform(boolean isXform) {
- this.isXform = isXform;
- }
-
- /**
- * @return the collectionNesting: n means granularity is n levels from leaf (0 by default)
- */
- public int getCollectionNesting() {
- return collectionNesting;
- }
-
- /**
- * @param collectionNesting
- * the collectionNesting to set (number of levels from leaf)
- */
- public void setCollectionNesting(int collectionNesting) {
- this.collectionNesting = collectionNesting;
- }
-
- /**
- * @return the iterationVector, which accounts for cross-products (empty string by default)
- */
- public String getIterationVector() {
- return iterationVector;
- }
-
- /**
- * @param iterationVector
- * the iterationVector to set; stored as given
- */
- public void setIterationVector(String iterationVector) {
- this.iterationVector = iterationVector;
- }
-
- /**
- * @return the DNL, i.e. the declared nesting level (0 by default)
- */
- public int getDNL() {
- return DNL;
- }
-
- /**
- * @param dnl
- * the declared nesting level (DNL) to set
- */
- public void setDNL(int dnl) {
- DNL = dnl;
- }
-
- /**
- * @return the ANL, i.e. the actual nesting level (0 by default)
- */
- public int getANL() {
- return ANL;
- }
-
- /**
- * @param anl
- * the actual nesting level (ANL) to set
- */
- public void setANL(int anl) {
- ANL = anl;
- }
- }
-
- /**
- * @return the validPaths map currently held (the field itself, not a copy)
- */
- public Map<ProvenanceProcessor, List<List<String>>> getValidPaths() {
- return validPaths;
- }
-
- /**
- * @param validPaths
- * the validPaths map to use from now on; stored by reference, not copied
- */
- public void setValidPaths(
- Map<ProvenanceProcessor, List<List<String>>> validPaths) {
- this.validPaths = validPaths;
- }
-
- public void setPq(ProvenanceQuery pq) { // replaces the ProvenanceQuery held in the pq field
- this.pq = pq;
- }
-
- public ProvenanceQuery getPq() { // returns the pq field as-is
- return pq;
- }
-
- /**
- * @return the current value of the ready flag
- */
- public boolean isReady() {
- return ready;
- }
-
- /**
- * @param ready
- * the new value of the ready flag
- */
- public void setReady(boolean ready) {
- this.ready = ready;
- }
-
- /**
- * @return the current value of the returnOutputs flag
- */
- public boolean isReturnOutputs() {
- return returnOutputs;
- }
-
- /**
- * @param returnOutputs
- * the new value of the returnOutputs flag
- */
- public void setReturnOutputs(boolean returnOutputs) {
- this.returnOutputs = returnOutputs;
- }
-
- /**
- * @return the current value of the recordArtifactValues flag
- */
- public boolean isRecordArtifactValues() {
- return recordArtifactValues;
- }
-
- /**
- * @param recordArtifactValues
- * the new value of the recordArtifactValues flag
- */
- public void setRecordArtifactValues(boolean recordArtifactValues) {
- this.recordArtifactValues = recordArtifactValues;
-
- }
-
- /**
- * @return the current value of the includeDataValue flag
- */
- public boolean isIncludeDataValue() {
- return includeDataValue;
- }
-
- /**
- * @param includeDataValue
- * the new value of the includeDataValue flag
- */
- public void setIncludeDataValue(boolean includeDataValue) {
- this.includeDataValue = includeDataValue;
- }
-
- /**
- * @return the current value of the generateOPMGraph flag
- */
- public boolean isGenerateOPMGraph() {
- return generateOPMGraph;
- }
-
- /**
- * @param generateOPMGraph
- * the new flag value; also forwarded to aOPMManager.setActive() when aOPMManager is non-null
- */
- public void setGenerateOPMGraph(boolean generateOPMGraph) {
- this.generateOPMGraph = generateOPMGraph;
- if (aOPMManager != null) // no manager yet: only the local flag is updated
- aOPMManager.setActive(generateOPMGraph);
- }
-
- public void setInvocationContext(InvocationContext context) { // stores context in the ic field
- this.ic = context;
- }
-
- public InvocationContext getInvocationContext() { // returns the ic field as-is
- return this.ic;
- }
-}