You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/02/03 22:42:27 UTC
airavata git commit: Added TestWorkflow and fixed set of compilation
issues
Repository: airavata
Updated Branches:
refs/heads/develop 14a566a47 -> d1bb38275
Added TestWorkflow and fixed set of compilation issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d1bb3827
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d1bb3827
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d1bb3827
Branch: refs/heads/develop
Commit: d1bb38275f8caa2204ecbe1611b0fbe9c6e322b5
Parents: 14a566a
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Feb 3 16:42:21 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Feb 3 16:42:21 2016 -0500
----------------------------------------------------------------------
.../core/SimpleWorkflowInterpreter.java | 35 +++---
.../airavata/workflow/core/WorkflowFactory.java | 5 -
.../workflow/core/dag/edge/DirectedEdge.java | 5 +
.../airavata/workflow/core/dag/edge/Edge.java | 2 +
.../workflow/core/dag/port/OutPort.java | 2 +-
.../workflow/core/dag/port/OutPortImpl.java | 2 +-
.../core/parser/AiravataWorkflowBuilder.java | 117 -------------------
.../core/parser/JsonWorkflowParser.java | 6 +-
.../core/parser/JsonWorkflowParserTest.java | 4 +-
.../src/test/resources/TestWorkflow.json | 85 ++++++++++++++
10 files changed, 123 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
index 01ad6bb..7f8a8a5 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
@@ -67,6 +67,7 @@ class SimpleWorkflowInterpreter{
private String gatewayName;
+ private String workflowString;
private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, WorkflowContext> processingQueue = new ConcurrentHashMap<String, WorkflowContext>();
@@ -97,10 +98,12 @@ class SimpleWorkflowInterpreter{
* @throws Exception
*/
void launchWorkflow() throws Exception {
- WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null);
-
+// WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null);
+ workflowString = getWorkflow();
+ WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString);
log.debug("Initialized workflow parser");
- setInputNodes(workflowBuilder.build());
+ workflowParser.parse();
+ setInputNodes(workflowParser.getInputNodes());
log.debug("Parsed the workflow and got the workflow input nodes");
// process workflow input nodes
processWorkflowInputNodes(getInputNodes());
@@ -117,6 +120,12 @@ class SimpleWorkflowInterpreter{
processReadyList();
}
+ private String getWorkflow() throws AppCatalogException {
+ WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
+ //FIXME: parse workflowTemplateId or experimentId
+ workflowCatalog.getWorkflow("");
+ }
+
// try to remove synchronization tag
/**
* Package-Private method.
@@ -129,9 +138,9 @@ class SimpleWorkflowInterpreter{
}
for (WorkflowNode readyNode : readyList.values()) {
if (readyNode instanceof OutputNode) {
- OutputNode wfOutputNode = (OutputNode) readyNode;
- wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
- addToCompleteOutputNodeList(wfOutputNode);
+ OutputNode outputNode = (OutputNode) readyNode;
+ outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(outputNode);
continue;
}
WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
@@ -206,13 +215,13 @@ class SimpleWorkflowInterpreter{
}
- private void processWorkflowInputNodes(List<InputNode> wfInputNodes) {
+ private void processWorkflowInputNodes(List<InputNode> inputNodes) {
Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
- for (InputNode wfInputNode : wfInputNodes) {
- if (wfInputNode.isReady()) {
- log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
- for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
- edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
+ for (InputNode inputNode : inputNodes) {
+ if (inputNode.isReady()) {
+ log.debug("Workflow node : " + inputNode.getId() + " is ready to execute");
+ for (Edge edge : inputNode.getOutPort().getEdges()) {
+ edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue());
if (edge.getToPort().getNode().isReady()) {
addToReadyQueue(edge.getToPort().getNode());
log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
@@ -315,7 +324,7 @@ class SimpleWorkflowInterpreter{
break;
}
}
- for (Edge edge : outPort.getOutEdges()) {
+ for (Edge edge : outPort.getEdges()) {
edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
if (edge.getToPort().getNode().isReady()) {
addToReadyQueue(edge.getToPort().getNode());
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
index f232efa..e06fab5 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -37,11 +37,6 @@ public class WorkflowFactory {
private static final Logger log = LoggerFactory.getLogger(WorkflowFactory.class);
-
- public static WorkflowBuilder getWorkflowBuilder(String experimentId, String credentialToken, String workflowString) throws Exception {
- return new AiravataWorkflowBuilder(experimentId, credentialToken, getWorkflowParser(workflowString));
- }
-
public static WorkflowParser getWorkflowParser(String workflowString) throws Exception {
WorkflowParser workflowParser = null;
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
index b1d79b1..3ad7afa 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
@@ -33,6 +33,11 @@ public class DirectedEdge implements Edge {
private EdgeModel edgeModel;
@Override
+ public String getId() {
+ return getEdgeModel().getEdgeId();
+ }
+
+ @Override
public void setEdgeModel(EdgeModel edgeModel) {
this.edgeModel = edgeModel;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
index d1c340e..2ad098e 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
@@ -32,6 +32,8 @@ import org.apache.airavata.workflow.core.dag.port.OutPort;
public interface Edge {
+ public String getId();
+
public void setEdgeModel(EdgeModel edgeModel);
public EdgeModel getEdgeModel();
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
index 7ae3220..d12666e 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
@@ -32,7 +32,7 @@ public interface OutPort extends Port {
public OutputDataObjectType getOutputObject();
- public List<Edge> getOutEdges();
+ public List<Edge> getEdges();
public void addEdge(Edge edge);
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
index 4d90308..c261279 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
@@ -52,7 +52,7 @@ public class OutPortImpl implements OutPort {
}
@Override
- public List<Edge> getOutEdges() {
+ public List<Edge> getEdges() {
return this.outEdges;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java
deleted file mode 100644
index e50b245..0000000
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/AiravataWorkflowBuilder.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.workflow.core.parser;
-
-import org.apache.airavata.model.application.io.InputDataObjectType;
-import org.apache.airavata.model.application.io.OutputDataObjectType;
-import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
-import org.apache.airavata.workflow.core.WorkflowBuilder;
-import org.apache.airavata.workflow.core.WorkflowParser;
-import org.apache.airavata.workflow.core.dag.edge.DirectedEdge;
-import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
-import org.apache.airavata.workflow.core.dag.nodes.ApplicationNodeImpl;
-import org.apache.airavata.workflow.core.dag.nodes.InputNode;
-import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
-import org.apache.airavata.workflow.core.dag.nodes.OutputNodeImpl;
-import org.apache.airavata.workflow.core.dag.port.*;
-
-import java.util.*;
-
-public class AiravataWorkflowBuilder implements WorkflowBuilder {
-
- private String credentialToken ;
- private WorkflowParser workflowParser;
- private ExperimentModel experiment;
-
-
- public AiravataWorkflowBuilder(String experimentId, String credentialToken, WorkflowParser workflowParser) throws RegistryException {
- this.experiment = getExperiment(experimentId);
- this.credentialToken = credentialToken;
- this.workflowParser = workflowParser;
- }
-
- public AiravataWorkflowBuilder(ExperimentModel experiment, String credentialToken , WorkflowParser workflowParser) {
- this.credentialToken = credentialToken;
- this.experiment = experiment;
- this.workflowParser = workflowParser;
- }
-
- @Override
- public List<InputNode> build() throws Exception {
- return parseWorkflow(getWorkflowFromExperiment(experiment));
- }
-
- @Override
- public List<InputNode> build(String workflow) throws Exception {
- return parseWorkflow(workflow);
- }
-
- public List<InputNode> parseWorkflow(String workflow) throws Exception {
-
- List<InputNode> inputNodes = workflowParser.getInputNodes();
- List<ApplicationNode> applicationNodes = workflowParser.getApplicationNodes();
- List<Port> ports = workflowParser.getPorts();
- List<Edge> edges = workflowParser.getEdges();
- List<OutputNode> outputNodes = workflowParser.getOutputNodes();
-
- // travel breath first and build relation between each workflow component
- Queue<WorkflowNode> queue = new LinkedList<>();
- List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
- Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
- for (InputDataObjectType dataObjectType : experimentInputs) {
- inputDataMap.put(dataObjectType.getName(), dataObjectType);
- }
-
- return inputNodes;
- }
-
-
- private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
- OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
- outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
- outputDataObjectType.setName(inputObject.getName());
- outputDataObjectType.setType(inputObject.getType());
- outputDataObjectType.setValue(inputObject.getValue());
- return outputDataObjectType;
- }
-
- private ExperimentModel getExperiment(String experimentId) throws RegistryException {
- Registry registry = RegistryFactory.getRegistry();
- return (ExperimentModel)registry.getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
- }
-
- private String getWorkflowFromExperiment(ExperimentModel experiment) throws RegistryException, AppCatalogException {
- WorkflowCatalog workflowCatalog = getWorkflowCatalog();
-
- // FIXME: return workflow string
- return null;
- }
-
- private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
- return RegistryFactory.getAppCatalog().getWorkflowCatalog();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
index 59576b3..ede69e3 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParser.java
@@ -16,12 +16,16 @@ import java.util.List;
public class JsonWorkflowParser implements WorkflowParser{
private final String workflow;
+ private List<InputNode> inputs;
+ private List<OutputNode> outputs;
+ private List<ApplicationNode> applications;
+ private List<Port> ports;
+ private List<Edge> edges;
public JsonWorkflowParser(String jsonWorkflowString) {
workflow = jsonWorkflowString;
}
-
@Override
public void parse() throws Exception {
// TODO parse json string and construct components
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
index 95a2579..712944d 100644
--- a/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
+++ b/modules/workflow/workflow-core/src/test/java/org/apache/airavata/workflow/core/parser/JsonWorkflowParserTest.java
@@ -109,8 +109,8 @@ public class JsonWorkflowParserTest {
Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
Assert.assertEquals(1, node.getOutputPorts().size());
- Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size());
- Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0));
+ Assert.assertEquals(1, node.getOutputPorts().get(0).getEdges().size());
+ Assert.assertNotNull(node.getOutputPorts().get(0).getEdges().get(0));
} else if (wfNode instanceof OutputNode) {
OutputNode outputNode = (OutputNode) wfNode;
Assert.assertNotNull(outputNode.getInPort());
http://git-wip-us.apache.org/repos/asf/airavata/blob/d1bb3827/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json b/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json
new file mode 100644
index 0000000..5b64172
--- /dev/null
+++ b/modules/workflow/workflow-core/src/test/resources/TestWorkflow.json
@@ -0,0 +1,85 @@
+{
+ "workflow" : {
+ "name" : "name",
+ "id" : "default_id",
+ "description" : "default description",
+ "version" : "version",
+ "applications" : [
+ { "applicationId" : "appId_1",
+ "name" : "App Name",
+ "description" : "My app description",
+ "appType" : "MPI",
+ "inputs" : [
+ { "name" : "appInputName_1",
+ "id" : "appInputNode_Id_1",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "App Input Description" },
+ { "name" : "appInputName_2",
+ "id" : "appInputNode_Id_2",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "App Input Description" }],
+ "outputs" : [
+ { "name" : "appOutputName_1",
+ "id" : "appOutputNode_Id_1",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "App Output Description" },
+ { "name" : "appOutputName_2",
+ "id" : "appOutputNode_Id_2",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue" }],
+ "position" : { "x" : 124 , "y" : 643 },
+ "nodeId" : "applicationNodeId",
+ "parallelExecution" : "true",
+ "properties" : null }
+ ],
+ "workflowInputs" : [
+ { "name" : "inputName_1",
+ "id" : "inputNode_Id_1",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "Input Description",
+ "position" : { "x" : 23 , "y" : 43 },
+ "nodeId" : "defaultNodeId_1" },
+ { "name" : "inputName_2",
+ "id" : "inputNode_Id_2",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "Input Description",
+ "position" : { "x" : 23 , "y" : 103 },
+ "nodeId" : "defaultNodeId_2" }
+ ],
+ "workflowOutputs" : [
+ { "name" : "outputName_1",
+ "id" : "outputNode_Id_1",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "Output Description",
+ "position" : { "x" : 423 , "y" : 43 },
+ "nodeId" : "defaultOutputNodeId_1" },
+ { "name" : "outputName_2",
+ "id" : "outputNode_Id_2",
+ "dataType" : "STRING",
+ "defaultValue" : "defaultValue",
+ "description" : "Output Description",
+ "position" : { "x" : 423 , "y" : 103 },
+ "nodeId" : "defaultOutputNodeId_2" }
+ ],
+ "links" : [
+ { "description" : "link desc",
+ "from" : { "nodeId" : "" , "ouputId" : "" },
+ "to" : { "nodeId" : "" , "outputId" : "" }},
+ { "description" : "link desc",
+ "from" : { "nodeId" : "" , "ouputId" : "" },
+ "to" : { "nodeId" : "" , "outputId" : "" }},
+ { "description" : "link desc",
+ "from" : { "nodeId" : "" , "ouputId" : "" },
+ "to" : { "nodeId" : "" , "outputId" : "" }},
+ { "description" : "link desc",
+ "from" : { "nodeId" : "" , "ouputId" : "" },
+ "to" : { "nodeId" : "" , "outputId" : "" }}
+ ]
+ }
+}