You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:12 UTC
[13/50] [abbrv] airavata git commit: Fixed AIRAVATA-1571, and refactored the code
Fixed AIRAVATA-1571, and refactored the code
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8835097e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8835097e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8835097e
Branch: refs/heads/master
Commit: 8835097ed697583601bde6039801a95dbdd33422
Parents: 55319c9
Author: shamrath <sh...@gmail.com>
Authored: Mon Feb 23 14:14:26 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Mon Feb 23 14:14:26 2015 -0500
----------------------------------------------------------------------
.../engine/SimpleWorkflowInterpreter.java | 50 +++++++++++---------
.../engine/dag/nodes/ApplicationNodeImpl.java | 28 ++++++-----
.../engine/dag/nodes/WorkflowInputNodeImpl.java | 13 ++---
.../workflow/engine/dag/nodes/WorkflowNode.java | 10 ++--
.../dag/nodes/WorkflowOutputNodeImpl.java | 15 +++---
.../engine/parser/AiravataDefaultParser.java | 14 +++---
6 files changed, 70 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 3c2596d..93b3bc0 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -102,9 +102,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
setWorkflowInputNodes(workflowParser.parse());
log.debug("Parsed the workflow and got the workflow input nodes");
processWorkflowInputNodes(getWorkflowInputNodes());
-// processReadyList();
- // process workflow application nodes
- // process workflow output nodes
}
// try to remove synchronization tag
@@ -113,7 +110,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
try {
if (readyNode instanceof WorkflowOutputNode) {
WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
- completeWorkflowOutputs.add(wfOutputNode);
+ wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+ addToCompleteOutputNodeList(wfOutputNode);
continue;
}
WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
@@ -128,6 +126,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
}
+
private void publishToProcessQueue(TaskDetails process) {
Thread thread = new Thread(new TempPublisher(process, eventBus));
thread.start();
@@ -140,13 +139,13 @@ public class SimpleWorkflowInterpreter implements Runnable{
if (workflowNode instanceof ApplicationNode) {
ApplicationNode applicationNode = (ApplicationNode) workflowNode;
List<InPort> inputPorts = applicationNode.getInputPorts();
- if (applicationNode.getNodeName().equals("Add")) {
+ if (applicationNode.getName().equals("Add")) {
applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
- } else if (applicationNode.getNodeName().equals("Multiply")) {
+ } else if (applicationNode.getName().equals("Multiply")) {
applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
- } else if (applicationNode.getNodeName().equals("Subtract")) {
+ } else if (applicationNode.getName().equals("Subtract")) {
applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
} else {
@@ -178,7 +177,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
- WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getNodeId(), null);
+ WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
String executionData = null;
if (readyNode instanceof ApplicationNode) {
@@ -218,15 +217,15 @@ public class SimpleWorkflowInterpreter implements Runnable{
Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
for (WorkflowInputNode wfInputNode : wfInputNodes) {
if (wfInputNode.isReady()) {
- log.debug("Workflow node : " + wfInputNode.getNodeId() + " is ready to execute");
+ log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
if (edge.getToPort().getNode().isReady()) {
addToReadyQueue(edge.getToPort().getNode());
- log.debug("Added workflow node : " + edge.getToPort().getNode().getNodeId() + " to the readyQueue");
+ log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
} else {
addToWaitingQueue(edge.getToPort().getNode());
- log.debug("Added workflow node " + edge.getToPort().getNode().getNodeId() + " to the waitingQueue");
+ log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
}
}
@@ -311,25 +310,25 @@ public class SimpleWorkflowInterpreter implements Runnable{
case STARTED:
break;
case PRE_PROCESSING:
- processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+ processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case INPUT_DATA_STAGING:
- processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+ processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case EXECUTING:
- processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING);
+ processPack.getWorkflowNode().setState(NodeState.EXECUTING);
break;
case OUTPUT_DATA_STAGING:
- processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+ processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case POST_PROCESSING:
- processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+ processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case COMPLETED:
- processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED);
+ processPack.getWorkflowNode().setState(NodeState.EXECUTED);
break;
case FAILED:
- processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
+ processPack.getWorkflowNode().setState(NodeState.FAILED);
break;
case UNKNOWN:
break;
@@ -337,7 +336,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
break;
case CANCELED:
case CANCELING:
- processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
+ processPack.getWorkflowNode().setState(NodeState.FAILED);
break;
default:
break;
@@ -358,12 +357,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
* @param workflowNode - Workflow Node
*/
private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
- waitingList.remove(workflowNode.getNodeId());
- readList.put(workflowNode.getNodeId(), workflowNode);
+ waitingList.remove(workflowNode.getId());
+ readList.put(workflowNode.getId(), workflowNode);
}
private void addToWaitingQueue(WorkflowNode workflowNode) {
- waitingList.put(workflowNode.getNodeId(), workflowNode);
+ waitingList.put(workflowNode.getId(), workflowNode);
}
/**
@@ -372,7 +371,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
* @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
*/
private synchronized void addToProcessingQueue(ProcessPack processPack) {
- readList.remove(processPack.getWorkflowNode().getNodeId());
+ readList.remove(processPack.getWorkflowNode().getId());
processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
}
@@ -382,6 +381,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
}
+ private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+ completeWorkflowOutputs.add(wfOutputNode);
+ readList.remove(wfOutputNode.getId());
+ }
+
@Override
public void run() {
// TODO: Auto generated method body.
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
index dd4415a..1282dd0 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -34,40 +34,46 @@ public class ApplicationNodeImpl implements ApplicationNode {
private String applicationId;
private List<InPort> inPorts = new ArrayList<InPort>();
private List<OutPort> outPorts = new ArrayList<OutPort>();
+ private String applicationName;
- public ApplicationNodeImpl(String nodeId) {
- this(nodeId, null);
- }
+// public ApplicationNodeImpl(String nodeId) {
+// this(nodeId, null);
+// }
+//
+// public ApplicationNodeImpl(String nodeId, String applicationId) {
+// this(nodeId, null, applicationId);
+// }
- public ApplicationNodeImpl(String nodeId, String applicationId) {
+ public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
this.nodeId = nodeId;
+ this.applicationName = applicationName;
this.applicationId = applicationId;
}
@Override
- public String getNodeId() {
+ public String getId() {
return this.nodeId;
}
@Override
- public String getNodeName() {
- return this.getNodeName();
+ public String getName() {
+ return applicationName;
}
@Override
- public NodeType getNodeType() {
+ public NodeType getType() {
return NodeType.APPLICATION;
}
@Override
- public NodeState getNodeState() {
+ public NodeState getState() {
return myState;
}
@Override
- public void setNodeState(NodeState newNodeState) {
+ public void setState(NodeState newState) {
// TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newNodeState;
+ myState = newState;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index f419ae2..a015909 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -29,6 +29,7 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
private String nodeName;
private OutPort outPort;
private InputDataObjectType inputDataObjectType;
+ private String name;
public WorkflowInputNodeImpl(String nodeId) {
this(nodeId, null);
@@ -40,29 +41,29 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
}
@Override
- public String getNodeId() {
+ public String getId() {
return this.nodeId;
}
@Override
- public String getNodeName() {
+ public String getName() {
return this.nodeName;
}
@Override
- public NodeType getNodeType() {
+ public NodeType getType() {
return NodeType.WORKFLOW_INPUT;
}
@Override
- public NodeState getNodeState() {
+ public NodeState getState() {
return myState;
}
@Override
- public void setNodeState(NodeState newNodeState) {
+ public void setState(NodeState newState) {
// TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newNodeState;
+ myState = newState;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
index f8b0e0c..f875674 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
@@ -23,15 +23,15 @@ package org.apache.ariavata.simple.workflow.engine.dag.nodes;
public interface WorkflowNode {
- public String getNodeId();
+ public String getId();
- public String getNodeName();
+ public String getName();
- public NodeType getNodeType();
+ public NodeType getType();
- public NodeState getNodeState();
+ public NodeState getState();
- public void setNodeState(NodeState newNodeState);
+ public void setState(NodeState newState);
public boolean isReady();
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
index aa7f0a3..a44c05f 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -42,34 +42,35 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
}
@Override
- public String getNodeId() {
+ public String getId() {
return this.nodeId;
}
@Override
- public String getNodeName() {
+ public String getName() {
return this.nodeName;
}
@Override
- public NodeType getNodeType() {
+ public NodeType getType() {
return NodeType.WORKFLOW_OUTPUT;
}
@Override
- public NodeState getNodeState() {
+ public NodeState getState() {
return myState;
}
@Override
- public void setNodeState(NodeState newNodeState) {
+ public void setState(NodeState newState) {
// TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
- myState = newNodeState;
+ myState = newState;
}
@Override
public boolean isReady() {
- return this.outputDataObjectType.getValue() != null && !this.outputDataObjectType.getValue().equals("");
+ return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null
+ || inPort.getInputObject().getValue().equals(""));
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
index e7ac5cb..2961fde 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
@@ -21,7 +21,6 @@
package org.apache.ariavata.simple.workflow.engine.parser;
-import com.sun.corba.se.pept.encoding.OutputObject;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.airavata.appcatalog.cpi.WorkflowCatalog;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -104,7 +103,7 @@ public class AiravataDefaultParser implements WorkflowParser {
}
for (Node gNode : gNodes) {
wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
- wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName()));
+ wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
if (wfInputNode.getInputObject() == null) {
// TODO: throw an error and exit.
}
@@ -120,7 +119,7 @@ public class AiravataDefaultParser implements WorkflowParser {
private void buildModel(List<PortContainer> portContainerList) {
// end condition of recursive call.
- if (portContainerList == null || portContainerList.size() == 0) {
+ if (portContainerList == null || portContainerList.isEmpty()) {
return ;
}
DataPort dataPort = null;
@@ -132,13 +131,12 @@ public class AiravataDefaultParser implements WorkflowParser {
dataPort = portContainer.getDataPort();
inPort = portContainer.getInPort();
Node node = dataPort.getNode();
-// inPort.setInputObject(getInputDataObject(dataPort));
if (node instanceof WSNode) {
WSNode wsNode = (WSNode) node;
WorkflowNode wfNode = wfNodes.get(wsNode.getID());
if (wfNode == null) {
wfApplicationNode = createApplicationNode(wsNode);
- wfNodes.put(wfApplicationNode.getNodeId(), wfApplicationNode);
+ wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
} else if (wfNode instanceof ApplicationNode) {
wfApplicationNode = (ApplicationNode) wfNode;
@@ -152,7 +150,8 @@ public class AiravataDefaultParser implements WorkflowParser {
OutputNode oNode = (OutputNode) node;
wfOutputNode = createWorkflowOutputNode(oNode);
wfOutputNode.setInPort(inPort);
- wfNodes.put(wfOutputNode.getNodeId(), wfOutputNode);
+ inPort.setNode(wfOutputNode);
+ wfNodes.put(wfOutputNode.getId(), wfOutputNode);
}
}
buildModel(nextPortContainerList);
@@ -169,8 +168,8 @@ public class AiravataDefaultParser implements WorkflowParser {
private ApplicationNode createApplicationNode(WSNode wsNode) {
ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
+ wsNode.getComponent().getApplication().getName(),
wsNode.getComponent().getApplication().getApplicationId());
-// wsNode.getComponent().getInputPorts()
return applicationNode;
}
@@ -197,7 +196,6 @@ public class AiravataDefaultParser implements WorkflowParser {
} else if (wfNode instanceof ApplicationNode) {
ApplicationNode applicationNode = ((ApplicationNode) wfNode);
applicationNode.addOutPort(outPort);
-// applicationNode.addInPort(inPort);
}
}
return portContainers;