You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:11 UTC

[12/50] [abbrv] airavata git commit: Implemented execution logic of the workflow data model

Implemented execution logic of the workflow data model


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/55319c96
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/55319c96
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/55319c96

Branch: refs/heads/master
Commit: 55319c96d2c7ace7ab7b458dbd4fd259649d2e8e
Parents: 27f6f1b
Author: shamrath <sh...@gmail.com>
Authored: Sun Feb 22 16:55:55 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Sun Feb 22 16:55:55 2015 -0500

----------------------------------------------------------------------
 .../engine/SimpleWorkflowInterpreter.java       | 92 +++++++++++++++++---
 .../workflow/engine/WorkflowFactoryImpl.java    |  4 +-
 .../simple/workflow/engine/WorkflowUtil.java    | 10 +++
 .../simple/workflow/engine/dag/port/InPort.java |  4 +
 .../workflow/engine/dag/port/InputPortIml.java  | 16 +++-
 .../engine/parser/AiravataDefaultParser.java    | 71 ++++++++++++---
 6 files changed, 169 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index e122fa6..3c2596d 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -49,6 +49,7 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.ariavata.simple.workflow.engine.dag.port.InPort;
 import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort;
+import org.apache.ariavata.simple.workflow.engine.parser.AiravataDefaultParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class SimpleWorkflowInterpreter implements Runnable{
 
@@ -69,12 +71,18 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     private String credentialToken;
 
-    private Map<String, WorkflowNode> readList = new HashMap<String, WorkflowNode>();
-    private Map<String, WorkflowNode> waitingList = new HashMap<String, WorkflowNode>();
-    private Map<String, ProcessPack> processingQueue = new HashMap<String, ProcessPack>();
+    private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>();
     private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
     private Registry registry;
     private EventBus eventBus = new EventBus();
+    private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+
+    public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException {
+        setExperiment(experimentId);
+        this.credentialToken = credentialToken;
+    }
 
     public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
         // read the workflow file and build the topology to a DAG. Then execute that dag
@@ -87,11 +95,14 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     public void launchWorkflow() throws Exception {
         // process workflow input nodes
-        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
-        WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+//        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+//        WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+        WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken);
+        log.debug("Initialized workflow parser");
         setWorkflowInputNodes(workflowParser.parse());
+        log.debug("Parsed the workflow and got the workflow input nodes");
         processWorkflowInputNodes(getWorkflowInputNodes());
-        processReadyList();
+//        processReadyList();
         // process workflow application nodes
         // process workflow output nodes
     }
@@ -100,10 +111,17 @@ public class SimpleWorkflowInterpreter implements Runnable{
     private synchronized void processReadyList() {
         for (WorkflowNode readyNode : readList.values()) {
             try {
+                if (readyNode instanceof WorkflowOutputNode) {
+                    WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+                    completeWorkflowOutputs.add(wfOutputNode);
+                    continue;
+                }
                 WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
                 TaskDetails process = getProcess(workflowNodeDetails);
-                addToProcessingQueue(new ProcessPack(readyNode, workflowNodeDetails, process));
-                publishToProcessQueue(process);
+                ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process);
+                addToProcessingQueue(processPack);
+//                publishToProcessQueue(process);
+                publishToProcessQueue(processPack);
             } catch (RegistryException e) {
                 // FIXME : handle this exception
             }
@@ -116,6 +134,41 @@ public class SimpleWorkflowInterpreter implements Runnable{
         //TODO: publish to process queue.
     }
 
+    // TODO : remove this test method
+    private void publishToProcessQueue(ProcessPack process) {
+        WorkflowNode workflowNode = process.getWorkflowNode();
+        if (workflowNode instanceof ApplicationNode) {
+            ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+            List<InPort> inputPorts = applicationNode.getInputPorts();
+            if (applicationNode.getNodeName().equals("Add")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else if (applicationNode.getNodeName().equals("Multiply")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else if (applicationNode.getNodeName().equals("Subtract")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else {
+                throw new RuntimeException("Invalid Application name");
+            }
+
+            for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) {
+                WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject());
+                if (edge.getToPort().getNode().isReady()) {
+                    addToReadyQueue(edge.getToPort().getNode());
+                } else {
+                    addToWaitingQueue(edge.getToPort().getNode());
+                }
+            }
+        } else if (workflowNode instanceof WorkflowOutputNode) {
+            WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode;
+            throw new RuntimeException("Workflow output node in processing queue");
+        }
+
+        processingQueue.remove(process.getTaskDetails().getTaskID());
+    }
+
     private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
         // create workflow taskDetails from workflowNodeDetails
         TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
@@ -165,13 +218,16 @@ public class SimpleWorkflowInterpreter implements Runnable{
         Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
         for (WorkflowInputNode wfInputNode : wfInputNodes) {
             if (wfInputNode.isReady()) {
+                log.debug("Workflow node : " + wfInputNode.getNodeId() + " is ready to execute");
                 for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
-                    edge.getToPort().setInputObject(
-                            WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject()));
+                    edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
                     if (edge.getToPort().getNode().isReady()) {
                         addToReadyQueue(edge.getToPort().getNode());
+                        log.debug("Added workflow node : " + edge.getToPort().getNode().getNodeId() + " to the readyQueue");
                     } else {
                         addToWaitingQueue(edge.getToPort().getNode());
+                        log.debug("Added workflow node " + edge.getToPort().getNode().getNodeId() + " to the waitingQueue");
+
                     }
                 }
             }
@@ -213,6 +269,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
     @Subscribe
     public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
         String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+        log.debug("Task Output changed event received for workflow node : " +
+                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
         ProcessPack processPack = processingQueue.get(taskId);
         Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
         if (processPack != null) {
@@ -236,6 +294,7 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 }
             }
             processingQueue.remove(taskId);
+            log.debug("removed task from processing queue : " + taskId);
         }
 
     }
@@ -257,12 +316,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 case INPUT_DATA_STAGING:
                     processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
                     break;
-                case OUTPUT_DATA_STAGING:
-                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
-                    break;
                 case EXECUTING:
                     processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING);
                     break;
+                case OUTPUT_DATA_STAGING:
+                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    break;
                 case POST_PROCESSING:
                     processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
                     break;
@@ -327,15 +386,22 @@ public class SimpleWorkflowInterpreter implements Runnable{
     public void run() {
         // TODO: Auto generated method body.
         try {
+            log.debug("Launching workflow");
             launchWorkflow();
             while (!(waitingList.isEmpty() && readList.isEmpty())) {
                 processReadyList();
+                Thread.sleep(1000);
             }
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
 
+    private void setExperiment(String experimentId) throws RegistryException {
+        experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
+        log.debug("Retrieve Experiment for experiment id : " + experimentId);
+    }
+
 
     class TempPublisher implements Runnable {
         private TaskDetails tempTaskDetails;

http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java
index a6173ac..dd84df0 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -33,13 +33,15 @@ public class WorkflowFactoryImpl implements WorkflowFactory {
 
     private WorkflowParser workflowParser;
 
+    private static final String synch = "sync";
+
     private WorkflowFactoryImpl(){
 
     }
 
     public static WorkflowFactoryImpl getInstance() {
         if (workflowFactoryImpl == null) {
-            synchronized (workflowFactoryImpl) {
+            synchronized (synch) {
                 if (workflowFactoryImpl == null) {
                     workflowFactoryImpl = new WorkflowFactoryImpl();
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
index d4bbad3..688b170 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
@@ -41,6 +41,9 @@ public class WorkflowUtil {
                 && !fromInputObj.getApplicationArgument().trim().equals("")) {
             toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument());
         }
+        if (toInputObj.getType() == null) {
+            toInputObj.setType(fromInputObj.getType());
+        }
         return fromInputObj;
     }
 
@@ -50,4 +53,11 @@ public class WorkflowUtil {
     }
 
 
+    public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) {
+        if (outputObject == null) {
+            outputObject = new OutputDataObjectType();
+        }
+        outputObject.setValue(inputObject.getValue());
+        return outputObject;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java
index c635bef..bac10ee 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InPort.java
@@ -34,4 +34,8 @@ public interface InPort extends Port {
 
     public void addEdge(Edge edge);
 
+    public String getDefaultValue();
+
+    public void setDefaultValue(String defaultValue);
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java
index 1971a1d..82160a9 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/port/InputPortIml.java
@@ -26,10 +26,11 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 public class InputPortIml implements InPort {
 
     private InputDataObjectType inputDataObjectType;
-    private boolean isSatisfy = false;
+    private boolean ready = false;
     private String portId;
     private Edge edge;
     private WorkflowNode node;
+    private String defaultValue;
 
     public InputPortIml(String portId) {
         this.portId = portId;
@@ -38,6 +39,8 @@ public class InputPortIml implements InPort {
     @Override
     public void setInputObject(InputDataObjectType inputObject) {
         this.inputDataObjectType = inputObject;
+        ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
     }
 
     @Override
@@ -56,8 +59,17 @@ public class InputPortIml implements InPort {
     }
 
     @Override
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
     public boolean isReady() {
-        return inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("");
+        return getInputObject() != null && inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/55319c96/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
index 39e422a..e7ac5cb 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/parser/AiravataDefaultParser.java
@@ -21,6 +21,7 @@
 
 package org.apache.ariavata.simple.workflow.engine.parser;
 
+import com.sun.corba.se.pept.encoding.OutputObject;
 import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.airavata.appcatalog.cpi.WorkflowCatalog;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -41,6 +42,7 @@ import org.apache.airavata.workflow.model.graph.GraphException;
 import org.apache.airavata.workflow.model.graph.Node;
 import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
 import org.apache.airavata.workflow.model.graph.system.OutputNode;
+import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
 import org.apache.airavata.workflow.model.graph.ws.WSNode;
 import org.apache.airavata.workflow.model.graph.ws.WSPort;
 import org.apache.airavata.workflow.model.wf.Workflow;
@@ -100,9 +102,6 @@ public class AiravataDefaultParser implements WorkflowParser {
         for (InputDataObjectType dataObjectType : experimentInputs) {
             inputDataMap.put(dataObjectType.getName(), dataObjectType);
         }
-        OutPort outPort = null;
-        InPort inPort = null;
-        Edge edge = null;
         for (Node gNode : gNodes) {
             wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
             wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getNodeName()));
@@ -133,13 +132,12 @@ public class AiravataDefaultParser implements WorkflowParser {
             dataPort = portContainer.getDataPort();
             inPort = portContainer.getInPort();
             Node node = dataPort.getNode();
-            inPort.setInputObject(getInputDataObject(dataPort));
+//            inPort.setInputObject(getInputDataObject(dataPort));
             if (node instanceof WSNode) {
                 WSNode wsNode = (WSNode) node;
                 WorkflowNode wfNode = wfNodes.get(wsNode.getID());
                 if (wfNode == null) {
-                    wfApplicationNode = new ApplicationNodeImpl(wsNode.getID(),
-                            wsNode.getComponent().getApplication().getApplicationId());
+                    wfApplicationNode = createApplicationNode(wsNode);
                     wfNodes.put(wfApplicationNode.getNodeId(), wfApplicationNode);
                     nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
                 } else if (wfNode instanceof ApplicationNode) {
@@ -152,7 +150,7 @@ public class AiravataDefaultParser implements WorkflowParser {
 
             }else if (node instanceof OutputNode) {
                 OutputNode oNode = (OutputNode) node;
-                wfOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
+                wfOutputNode = createWorkflowOutputNode(oNode);
                 wfOutputNode.setInPort(inPort);
                 wfNodes.put(wfOutputNode.getNodeId(), wfOutputNode);
             }
@@ -161,18 +159,33 @@ public class AiravataDefaultParser implements WorkflowParser {
 
     }
 
+    private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
+        WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
+        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+        outputDataObjectType.setType(oNode.getParameterType());
+        workflowOutputNode.setOutputObject(outputDataObjectType);
+        return workflowOutputNode;
+    }
+
+    private ApplicationNode createApplicationNode(WSNode wsNode) {
+        ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
+                wsNode.getComponent().getApplication().getApplicationId());
+//        wsNode.getComponent().getInputPorts()
+        return applicationNode;
+    }
+
     private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
         OutPort outPort ;
         Edge edge;
         InPort inPort = null;
         List<PortContainer> portContainers = new ArrayList<PortContainer>();
         for (DataPort dataPort : node.getOutputPorts()) {
-            outPort = new OutPortImpl(dataPort.getID());
+            outPort = createOutPort(dataPort);
             for (DataEdge dataEdge : dataPort.getEdges()) {
                 edge = new DirectedEdge();
                 edge.setFromPort(outPort);
                 outPort.addEdge(edge);
-                inPort = getInPort(dataEdge.getToPort());
+                inPort = createInPort(dataEdge.getToPort());
                 edge.setToPort(inPort);
                 inPort.addEdge(edge);
                 portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
@@ -181,7 +194,7 @@ public class AiravataDefaultParser implements WorkflowParser {
             if (wfNode instanceof WorkflowInputNode) {
                 WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
                 workflowInputNode.setOutPort(outPort);
-            }else if (wfNode instanceof ApplicationNode) {
+            } else if (wfNode instanceof ApplicationNode) {
                 ApplicationNode applicationNode = ((ApplicationNode) wfNode);
                 applicationNode.addOutPort(outPort);
 //                applicationNode.addInPort(inPort);
@@ -190,8 +203,42 @@ public class AiravataDefaultParser implements WorkflowParser {
         return portContainers;
     }
 
-    private InPort getInPort(DataPort toPort) {
-        return new InputPortIml(toPort.getID());
+    private OutPort createOutPort(DataPort dataPort) {
+        OutPortImpl outPort = new OutPortImpl(dataPort.getID());
+        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+        if (dataPort instanceof WSPort) {
+            WSPort wsPort = (WSPort) dataPort;
+            outputDataObjectType.setName(wsPort.getFromNode().getName());
+            outputDataObjectType.setType(wsPort.getType());
+        }else if (dataPort instanceof SystemDataPort) {
+            SystemDataPort sysPort = (SystemDataPort) dataPort;
+            outputDataObjectType.setName(sysPort.getFromNode().getName());
+            outputDataObjectType.setType(sysPort.getType());
+        }
+
+        outPort.setOutputObject(outputDataObjectType);
+        return outPort;
+    }
+
+    private InPort createInPort(DataPort toPort) {
+        InPort inPort = new InputPortIml(toPort.getID());
+        InputDataObjectType inputDataObjectType = new InputDataObjectType();
+        if (toPort instanceof WSPort) {
+            WSPort wsPort = (WSPort) toPort;
+            inputDataObjectType.setName(wsPort.getName());
+            inputDataObjectType.setType(wsPort.getType());
+            inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
+            inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
+            inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
+
+            inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
+        }else if (toPort instanceof SystemDataPort) {
+            SystemDataPort sysPort = (SystemDataPort) toPort;
+            inputDataObjectType.setName(sysPort.getName());
+            inputDataObjectType.setType(sysPort.getType());
+        }
+        inPort.setInputObject(inputDataObjectType);
+        return inPort;
     }
 
     private InputDataObjectType getInputDataObject(DataPort dataPort) {