You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:15 UTC

[16/50] [abbrv] airavata git commit: Renamed the wrong package name

Renamed the wrong package name


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6bfb9563
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6bfb9563
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6bfb9563

Branch: refs/heads/master
Commit: 6bfb9563a049b943c2ab2287f468a38f8eaf9b78
Parents: 8835097
Author: shamrath <sh...@gmail.com>
Authored: Mon Feb 23 16:51:21 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Mon Feb 23 16:51:21 2015 -0500

----------------------------------------------------------------------
 .../simple/workflow/engine/ProcessPack.java     |  62 +++
 .../engine/SimpleWorkflowInterpreter.java       | 470 +++++++++++++++++++
 .../simple/workflow/engine/WorkflowFactory.java |  31 ++
 .../workflow/engine/WorkflowFactoryImpl.java    |  66 +++
 .../simple/workflow/engine/WorkflowParser.java  |  32 ++
 .../simple/workflow/engine/WorkflowUtil.java    |  63 +++
 .../workflow/engine/dag/edge/DirectedEdge.java  |  52 ++
 .../simple/workflow/engine/dag/edge/Edge.java   |  43 ++
 .../engine/dag/nodes/ApplicationNode.java       |  41 ++
 .../engine/dag/nodes/ApplicationNodeImpl.java   | 113 +++++
 .../workflow/engine/dag/nodes/NodeState.java    |  34 ++
 .../workflow/engine/dag/nodes/NodeType.java     |  28 ++
 .../engine/dag/nodes/WorkflowInputNode.java     |  37 ++
 .../engine/dag/nodes/WorkflowInputNodeImpl.java |  96 ++++
 .../workflow/engine/dag/nodes/WorkflowNode.java |  38 ++
 .../engine/dag/nodes/WorkflowOutputNode.java    |  37 ++
 .../dag/nodes/WorkflowOutputNodeImpl.java       |  97 ++++
 .../simple/workflow/engine/dag/port/InPort.java |  41 ++
 .../workflow/engine/dag/port/InputPortIml.java  |  90 ++++
 .../workflow/engine/dag/port/OutPort.java       |  39 ++
 .../workflow/engine/dag/port/OutPortImpl.java   |  83 ++++
 .../simple/workflow/engine/dag/port/Port.java   |  36 ++
 .../engine/parser/AiravataDefaultParser.java    | 293 ++++++++++++
 .../workflow/engine/parser/PortContainer.java   |  53 +++
 .../simple/workflow/engine/ProcessPack.java     |  62 ---
 .../engine/SimpleWorkflowInterpreter.java       | 470 -------------------
 .../simple/workflow/engine/WorkflowFactory.java |  31 --
 .../workflow/engine/WorkflowFactoryImpl.java    |  66 ---
 .../simple/workflow/engine/WorkflowParser.java  |  32 --
 .../simple/workflow/engine/WorkflowUtil.java    |  63 ---
 .../workflow/engine/dag/edge/DirectedEdge.java  |  52 --
 .../simple/workflow/engine/dag/edge/Edge.java   |  43 --
 .../engine/dag/nodes/ApplicationNode.java       |  41 --
 .../engine/dag/nodes/ApplicationNodeImpl.java   | 113 -----
 .../workflow/engine/dag/nodes/NodeState.java    |  34 --
 .../workflow/engine/dag/nodes/NodeType.java     |  28 --
 .../engine/dag/nodes/WorkflowInputNode.java     |  37 --
 .../engine/dag/nodes/WorkflowInputNodeImpl.java |  96 ----
 .../workflow/engine/dag/nodes/WorkflowNode.java |  38 --
 .../engine/dag/nodes/WorkflowOutputNode.java    |  37 --
 .../dag/nodes/WorkflowOutputNodeImpl.java       |  97 ----
 .../simple/workflow/engine/dag/port/InPort.java |  41 --
 .../workflow/engine/dag/port/InputPortIml.java  |  90 ----
 .../workflow/engine/dag/port/OutPort.java       |  39 --
 .../workflow/engine/dag/port/OutPortImpl.java   |  83 ----
 .../simple/workflow/engine/dag/port/Port.java   |  36 --
 .../engine/parser/AiravataDefaultParser.java    | 293 ------------
 .../workflow/engine/parser/PortContainer.java   |  53 ---
 .../simple/workflow/engine/WorkflowDAGTest.java |  46 ++
 .../parser/AiravataDefaultParserTest.java       | 119 +++++
 .../simple/workflow/engine/WorkflowDAGTest.java |  46 --
 .../parser/AiravataDefaultParserTest.java       | 119 -----
 52 files changed, 2140 insertions(+), 2140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
new file mode 100644
index 0000000..b58b947
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/ProcessPack.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessPack {
+    private WorkflowNode workflowNode;
+    private WorkflowNodeDetails wfNodeDetails;
+    private TaskDetails taskDetails;
+
+    public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+        this.workflowNode = workflowNode;
+        this.wfNodeDetails = wfNodeDetails;
+        this.taskDetails = taskDetails;
+    }
+
+    public WorkflowNode getWorkflowNode() {
+        return workflowNode;
+    }
+
+    public void setWorkflowNode(WorkflowNode workflowNode) {
+        this.workflowNode = workflowNode;
+    }
+
+    public WorkflowNodeDetails getWfNodeDetails() {
+        return wfNodeDetails;
+    }
+
+    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+        this.wfNodeDetails = wfNodeDetails;
+    }
+
+    public TaskDetails getTaskDetails() {
+        return taskDetails;
+    }
+
+    public void setTaskDetails(TaskDetails taskDetails) {
+        this.taskDetails = taskDetails;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
new file mode 100644
index 0000000..6dcb8bd
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SimpleWorkflowInterpreter implements Runnable{
+
+    private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
+
+    private List<WorkflowInputNode> workflowInputNodes;
+
+    private Experiment experiment;
+
+    private String credentialToken;
+
+    private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>();
+    private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
+    private Registry registry;
+    private EventBus eventBus = new EventBus();
+    private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+
+    public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException {
+        setExperiment(experimentId);
+        this.credentialToken = credentialToken;
+    }
+
+    public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
+        // read the workflow file and build the topology to a DAG. Then execute that dag
+        // get workflowInputNode list and start processing
+        // next() will return ready task and block the thread if no task in ready state.
+        this.experiment = experiment;
+        this.credentialToken = credentialStoreToken;
+    }
+
+
+    public void launchWorkflow() throws Exception {
+        // process workflow input nodes
+//        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+//        WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+        WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken);
+        log.debug("Initialized workflow parser");
+        setWorkflowInputNodes(workflowParser.parse());
+        log.debug("Parsed the workflow and got the workflow input nodes");
+        processWorkflowInputNodes(getWorkflowInputNodes());
+    }
+
+    // try to remove synchronization tag
+    private synchronized void processReadyList() {
+        for (WorkflowNode readyNode : readList.values()) {
+            try {
+                if (readyNode instanceof WorkflowOutputNode) {
+                    WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
+                    wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
+                    addToCompleteOutputNodeList(wfOutputNode);
+                    continue;
+                }
+                WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
+                TaskDetails process = getProcess(workflowNodeDetails);
+                ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process);
+                addToProcessingQueue(processPack);
+//                publishToProcessQueue(process);
+                publishToProcessQueue(processPack);
+            } catch (RegistryException e) {
+                // FIXME : handle this exception
+            }
+        }
+    }
+
+
+    private void publishToProcessQueue(TaskDetails process) {
+        Thread thread = new Thread(new TempPublisher(process, eventBus));
+        thread.start();
+        //TODO: publish to process queue.
+    }
+
+    // TODO : remove this test method
+    private void publishToProcessQueue(ProcessPack process) {
+        WorkflowNode workflowNode = process.getWorkflowNode();
+        if (workflowNode instanceof ApplicationNode) {
+            ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+            List<InPort> inputPorts = applicationNode.getInputPorts();
+            if (applicationNode.getName().equals("Add")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else if (applicationNode.getName().equals("Multiply")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else if (applicationNode.getName().equals("Subtract")) {
+                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
+                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
+            } else {
+                throw new RuntimeException("Invalid Application name");
+            }
+
+            for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) {
+                WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject());
+                if (edge.getToPort().getNode().isReady()) {
+                    addToReadyQueue(edge.getToPort().getNode());
+                } else {
+                    addToWaitingQueue(edge.getToPort().getNode());
+                }
+            }
+        } else if (workflowNode instanceof WorkflowOutputNode) {
+            WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode;
+            throw new RuntimeException("Workflow output node in processing queue");
+        }
+
+        processingQueue.remove(process.getTaskDetails().getTaskID());
+    }
+
+    private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
+        // create workflow taskDetails from workflowNodeDetails
+        TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
+        taskDetails.setTaskID(getRegistry()
+                .add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
+        return taskDetails;
+    }
+
+    private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
+        WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
+        ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
+        String executionData = null;
+        if (readyNode instanceof ApplicationNode) {
+            executionUnit = ExecutionUnit.APPLICATION;
+            executionData = ((ApplicationNode) readyNode).getApplicationId();
+        } else if (readyNode instanceof WorkflowInputNode) {
+            executionUnit = ExecutionUnit.INPUT;
+        } else if (readyNode instanceof WorkflowOutputNode) {
+            executionUnit = ExecutionUnit.OUTPUT;
+        }
+        wfNodeDetails.setExecutionUnit(executionUnit);
+        wfNodeDetails.setExecutionUnitData(executionData);
+        setupNodeDetailsInput(readyNode, wfNodeDetails);
+        wfNodeDetails.setNodeInstanceId((String) getRegistry()
+                .add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
+//        nodeInstanceList.put(node, wfNodeDetails);
+        return wfNodeDetails;
+    }
+
+    private void setupNodeDetailsInput(WorkflowNode readyNode, WorkflowNodeDetails wfNodeDetails) {
+        if (readyNode instanceof ApplicationNode) {
+            ApplicationNode applicationNode = (ApplicationNode) readyNode;
+            if (applicationNode.isReady()) {
+                for (InPort inPort : applicationNode.getInputPorts()) {
+                    wfNodeDetails.addToNodeInputs(inPort.getInputObject());
+                }
+            } else {
+                // TODO: handle this scenario properly.
+            }
+        } else {
+            // TODO: do we support for other type of workflow nodes ?
+        }
+    }
+
+
+    private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) {
+        Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
+        for (WorkflowInputNode wfInputNode : wfInputNodes) {
+            if (wfInputNode.isReady()) {
+                log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
+                for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+                    edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
+                    if (edge.getToPort().getNode().isReady()) {
+                        addToReadyQueue(edge.getToPort().getNode());
+                        log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
+                    } else {
+                        addToWaitingQueue(edge.getToPort().getNode());
+                        log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
+
+                    }
+                }
+            }
+        }
+    }
+
+
+    public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
+        return workflowInputNodes;
+    }
+
+    public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) {
+        this.workflowInputNodes = workflowInputNodes;
+    }
+
+
+    private List<WorkflowInputNode> parseWorkflowDescription(){
+        return null;
+    }
+
+
+    private Registry getRegistry() throws RegistryException {
+        if (registry==null){
+            registry = RegistryFactory.getDefaultRegistry();
+        }
+        return registry;
+    }
+
+    public Experiment getExperiment() {
+        return experiment;
+    }
+
+    private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
+        WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
+        wfNodeDetails.setWorkflowNodeStatus(status);
+        getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
+    }
+
+    @Subscribe
+    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
+        String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
+        log.debug("Task Output changed event received for workflow node : " +
+                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+        ProcessPack processPack = processingQueue.get(taskId);
+        Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+        if (processPack != null) {
+            WorkflowNode workflowNode = processPack.getWorkflowNode();
+            if (workflowNode instanceof ApplicationNode) {
+                ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+                // Workflow node can have one to many output ports and each output port can have one to many links
+                for (OutPort outPort : applicationNode.getOutputPorts()) {
+                    for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) {
+                        if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                            outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                            break;
+                        }
+                    }
+                    for (Edge edge : outPort.getOutEdges()) {
+                        WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
+                        if (edge.getToPort().getNode().isReady()) {
+                            addToReadyQueue(edge.getToPort().getNode());
+                        }
+                    }
+                }
+            }
+            processingQueue.remove(taskId);
+            log.debug("removed task from processing queue : " + taskId);
+        }
+
+    }
+
+    @Subscribe
+    public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
+        String taskId = taskStatus.getTaskIdentity().getTaskId();
+        ProcessPack processPack = processingQueue.get(taskId);
+        if (processPack != null) {
+            WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+            switch (taskStatus.getState()) {
+                case WAITING:
+                    break;
+                case STARTED:
+                    break;
+                case PRE_PROCESSING:
+                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                    break;
+                case INPUT_DATA_STAGING:
+                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                    break;
+                case EXECUTING:
+                    processPack.getWorkflowNode().setState(NodeState.EXECUTING);
+                    break;
+                case OUTPUT_DATA_STAGING:
+                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                    break;
+                case POST_PROCESSING:
+                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                    break;
+                case COMPLETED:
+                    processPack.getWorkflowNode().setState(NodeState.EXECUTED);
+                    break;
+                case FAILED:
+                    processPack.getWorkflowNode().setState(NodeState.FAILED);
+                    break;
+                case UNKNOWN:
+                    break;
+                case CONFIGURING_WORKSPACE:
+                    break;
+                case CANCELED:
+                case CANCELING:
+                    processPack.getWorkflowNode().setState(NodeState.FAILED);
+                    break;
+                default:
+                    break;
+            }
+            if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+                try {
+                    updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState);
+                } catch (RegistryException e) {
+                    // TODO: handle this.
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Remove the workflow node from waiting queue and add it to the ready queue.
+     * @param workflowNode - Workflow Node
+     */
+    private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+        waitingList.remove(workflowNode.getId());
+        readList.put(workflowNode.getId(), workflowNode);
+    }
+
+    private void addToWaitingQueue(WorkflowNode workflowNode) {
+        waitingList.put(workflowNode.getId(), workflowNode);
+    }
+
+    /**
+     * First remove the node from ready list and then add the WfNodeContainer to the process queue.
+     * Note that underline data structure of the process queue is a Map.
+     * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
+     */
+    private synchronized void addToProcessingQueue(ProcessPack processPack) {
+        readList.remove(processPack.getWorkflowNode().getId());
+        processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
+    }
+
+    private synchronized void addToCompleteQueue(ProcessPack processPack) {
+        processingQueue.remove(processPack.getTaskDetails().getTaskID());
+        completeList.put(processPack.getTaskDetails().getTaskID(), processPack);
+    }
+
+
+    private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+        completeWorkflowOutputs.add(wfOutputNode);
+        readList.remove(wfOutputNode.getId());
+    }
+
+    @Override
+    public void run() {
+        // TODO: Auto generated method body.
+        try {
+            log.debug("Launching workflow");
+            launchWorkflow();
+            while (!(waitingList.isEmpty() && readList.isEmpty())) {
+                processReadyList();
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void setExperiment(String experimentId) throws RegistryException {
+        experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
+        log.debug("Retrieve Experiment for experiment id : " + experimentId);
+    }
+
+
+    class TempPublisher implements Runnable {
+        private TaskDetails tempTaskDetails;
+        private EventBus tempEventBus;
+
+        public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) {
+            this.tempTaskDetails = tempTaskDetails;
+            this.tempEventBus = tempEventBus;
+        }
+
+        @Override
+        public void run() {
+            try {
+                TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
+                TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+
+                List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs();
+                List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs();
+                log.info("**************   Task output change event fired for application id :" + tempTaskDetails.getApplicationId());
+                if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) +
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if (tempTaskDetails.getApplicationId().equals("Subtract")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) -
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if (tempTaskDetails.getApplicationId().equals("Multiply")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) *
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                }
+                TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier);
+                eventBus.post(taskOutputChangeEvent);
+
+            } catch (InterruptedException e) {
+                log.error("Thread was interrupted while sleeping");
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
new file mode 100644
index 0000000..3de90f2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+/**
+ * All classes implement this WorkflowFactory interface, should be abstract or singleton.
+ */
+public interface WorkflowFactory {
+
+    public WorkflowParser getWorkflowParser(String experimentId, String credentialToken);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
new file mode 100644
index 0000000..116a10d
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+
+/**
+ * Singleton class, only one instance can exist in runtime.
+ */
+public class WorkflowFactoryImpl implements WorkflowFactory {
+
+    private static WorkflowFactoryImpl workflowFactoryImpl;
+
+    private WorkflowParser workflowParser;
+
+    private static final String synch = "sync";
+
+    private WorkflowFactoryImpl(){
+
+    }
+
+    public static WorkflowFactoryImpl getInstance() {
+        if (workflowFactoryImpl == null) {
+            synchronized (synch) {
+                if (workflowFactoryImpl == null) {
+                    workflowFactoryImpl = new WorkflowFactoryImpl();
+                }
+            }
+        }
+        return workflowFactoryImpl;
+    }
+
+
+    @Override
+    public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) {
+        if (workflowParser == null) {
+            try {
+                workflowParser = new AiravataDefaultParser(experimentId, credentialToken);
+            } catch (RegistryException e) {
+                // TODO : handle this scenario
+            }
+        }
+        return workflowParser;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
new file mode 100644
index 0000000..6c4d6f2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowParser.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+
+import java.util.List;
+
+public interface WorkflowParser {
+
+    public List<WorkflowInputNode> parse() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
new file mode 100644
index 0000000..a2b69ae
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowUtil.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
+
+public class WorkflowUtil {
+
+    public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){
+        if (toInputObj == null) {
+            // TODO : throw an error
+        }
+        toInputObj.setValue(fromInputObj.getValue());
+        if (fromInputObj.getApplicationArgument() != null
+                && !fromInputObj.getApplicationArgument().trim().equals("")) {
+            toInputObj.setApplicationArgument(fromInputObj.getApplicationArgument());
+        }
+        if (toInputObj.getType() == null) {
+            toInputObj.setType(fromInputObj.getType());
+        }
+        return fromInputObj;
+    }
+
+    public static InputDataObjectType copyValues(OutputDataObjectType outputData, InputDataObjectType inputData) {
+        inputData.setValue(outputData.getValue());
+        return inputData;
+    }
+
+
+    public static OutputDataObjectType copyValues(InputDataObjectType inputObject, OutputDataObjectType outputObject) {
+        if (outputObject == null) {
+            outputObject = new OutputDataObjectType();
+        }
+        outputObject.setValue(inputObject.getValue());
+        return outputObject;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
new file mode 100644
index 0000000..3bc380d
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/DirectedEdge.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.edge;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+
+public class DirectedEdge implements Edge {
+
+    private InPort inPort;
+    private OutPort outPort;
+
+    @Override
+    public InPort getToPort() {
+        return inPort;
+    }
+
+    @Override
+    public void setToPort(InPort inPort) {
+        this.inPort = inPort;
+    }
+
+    @Override
+    public OutPort getFromPort() {
+        return outPort;
+    }
+
+    @Override
+    public void setFromPort(OutPort outPort) {
+        this.outPort = outPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
new file mode 100644
index 0000000..e8bce2e
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/edge/Edge.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.edge;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+/**
+ * Edge is a link to one node to another, basically edge should have outPort of a workflow node ,
+ * which is starting point and inPort of a workflow node, which is end point of the edge.
+ */
+
+public interface Edge {
+
+    public InPort getToPort();
+
+    public void setToPort(InPort inPort);
+
+    public OutPort getFromPort();
+
+    public void setFromPort(OutPort outPort);
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
new file mode 100644
index 0000000..37efded
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNode.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.List;
+
+public interface ApplicationNode extends WorkflowNode {
+
+    public String getApplicationId();
+
+    public void addInPort(InPort inPort);
+
+    public List<InPort> getInputPorts();
+
+    public void addOutPort(OutPort outPort);
+
+    public List<OutPort> getOutputPorts();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
new file mode 100644
index 0000000..52b0595
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/ApplicationNodeImpl.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ApplicationNodeImpl implements ApplicationNode {
+
+    private final String nodeId;
+    private NodeState myState = NodeState.WAITING;
+    private String applicationId;
+    private List<InPort> inPorts = new ArrayList<InPort>();
+    private List<OutPort> outPorts = new ArrayList<OutPort>();
+    private String applicationName;
+
+//    public ApplicationNodeImpl(String nodeId) {
+//        this(nodeId, null);
+//    }
+//
+//    public ApplicationNodeImpl(String nodeId, String applicationId) {
+//        this(nodeId, null, applicationId);
+//    }
+
+    public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
+        this.nodeId = nodeId;
+        this.applicationName = applicationName;
+        this.applicationId = applicationId;
+    }
+
+    @Override
+    public String getId() {
+        return this.nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return applicationName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.APPLICATION;
+    }
+
+    @Override
+    public NodeState getState() {
+        return myState;
+    }
+
+    @Override
+    public void setState(NodeState newState) {
+        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+        myState = newState;
+    }
+
+    @Override
+    public boolean isReady() {
+        for (InPort inPort : getInputPorts()) {
+            if (!inPort.isReady()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String getApplicationId() {
+        return this.applicationId;
+    }
+
+    @Override
+    public void addInPort(InPort inPort) {
+        this.inPorts.add(inPort);
+    }
+
+    @Override
+    public List<InPort> getInputPorts() {
+        return this.inPorts;
+    }
+
+    @Override
+    public void addOutPort(OutPort outPort) {
+        this.outPorts.add(outPort);
+    }
+
+    @Override
+    public List<OutPort> getOutputPorts() {
+        return this.outPorts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
new file mode 100644
index 0000000..333fcb2
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeState.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public enum NodeState {
+    WAITING, // waiting on inputs
+    READY, // all inputs are available and ready to execute
+    QUEUED, //
+    PRE_PROCESSING, //
+    EXECUTING, // task has been submitted , not yet finish
+    EXECUTED, // task executed
+    POST_PROCESSING, //
+    FAILED,
+    COMPLETE // all works done
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
new file mode 100644
index 0000000..95710fb
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/NodeType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public enum NodeType {
+    APPLICATION,
+    WORKFLOW_INPUT,
+    WORKFLOW_OUTPUT
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
new file mode 100644
index 0000000..9ac800a
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+public interface WorkflowInputNode extends WorkflowNode {
+
+    public InputDataObjectType getInputObject();
+
+    public void setInputObject(InputDataObjectType inputObject);
+
+    public OutPort getOutPort();
+
+    public void setOutPort(OutPort outPort);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
new file mode 100644
index 0000000..b3dfa62
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+public class WorkflowInputNodeImpl implements WorkflowInputNode {
+
+    private NodeState myState = NodeState.READY;
+    private final String nodeId;
+    private String nodeName;
+    private OutPort outPort;
+    private InputDataObjectType inputDataObjectType;
+    private String name;
+
+    public WorkflowInputNodeImpl(String nodeId) {
+        this(nodeId, null);
+    }
+
+    public WorkflowInputNodeImpl(String nodeId, String nodeName) {
+        this.nodeId = nodeId;
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public String getId() {
+        return this.nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return this.nodeName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.WORKFLOW_INPUT;
+    }
+
+    @Override
+    public NodeState getState() {
+        return myState;
+    }
+
+    @Override
+    public void setState(NodeState newState) {
+        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+        myState = newState;
+    }
+
+    @Override
+    public boolean isReady() {
+        return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
+    }
+
+    @Override
+    public InputDataObjectType getInputObject() {
+        return this.inputDataObjectType;
+    }
+
+    @Override
+    public void setInputObject(InputDataObjectType inputObject) {
+        this.inputDataObjectType = inputObject;
+    }
+
+    @Override
+    public OutPort getOutPort() {
+        return this.outPort;
+    }
+
+    @Override
+    public void setOutPort(OutPort outPort) {
+        this.outPort = outPort;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
new file mode 100644
index 0000000..efcf9c7
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+public interface WorkflowNode {
+
+    public String getId();
+
+    public String getName();
+
+    public NodeType getType();
+
+    public NodeState getState();
+
+    public void setState(NodeState newState);
+
+    public boolean isReady();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
new file mode 100644
index 0000000..14e4519
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+public interface WorkflowOutputNode extends WorkflowNode {
+
+    public OutputDataObjectType getOutputObject();
+
+    public void setOutputObject(OutputDataObjectType outputObject);
+
+    public InPort getInPort();
+
+    public void setInPort(InPort inPort);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
new file mode 100644
index 0000000..5924212
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/nodes/WorkflowOutputNodeImpl.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+
+public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
+
+    private NodeState myState = NodeState.WAITING;
+    private final String nodeId;
+    private String nodeName;
+    private OutputDataObjectType outputDataObjectType;
+    private InPort inPort;
+
+    public WorkflowOutputNodeImpl(String nodeId) {
+        this(nodeId, null);
+    }
+
+    public WorkflowOutputNodeImpl(String nodeId, String nodeName) {
+        this.nodeId = nodeId;
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public String getId() {
+        return this.nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return this.nodeName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.WORKFLOW_OUTPUT;
+    }
+
+    @Override
+    public NodeState getState() {
+        return myState;
+    }
+
+    @Override
+    public void setState(NodeState newState) {
+        // TODO: node state can't be reversed , correct order WAITING --> READY --> EXECUTING --> EXECUTED --> COMPLETE
+        myState = newState;
+    }
+
+    @Override
+    public boolean isReady() {
+        return !(inPort.getInputObject() == null || inPort.getInputObject().getValue() == null
+                || inPort.getInputObject().getValue().equals(""));
+    }
+
+    @Override
+    public OutputDataObjectType getOutputObject() {
+        return this.outputDataObjectType;
+    }
+
+    @Override
+    public void setOutputObject(OutputDataObjectType outputObject) {
+        this.outputDataObjectType = outputObject;
+    }
+
+    @Override
+    public InPort getInPort() {
+        return this.inPort;
+    }
+
+    @Override
+    public void setInPort(InPort inPort) {
+        this.inPort = inPort;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
new file mode 100644
index 0000000..bb4a112
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InPort.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+
+public interface InPort extends Port {
+
+    public void setInputObject(InputDataObjectType inputObject);
+
+    public InputDataObjectType getInputObject();
+
+    public Edge getEdge();
+
+    public void addEdge(Edge edge);
+
+    public String getDefaultValue();
+
+    public void setDefaultValue(String defaultValue);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
new file mode 100644
index 0000000..c78dc86
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/InputPortIml.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class InputPortIml implements InPort {
+
+    private InputDataObjectType inputDataObjectType;
+    private boolean ready = false;
+    private String portId;
+    private Edge edge;
+    private WorkflowNode node;
+    private String defaultValue;
+
+    public InputPortIml(String portId) {
+        this.portId = portId;
+    }
+
+    @Override
+    public void setInputObject(InputDataObjectType inputObject) {
+        this.inputDataObjectType = inputObject;
+        ready = (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
+    }
+
+    @Override
+    public InputDataObjectType getInputObject() {
+        return this.inputDataObjectType;
+    }
+
+    @Override
+    public Edge getEdge() {
+        return this.edge;
+    }
+
+    @Override
+    public void addEdge(Edge edge) {
+        this.edge = edge;
+    }
+
+    @Override
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public boolean isReady() {
+        return getInputObject() != null && inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("");
+    }
+
+    @Override
+    public WorkflowNode getNode() {
+        return this.node;
+    }
+
+    @Override
+    public void setNode(WorkflowNode workflowNode) {
+        this.node = workflowNode;
+    }
+
+    @Override
+    public String getId() {
+        return this.portId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
new file mode 100644
index 0000000..0332f81
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPort.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+
+import java.util.List;
+
+public interface OutPort extends Port {
+
+    public void setOutputObject(OutputDataObjectType outputObject);
+
+    public OutputDataObjectType getOutputObject();
+
+    public List<Edge> getOutEdges();
+
+    public void addEdge(Edge edge);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
new file mode 100644
index 0000000..4e26cb3
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/OutPortImpl.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OutPortImpl implements OutPort {
+
+    private OutputDataObjectType outputDataObjectType;
+    private List<Edge> outEdges = new ArrayList<Edge>();
+    private boolean isSatisfy = false;
+    private String portId;
+    private WorkflowNode node;
+
+    public OutPortImpl(String portId) {
+        this.portId = portId;
+    }
+
+    @Override
+    public void setOutputObject(OutputDataObjectType outputObject) {
+        this.outputDataObjectType = outputObject;
+    }
+
+    @Override
+    public OutputDataObjectType getOutputObject() {
+        return this.outputDataObjectType;
+    }
+
+    @Override
+    public List<Edge> getOutEdges() {
+        return this.outEdges;
+    }
+
+    @Override
+    public void addEdge(Edge edge) {
+        this.outEdges.add(edge);
+    }
+
+    @Override
+    public boolean isReady() {
+        return this.outputDataObjectType.getValue() != null
+                && !this.outputDataObjectType.getValue().equals("");
+    }
+
+    @Override
+    public WorkflowNode getNode() {
+        return this.node;
+    }
+
+    @Override
+    public void setNode(WorkflowNode workflowNode) {
+        this.node = workflowNode;
+    }
+
+    @Override
+    public String getId() {
+        return portId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6bfb9563/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
new file mode 100644
index 0000000..2b27ea0
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/dag/port/Port.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.dag.port;
+
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public interface Port {
+
+    public boolean isReady();
+
+    public WorkflowNode getNode();
+
+    public void setNode(WorkflowNode workflowNode);
+
+    public String getId();
+    
+}