You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:12 UTC

[13/50] [abbrv] airavata git commit: Fixed AIRAVATA-1571, and refactored the code

Fixed AIRAVATA-1571, and refactored the code


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8835097e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8835097e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8835097e

Branch: refs/heads/master
Commit: 8835097ed697583601bde6039801a95dbdd33422
Parents: 55319c9
Author: shamrath <sh...@gmail.com>
Authored: Mon Feb 23 14:14:26 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Mon Feb 23 14:14:26 2015 -0500

----------------------------------------------------------------------
 .../engine/SimpleWorkflowInterpreter.java       | 50 +++++++++++---------
 .../engine/dag/nodes/ApplicationNodeImpl.java   | 28 ++++++-----
 .../engine/dag/nodes/WorkflowInputNodeImpl.java | 13 ++---
 .../workflow/engine/dag/nodes/WorkflowNode.java | 10 ++--
 .../dag/nodes/WorkflowOutputNodeImpl.java       | 15 +++---
 .../engine/parser/AiravataDefaultParser.java    | 14 +++---
 6 files changed, 70 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 3c2596d..93b3bc0 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -102,9 +102,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
         setWorkflowInputNodes(workflowParser.parse());
         log.debug("Parsed the workflow and got the workflow input nodes");
         processWorkflowInputNodes(getWorkflowInputNodes());
-//        processReadyList();
-        // process workflow application nodes
-        // process workflow output nodes
     }
 
     // try to remove synchronization tag
@@ -113,7 +110,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
             try {
                 if (readyNode instanceof WorkflowOutputNode) {
                     WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
-                    completeWorkflowOutputs.add(wfOutputNode);
+                    wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+                    addToCompleteOutputNodeList(wfOutputNode);
                     continue;
                 }
                 WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
@@ -128,6 +126,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
         }
     }
 
+
     private void publishToProcessQueue(TaskDetails process) {
         Thread thread = new Thread(new TempPublisher(process, eventBus));
         thread.start();
@@ -140,13 +139,13 @@ public class SimpleWorkflowInterpreter implements Runnable{
         if (workflowNode instanceof ApplicationNode) {
             ApplicationNode applicationNode = (ApplicationNode) workflowNode;
             List<InPort> inputPorts = applicationNode.getInputPorts();
-            if (applicationNode.getNodeName().equals("Add")) {
+            if (applicationNode.getName().equals("Add")) {
                 applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
                         Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
-            } else if (applicationNode.getNodeName().equals("Multiply")) {
+            } else if (applicationNode.getName().equals("Multiply")) {
                 applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
                         Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
-            } else if (applicationNode.getNodeName().equals("Subtract")) {
+            } else if (applicationNode.getName().equals("Subtract")) {
                 applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
                         Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
             } else {
@@ -178,7 +177,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
     }
 
     private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
-        WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getNodeId(), null);
+        WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
         ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
         String executionData = null;
         if (readyNode instanceof ApplicationNode) {
@@ -218,15 +217,15 @@ public class SimpleWorkflowInterpreter implements Runnable{
         Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
         for (WorkflowInputNode wfInputNode : wfInputNodes) {
             if (wfInputNode.isReady()) {
-                log.debug("Workflow node : " + wfInputNode.getNodeId() + " is ready to execute");
+                log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
                 for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
                     edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
                     if (edge.getToPort().getNode().isReady()) {
                         addToReadyQueue(edge.getToPort().getNode());
-                        log.debug("Added workflow node : " + edge.getToPort().getNode().getNodeId() + " to the readyQueue");
+                        log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
                     } else {
                         addToWaitingQueue(edge.getToPort().getNode());
-                        log.debug("Added workflow node " + edge.getToPort().getNode().getNodeId() + " to the waitingQueue");
+                        log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
 
                     }
                 }
@@ -311,25 +310,25 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 case STARTED:
                     break;
                 case PRE_PROCESSING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
                     break;
                 case INPUT_DATA_STAGING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
                     break;
                 case EXECUTING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING);
+                    processPack.getWorkflowNode().setState(NodeState.EXECUTING);
                     break;
                 case OUTPUT_DATA_STAGING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
                     break;
                 case POST_PROCESSING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
                     break;
                 case COMPLETED:
-                    processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED);
+                    processPack.getWorkflowNode().setState(NodeState.EXECUTED);
                     break;
                 case FAILED:
-                    processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    processPack.getWorkflowNode().setState(NodeState.FAILED);
                     break;
                 case UNKNOWN:
                     break;
@@ -337,7 +336,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
                     break;
                 case CANCELED:
                 case CANCELING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    processPack.getWorkflowNode().setState(NodeState.FAILED);
                     break;
                 default:
                     break;
@@ -358,12 +357,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
      * @param workflowNode - Workflow Node
      */
     private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
-        waitingList.remove(workflowNode.getNodeId());
-        readList.put(workflowNode.getNodeId(), workflowNode);
+        waitingList.remove(workflowNode.getId());
+        readList.put(workflowNode.getId(), workflowNode);
     }
 
     private void addToWaitingQueue(WorkflowNode workflowNode) {
-        waitingList.put(workflowNode.getNodeId(), workflowNode);
+        waitingList.put(workflowNode.getId(), workflowNode);
     }
 
     /**
@@ -372,7 +371,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
      * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
      */
     private synchronized void addToProcessingQueue(ProcessPack processPack) {
-        readList.remove(processPack.getWorkflowNode().getNodeId());
+        readList.remove(processPack.getWorkflowNode().getId());
         processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
     }
 
@@ -382,6 +381,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
     }
 
 
+    private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+        completeWorkflowOutputs.add(wfOutputNode);
+        readList.remove(wfOutputNode.getId());
+    }
+
     @Override
     public void run() {
         // TODO: Auto generated method body.

http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
index dd4415a..1282dd0 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -34,40 +34,46 @@ public class ApplicationNodeImpl implements ApplicationNode {
     private String applicationId;
     private List<InPort> inPorts = new ArrayList<InPort>();
     private List<OutPort> outPorts = new ArrayList<OutPort>();
+    private String applicationName;
 
-    public ApplicationNodeImpl(String nodeId) {
-        this(nodeId, null);
-    }
+//    public ApplicationNodeImpl(String nodeId) {
+//        this(nodeId, null);
+//    }
+//
+//    public ApplicationNodeImpl(String nodeId, String applicationId) {
+//        this(nodeId, null, applicationId);
+//    }
 
-    public ApplicationNodeImpl(String nodeId, String applicationId) {
+    public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
         this.nodeId = nodeId;
+        this.applicationName = applicationName;
         this.applicationId = applicationId;
     }
 
     @Override
-    public String getNodeId() {
+    public String getId() {
         return this.nodeId;
     }
 
     @Override
-    public String getNodeName() {
-        return this.getNodeName();
+    public String getName() {
+        return applicationName;
     }
 
     @Override
-    public NodeType getNodeType() {
+    public NodeType getType() {
         return NodeType.APPLICATION;
     }
 
     @Override
-    public NodeState getNodeState() {
+    public NodeState getState() {
         return myState;
     }
 
     @Override
-    public void setNodeState(NodeState newNodeState) {
+    public void setState(NodeState newState) {
         // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newNodeState;
+        myState = newState;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index f419ae2..a015909 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -29,6 +29,7 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
     private String nodeName;
     private OutPort outPort;
     private InputDataObjectType inputDataObjectType;
+    private String name;
 
     public WorkflowInputNodeImpl(String nodeId) {
         this(nodeId, null);
@@ -40,29 +41,29 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
     }
 
     @Override
-    public String getNodeId() {
+    public String getId() {
         return this.nodeId;
     }
 
     @Override
-    public String getNodeName() {
+    public String getName() {
         return this.nodeName;
     }
 
     @Override
-    public NodeType getNodeType() {
+    public NodeType getType() {
         return NodeType.WORKFLOW_INPUT;
     }
 
     @Override
-    public NodeState getNodeState() {
+    public NodeState getState() {
         return myState;
     }
 
     @Override
-    public void setNodeState(NodeState newNodeState) {
+    public void setState(NodeState newState) {
         // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newNodeState;
+        myState = newState;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
index f8b0e0c..f875674 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
@@ -23,15 +23,15 @@ package org.apache.ariavata.simple.workflow.engine.dag.nodes;
 
 public interface WorkflowNode {
 
-    public String getNodeId();
+    public String getId();
 
-    public String getNodeName();
+    public String getName();
 
-    public NodeType getNodeType();
+    public NodeType getType();
 
-    public NodeState getNodeState();
+    public NodeState getState();
 
-    public void setNodeState(NodeState newNodeState);
+    public void setState(NodeState newState);
 
     public boolean isReady();
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
index aa7f0a3..a44c05f 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -42,34 +42,35 @@ public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
     }
 
     @Override
-    public String getNodeId() {
+    public String getId() {
         return this.nodeId;
     }
 
     @Override
-    public String getNodeName() {
+    public String getName() {
         return this.nodeName;
     }
 
     @Override
-    public NodeType getNodeType() {
+    public NodeType getType() {
         return NodeType.WORKFLOW_OUTPUT;
     }
 
     @Override
-    public NodeState getNodeState() {
+    public NodeState getState() {
         return myState;
     }
 
     @Override
-    public void setNodeState(NodeState newNodeState) {
+    public void setState(NodeState newState) {
         // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
-        myState = newNodeState;
+        myState = newState;
     }
 
     @Override
     public boolean isReady() {
-        return this.outputDataObjectType.getValue() != null && !this.outputDataObjectType.getValue().equals("");
+        return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null
+                || inPort.getInputObject().getValue().equals(""));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/8835097e/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
index e7ac5cb..2961fde 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
@@ -21,7 +21,6 @@
 
 package org.apache.ariavata.simple.workflow.engine.parser;
 
-import com.sun.corba.se.pept.encoding.OutputObject;
 import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.airavata.appcatalog.cpi.WorkflowCatalog;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -104,7 +103,7 @@ public class AiravataDefaultParser implements WorkflowParser {
         }
         for (Node gNode : gNodes) {
             wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
-            wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName()));
+            wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
             if (wfInputNode.getInputObject() == null) {
                 // TODO: throw an error and exit.
             }
@@ -120,7 +119,7 @@ public class AiravataDefaultParser implements WorkflowParser {
 
     private void buildModel(List<PortContainer> portContainerList) {
         // end condition of recursive call.
-        if (portContainerList == null || portContainerList.size() == 0) {
+        if (portContainerList == null || portContainerList.isEmpty()) {
             return ;
         }
         DataPort dataPort = null;
@@ -132,13 +131,12 @@ public class AiravataDefaultParser implements WorkflowParser {
             dataPort = portContainer.getDataPort();
             inPort = portContainer.getInPort();
             Node node = dataPort.getNode();
-//            inPort.setInputObject(getInputDataObject(dataPort));
             if (node instanceof WSNode) {
                 WSNode wsNode = (WSNode) node;
                 WorkflowNode wfNode = wfNodes.get(wsNode.getID());
                 if (wfNode == null) {
                     wfApplicationNode = createApplicationNode(wsNode);
-                    wfNodes.put(wfApplicationNode.getNodeId(), wfApplicationNode);
+                    wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
                     nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
                 } else if (wfNode instanceof ApplicationNode) {
                     wfApplicationNode = (ApplicationNode) wfNode;
@@ -152,7 +150,8 @@ public class AiravataDefaultParser implements WorkflowParser {
                 OutputNode oNode = (OutputNode) node;
                 wfOutputNode = createWorkflowOutputNode(oNode);
                 wfOutputNode.setInPort(inPort);
-                wfNodes.put(wfOutputNode.getNodeId(), wfOutputNode);
+                inPort.setNode(wfOutputNode);
+                wfNodes.put(wfOutputNode.getId(), wfOutputNode);
             }
         }
         buildModel(nextPortContainerList);
@@ -169,8 +168,8 @@ public class AiravataDefaultParser implements WorkflowParser {
 
     private ApplicationNode createApplicationNode(WSNode wsNode) {
         ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
+                wsNode.getComponent().getApplication().getName(),
                 wsNode.getComponent().getApplication().getApplicationId());
-//        wsNode.getComponent().getInputPorts()
         return applicationNode;
     }
 
@@ -197,7 +196,6 @@ public class AiravataDefaultParser implements WorkflowParser {
             } else if (wfNode instanceof ApplicationNode) {
                 ApplicationNode applicationNode = ((ApplicationNode) wfNode);
                 applicationNode.addOutPort(outPort);
-//                applicationNode.addInPort(inPort);
             }
         }
         return portContainers;