You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/25 21:24:42 UTC

[5/8] airavata git commit: Renamed simple-workflow module to workflow and created a new workflow-core module which will keep all the core code

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/pom.xml b/modules/workflow/pom.xml
new file mode 100644
index 0000000..4fb8e09
--- /dev/null
+++ b/modules/workflow/pom.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>airavata</artifactId>
+        <version>0.15-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    
+    <artifactId>workflow</artifactId>
+    <packaging>pom</packaging>
+    <version>0.15-SNAPSHOT</version>
+    <name>Airavata Workflow</name>
+    <modules>
+        <module>workflow-core</module>
+    </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/pom.xml b/modules/workflow/workflow-core/pom.xml
new file mode 100644
index 0000000..1cb8e0d
--- /dev/null
+++ b/modules/workflow/workflow-core/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>workflow</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.15-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>workflow-core</artifactId>
+    <name>Airavata Workflow Core</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-model-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-jpa-registry</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Airavata default parser dependency -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-workflow-model-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>app-catalog-data</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>app-catalog-cpi</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Messaging dependency -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
+
+        <!--test-->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
new file mode 100644
index 0000000..e0c2651
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/ProcessContext.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+/**
+ * Mutable holder that groups a {@link WorkflowNode} with the {@link WorkflowNodeDetails} and
+ * {@link TaskDetails} associated with it.
+ */
+public class ProcessContext {
+    private WorkflowNode workflowNode;
+    private WorkflowNodeDetails wfNodeDetails;
+    private TaskDetails taskDetails;
+
+    /**
+     * @param workflowNode  the workflow node
+     * @param wfNodeDetails the workflow node details kept alongside the node
+     * @param taskDetails   the task details kept alongside the node
+     */
+    public ProcessContext(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+        setWorkflowNode(workflowNode);
+        setWfNodeDetails(wfNodeDetails);
+        setTaskDetails(taskDetails);
+    }
+
+    // ---- accessors ----
+
+    /** @return the workflow node held by this context */
+    public WorkflowNode getWorkflowNode() {
+        return this.workflowNode;
+    }
+
+    /** @return the workflow node details held by this context */
+    public WorkflowNodeDetails getWfNodeDetails() {
+        return this.wfNodeDetails;
+    }
+
+    /** @return the task details held by this context */
+    public TaskDetails getTaskDetails() {
+        return this.taskDetails;
+    }
+
+    // ---- mutators ----
+
+    /** @param workflowNode replacement workflow node */
+    public void setWorkflowNode(WorkflowNode workflowNode) {
+        this.workflowNode = workflowNode;
+    }
+
+    /** @param wfNodeDetails replacement workflow node details */
+    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+        this.wfNodeDetails = wfNodeDetails;
+    }
+
+    /** @param taskDetails replacement task details */
+    public void setTaskDetails(TaskDetails taskDetails) {
+        this.taskDetails = taskDetails;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
new file mode 100644
index 0000000..a674aad
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/SimpleWorkflowInterpreter.java
@@ -0,0 +1,400 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.NodeState;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
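+/**
+ * Package-private interpreter for the workflow of a single experiment. It tracks workflow nodes in
+ * waiting, ready, processing and complete collections, publishes each ready node as a task through the
+ * supplied {@link RabbitMQProcessPublisher}, and reacts to task output and task status change events.
+ */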
+class SimpleWorkflowInterpreter{
+
+    private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
+    private List<WorkflowInputNode> workflowInputNodes;
+
+    private Experiment experiment;
+
+    private String credentialToken;
+
+    private String gatewayName;
+
+    private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
+    private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
+    private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
+    private Registry registry;
+    private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+    private RabbitMQProcessPublisher publisher;
+    private RabbitMQStatusConsumer statusConsumer;
+    private String consumerId;
+    private boolean continueWorkflow = true;
+
+    /**
+     * Creates an interpreter for the experiment stored under {@code experimentId}; the experiment is
+     * fetched from the registry before the constructor returns.
+     *
+     * @param experimentId    id used to look the experiment up in the registry
+     * @param credentialToken token attached to every process-submit event this interpreter publishes
+     * @param gatewayName     gateway name kept on this interpreter
+     * @param publisher       publisher used to submit tasks
+     * @throws RegistryException if the experiment cannot be read from the registry
+     */
+    public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
+        this.publisher = publisher;
+        this.credentialToken = credentialToken;
+        this.gatewayName = gatewayName;
+        setExperiment(experimentId);
+    }
+
+    /**
+     * Creates an interpreter for an experiment instance the caller already holds; no registry lookup
+     * is performed here.
+     *
+     * @param experiment           experiment whose workflow will be interpreted
+     * @param credentialStoreToken token attached to every process-submit event this interpreter publishes
+     * @param gatewayName          gateway name kept on this interpreter
+     * @param publisher            publisher used to submit tasks
+     */
+    public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) {
+        this.experiment = experiment;
+        this.publisher = publisher;
+        this.credentialToken = credentialStoreToken;
+        this.gatewayName = gatewayName;
+    }
+
+    /**
+     * Package-private entry point: parses the workflow of the current experiment, pushes the workflow
+     * input values to the nodes they feed and dispatches every node that became ready.
+     *
+     * @throws Exception if parsing or dispatching fails; in particular an {@link AiravataException} is
+     *                   thrown when no node is ready after the workflow inputs have been applied
+     */
+    void launchWorkflow() throws Exception {
+        WorkflowParser parser = WorkflowFactoryImpl.getInstance()
+                .getWorkflowParser(experiment.getExperimentID(), credentialToken);
+        log.debug("Initialized workflow parser");
+        setWorkflowInputNodes(parser.parse());
+        log.debug("Parsed the workflow and got the workflow input nodes");
+        // process workflow input nodes
+        processWorkflowInputNodes(getWorkflowInputNodes());
+        if (!readyList.isEmpty()) {
+            processReadyList();
+            return;
+        }
+        // Nothing is runnable: list every experiment input in the failure message.
+        StringBuilder inputSummary = new StringBuilder();
+        for (WorkflowInputNode inputNode : workflowInputNodes) {
+            inputSummary.append(", ");
+            inputSummary.append(inputNode.getInputObject().getName());
+            inputSummary.append("=");
+            inputSummary.append(inputNode.getInputObject().getValue());
+        }
+        throw new AiravataException("No workflow application node is in ready state to run with experiment inputs" + inputSummary.toString());
+    }
+
+    // try to remove synchronization tag
+    /**
+     * Package-private method that drains the ready collection. Workflow output nodes receive the value
+     * that arrived on their in-port and are recorded as completed outputs; every other node gets a
+     * workflow-node-details record and a task, is moved to the processing queue and is published.
+     *
+     * @throws RegistryException if a registry call made while creating node or task records fails
+     * @throws AiravataException if publishing fails, or if nothing is ready or in progress while nodes
+     *                           are still waiting
+     */
+    void processReadyList() throws RegistryException, AiravataException {
+        boolean nothingRunnable = readyList.isEmpty() && processingQueue.isEmpty();
+        if (nothingRunnable && !waitingList.isEmpty()) {
+            throw new AiravataException("No workflow application node is in ready state to run");
+        }
+        for (WorkflowNode node : readyList.values()) {
+            if (node instanceof WorkflowOutputNode) {
+                // Output nodes are not submitted anywhere; copy the incoming value across.
+                WorkflowOutputNode outputNode = (WorkflowOutputNode) node;
+                outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
+                addToCompleteOutputNodeList(outputNode);
+            } else {
+                WorkflowNodeDetails nodeDetails = createWorkflowNodeDetails(node);
+                TaskDetails task = getProcess(nodeDetails);
+                // Register the task as in-flight before publishing it.
+                addToProcessingQueue(new ProcessContext(node, nodeDetails, task));
+                publishToProcessQueue(task);
+            }
+        }
+    }
+
+
+    /**
+     * Wraps the task id and the credential token in a {@link ProcessSubmitEvent} and publishes it as a
+     * {@link MessageType#TASK} message whose updated time is taken from
+     * {@link AiravataUtils#getCurrentTimestamp()}.
+     *
+     * @param process task to submit
+     * @throws AiravataException if the publisher fails
+     */
+    private void publishToProcessQueue(TaskDetails process) throws AiravataException {
+        ProcessSubmitEvent submitEvent = new ProcessSubmitEvent();
+        submitEvent.setCredentialToken(this.credentialToken);
+        submitEvent.setTaskId(process.getTaskID());
+
+        MessageContext outgoing = new MessageContext(submitEvent, MessageType.TASK, process.getTaskID(), null);
+        outgoing.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+        this.publisher.publish(outgoing);
+    }
+
+    /**
+     * Derives a task from the given workflow node details, stores it in the registry under the node
+     * instance id and sets the id returned by the registry on the task.
+     *
+     * @param wfNodeDetails node details the task is cloned from
+     * @return the task, carrying its registry id
+     * @throws RegistryException if the registry call fails
+     */
+    private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
+        TaskDetails task = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
+        Object registryId = getRegistry().add(ChildDataType.TASK_DETAIL, task, wfNodeDetails.getNodeInstanceId());
+        task.setTaskID(registryId.toString());
+        return task;
+    }
+
+    /**
+     * Creates the workflow-node-details record for a ready node, stores it in the registry under the
+     * experiment id and sets the returned node instance id on the record.
+     * <p>
+     * The execution unit defaults to {@link ExecutionUnit#APPLICATION}; application nodes additionally
+     * contribute their application id and their port inputs.
+     *
+     * @param readyNode node about to be executed
+     * @return the stored node details
+     * @throws RegistryException if the registry call fails
+     */
+    private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
+        WorkflowNodeDetails nodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
+        ExecutionUnit unit = ExecutionUnit.APPLICATION;
+        String unitData = null;
+        if (readyNode instanceof ApplicationNode) {
+            ApplicationNode appNode = (ApplicationNode) readyNode;
+            unitData = appNode.getApplicationId();
+            setupNodeDetailsInput(appNode, nodeDetails);
+        } else if (readyNode instanceof WorkflowInputNode) {
+            unit = ExecutionUnit.INPUT;
+        } else if (readyNode instanceof WorkflowOutputNode) {
+            unit = ExecutionUnit.OUTPUT;
+        }
+        nodeDetails.setExecutionUnit(unit);
+        nodeDetails.setExecutionUnitData(unitData);
+        Object nodeInstanceId = getRegistry()
+                .add(ChildDataType.WORKFLOW_NODE_DETAIL, nodeDetails, getExperiment().getExperimentID());
+        nodeDetails.setNodeInstanceId((String) nodeInstanceId);
+        return nodeDetails;
+    }
+
+    /**
+     * Copies the input object of every in-port of a ready application node into the node details.
+     *
+     * @param readyAppNode  application node whose inputs are copied
+     * @param wfNodeDetails record receiving the inputs
+     * @throws IllegalArgumentException if the application node is not in ready state
+     */
+    private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
+        if (!readyAppNode.isReady()) {
+            throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
+                    "workflow node details, nodeId = " + readyAppNode.getId());
+        }
+        for (InPort port : readyAppNode.getInputPorts()) {
+            wfNodeDetails.addToNodeInputs(port.getInputObject());
+        }
+    }
+
+
+    /**
+     * Pushes the value of every ready workflow input node along its outgoing edges and files each
+     * target node as ready or waiting, depending on whether the target reports itself ready afterwards.
+     * Input nodes that are not ready are skipped.
+     *
+     * @param wfInputNodes workflow input nodes produced by the parser
+     */
+    private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) {
+        for (WorkflowInputNode wfInputNode : wfInputNodes) {
+            if (!wfInputNode.isReady()) {
+                continue;
+            }
+            log.debug("Workflow node : {} is ready to execute", wfInputNode.getId());
+            for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+                edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
+                // Resolve the target only after its input has been set, as readiness is checked next.
+                WorkflowNode targetNode = edge.getToPort().getNode();
+                if (targetNode.isReady()) {
+                    addToReadyQueue(targetNode);
+                    log.debug("Added workflow node : {} to the readyQueue", targetNode.getId());
+                } else {
+                    addToWaitingQueue(targetNode);
+                    log.debug("Added workflow node {} to the waitingQueue", targetNode.getId());
+                }
+            }
+        }
+    }
+
+
+    /**
+     * @return the workflow input nodes currently set on this interpreter; {@code null} until they
+     *         have been set
+     * @throws Exception declared by the signature; this implementation only returns the field
+     */
+    public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
+        return this.workflowInputNodes;
+    }
+
+    /**
+     * Replaces the workflow input nodes used by this interpreter.
+     *
+     * @param workflowInputNodes the input nodes to keep; the list reference is stored as given
+     */
+    public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) {
+        this.workflowInputNodes = workflowInputNodes;
+    }
+
+    /**
+     * Returns the registry, obtaining the default one from {@link RegistryFactory} on first use and
+     * caching it in the {@code registry} field.
+     *
+     * @throws RegistryException if the default registry cannot be obtained
+     */
+    private Registry getRegistry() throws RegistryException {
+        if (this.registry != null) {
+            return this.registry;
+        }
+        this.registry = RegistryFactory.getDefaultRegistry();
+        return this.registry;
+    }
+
+    /** @return the experiment whose workflow this interpreter runs */
+    public Experiment getExperiment() {
+        return this.experiment;
+    }
+
+    /**
+     * Builds a {@link WorkflowNodeStatus} for {@code state}, attaches it to the node details and then
+     * writes it to the registry under the node instance id.
+     *
+     * @param wfNodeDetails node record whose status changes
+     * @param state         new workflow node state
+     * @throws RegistryException if the registry update fails; the status has already been set on
+     *                           {@code wfNodeDetails} at that point
+     */
+    private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
+        WorkflowNodeStatus newStatus = ExperimentModelUtil.createWorkflowNodeStatus(state);
+        wfNodeDetails.setWorkflowNodeStatus(newStatus);
+        Registry target = getRegistry();
+        target.update(RegistryModelType.WORKFLOW_NODE_STATUS, newStatus, wfNodeDetails.getNodeInstanceId());
+    }
+
+    /**
+     * Package-private method.
+     * Moves a workflow node to the ready collection, dropping it from the waiting collection if it
+     * was there. Both collections are keyed by node id.
+     * @param workflowNode - Workflow Node
+     */
+    synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+        String nodeId = workflowNode.getId();
+        this.waitingList.remove(nodeId);
+        this.readyList.put(nodeId, workflowNode);
+    }
+
+    /**
+     * Files a workflow node in the waiting collection under its node id.
+     * @param workflowNode - Workflow Node
+     */
+    private void addToWaitingQueue(WorkflowNode workflowNode) {
+        this.waitingList.put(workflowNode.getId(), workflowNode);
+    }
+
+    /**
+     * First removes the node from the ready list and then adds the process context to the processing
+     * queue. Note that the underlying data structure of the processing queue is a Map, keyed by task id.
+     * @param processContext - holds the workflow node and its corresponding WorkflowNodeDetails and TaskDetails
+     */
+    private synchronized void addToProcessingQueue(ProcessContext processContext) {
+        String nodeId = processContext.getWorkflowNode().getId();
+        this.readyList.remove(nodeId);
+        String taskId = processContext.getTaskDetails().getTaskID();
+        this.processingQueue.put(taskId, processContext);
+    }
+
+    /**
+     * Moves a process context from the processing queue to the complete list; both are keyed by task id.
+     * @param processContext - context of the task that finished
+     */
+    private synchronized void addToCompleteQueue(ProcessContext processContext) {
+        String taskId = processContext.getTaskDetails().getTaskID();
+        this.processingQueue.remove(taskId);
+        this.completeList.put(taskId, processContext);
+    }
+
+
+    /**
+     * Records a workflow output node as completed and then removes it from the ready list.
+     * @param wfOutputNode - output node whose value has been set
+     */
+    private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
+        this.completeWorkflowOutputs.add(wfOutputNode);
+        this.readyList.remove(wfOutputNode.getId());
+    }
+
+    /**
+     * @return {@code true} when the workflow has been stopped ({@code continueWorkflow} is false) or
+     *         when the waiting, ready and processing collections are all empty
+     */
+    boolean isAllDone() {
+        if (!continueWorkflow) {
+            return true;
+        }
+        return waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty();
+    }
+
+    /**
+     * Loads the experiment with the given id from the registry and stores it on this interpreter.
+     *
+     * @param experimentId id of the experiment to load
+     * @throws RegistryException if the registry lookup fails
+     */
+    private void setExperiment(String experimentId) throws RegistryException {
+        Object loaded = getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
+        this.experiment = (Experiment) loaded;
+        log.debug("Retrieve Experiment for experiment id : {}", experimentId);
+    }
+
+    /**
+     * Handles the outputs reported for a task. Events whose task id is not in the processing queue
+     * are ignored. Otherwise, for an application node, each output port takes the reported value with
+     * the matching name, that value is pushed along the port's outgoing edges, and every target node
+     * that reports itself ready is moved to the ready collection. The task is then moved to the
+     * complete list and the ready collection is processed again.
+     * <p>
+     * Any exception raised while processing the ready collection is logged and stops the workflow by
+     * clearing {@code continueWorkflow}; it is not rethrown.
+     *
+     * @param taskOutputChangeEvent event carrying the task identity and its outputs
+     */
+    synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+        String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+        log.debug("Task Output changed event received for workflow node : {}, task : {}",
+                taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId(), taskId);
+        ProcessContext processContext = processingQueue.get(taskId);
+        if (processContext == null) {
+            // No in-flight task with this id in the processing queue: nothing to do.
+            return;
+        }
+        WorkflowNode workflowNode = processContext.getWorkflowNode();
+        if (workflowNode instanceof ApplicationNode) {
+            ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+            // Workflow node can have one to many output ports and each output port can have one to many links
+            for (OutPort outPort : applicationNode.getOutputPorts()) {
+                for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+                    if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                        outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                        break;
+                    }
+                }
+                for (Edge edge : outPort.getOutEdges()) {
+                    edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+                    // Readiness is checked only after the target's input has been set.
+                    WorkflowNode targetNode = edge.getToPort().getNode();
+                    if (targetNode.isReady()) {
+                        addToReadyQueue(targetNode);
+                    }
+                }
+            }
+        }
+        addToCompleteQueue(processContext);
+        log.debug("removed task from processing queue : {}", taskId);
+        try {
+            processReadyList();
+        } catch (Exception e) {
+            // Deliberately broad: any failure here must stop the workflow rather than escape the handler.
+            log.error("Error while processing ready workflow nodes", e);
+            continueWorkflow = false;
+        }
+    }
+
+    /**
+     * Translates a task status change into a node state on the in-memory workflow node and a
+     * {@link WorkflowNodeState} written to the registry. Events whose task id is not in the processing
+     * queue are ignored, and nothing is written to the registry when the resulting workflow node state
+     * is {@link WorkflowNodeState#UNKNOWN}. A registry failure is logged and not rethrown.
+     * <p>
+     * NOTE(review): {@code CONFIGURING_WORKSPACE}, {@code OUTPUT_DATA_STAGING} and
+     * {@code POST_PROCESSING} are all recorded as {@code COMPLETED} in the registry; confirm that this
+     * mapping is intended.
+     *
+     * @param taskStatusChangeEvent event carrying the task identity and its new state
+     */
+    void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+        TaskState taskState = taskStatusChangeEvent.getState();
+        String taskId = taskStatusChangeEvent.getTaskIdentity().getTaskId();
+        ProcessContext processContext = processingQueue.get(taskId);
+        if (processContext == null) {
+            return;
+        }
+        WorkflowNode node = processContext.getWorkflowNode();
+        // INVOKED is the fallback for every task state that has no dedicated mapping below.
+        WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+        switch (taskState) {
+            case PRE_PROCESSING:
+            case INPUT_DATA_STAGING:
+                node.setState(NodeState.PRE_PROCESSING);
+                break;
+            case EXECUTING:
+                wfNodeState = WorkflowNodeState.EXECUTING;
+                node.setState(NodeState.EXECUTING);
+                break;
+            case OUTPUT_DATA_STAGING:
+            case POST_PROCESSING:
+                wfNodeState = WorkflowNodeState.COMPLETED;
+                node.setState(NodeState.POST_PROCESSING);
+                break;
+            case COMPLETED:
+                wfNodeState = WorkflowNodeState.COMPLETED;
+                node.setState(NodeState.EXECUTED);
+                break;
+            case FAILED:
+                wfNodeState = WorkflowNodeState.FAILED;
+                node.setState(NodeState.FAILED);
+                break;
+            case UNKNOWN:
+                wfNodeState = WorkflowNodeState.UNKNOWN;
+                break;
+            case CONFIGURING_WORKSPACE:
+                wfNodeState = WorkflowNodeState.COMPLETED;
+                break;
+            case CANCELED:
+            case CANCELING:
+                wfNodeState = WorkflowNodeState.CANCELED;
+                node.setState(NodeState.FAILED);
+                break;
+            default:
+                // WAITING, STARTED and anything else: keep INVOKED and leave the node state untouched.
+                break;
+        }
+        if (wfNodeState == WorkflowNodeState.UNKNOWN) {
+            return;
+        }
+        WorkflowNodeDetails nodeDetails = processContext.getWfNodeDetails();
+        try {
+            updateWorkflowNodeStatus(nodeDetails, wfNodeState);
+        } catch (RegistryException e) {
+            log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+                    + nodeDetails.getNodeInstanceId() + " status to: "
+                    + nodeDetails.getWorkflowNodeStatus().toString() , e);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
new file mode 100644
index 0000000..7795296
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Singleton service that launches workflows and dispatches RabbitMQ task status and task
+ * output events to the {@link SimpleWorkflowInterpreter} registered for the experiment.
+ */
+public class WorkflowEnactmentService {
+
+    // volatile so the double-checked locking in getInstance() publishes the instance safely
+    private static volatile WorkflowEnactmentService workflowEnactmentService;
+    private final RabbitMQStatusConsumer statusConsumer;
+    private String consumerId;
+    private ExecutorService executor;
+    private Map<String,SimpleWorkflowInterpreter> workflowMap; // keyed by experiment id
+
+    private WorkflowEnactmentService () throws AiravataException {
+        executor = Executors.newFixedThreadPool(getThreadPoolSize());
+        workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>();
+        statusConsumer = new RabbitMQStatusConsumer();
+        consumerId = statusConsumer.listen(new TaskMessageHandler());
+        // register the shutdown hook to un-bind status consumer.
+        Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
+    }
+
+    /** Returns the shared instance, creating it lazily on the first call. */
+    public static WorkflowEnactmentService getInstance() throws AiravataException {
+        if (workflowEnactmentService == null) {
+            synchronized (WorkflowEnactmentService.class) {
+                if (workflowEnactmentService == null) {
+                    workflowEnactmentService = new WorkflowEnactmentService();
+                }
+            }
+        }
+        return workflowEnactmentService;
+    }
+
+    /** Registers a new interpreter under the experiment id, then launches the workflow. */
+    public void submitWorkflow(String experimentId,
+                                  String credentialToken,
+                                  String gatewayName,
+                                  RabbitMQProcessPublisher publisher) throws Exception {
+        SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
+                experimentId, credentialToken,gatewayName, publisher);
+        workflowMap.put(experimentId, simpleWorkflowInterpreter);
+        simpleWorkflowInterpreter.launchWorkflow();
+    }
+
+    private int getThreadPoolSize() {
+        return ServerSettings.getEnactmentThreadPoolSize();
+    }
+
+    /** Listens with wildcard routing keys and hands every received message to the executor. */
+    private class TaskMessageHandler implements MessageHandler {
+
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            String gatewayId = "*";
+            String experimentId = "*";
+            List<String> routingKeys = new ArrayList<String>();
+            routingKeys.add(gatewayId);
+            routingKeys.add(gatewayId + "." + experimentId);
+            routingKeys.add(gatewayId + "." + experimentId+ ".*");
+            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            return props;
+        }
+
+        @Override
+        public void onMessage(MessageContext msgCtx) {
+            executor.execute(new StatusHandler(msgCtx));
+        }
+    }
+
+    /** Executor task that processes one received message. */
+    private class StatusHandler implements Runnable{
+        private final Logger log = LoggerFactory.getLogger(StatusHandler.class);
+
+        private final MessageContext msgCtx;
+
+        public StatusHandler(MessageContext msgCtx) {
+            this.msgCtx = msgCtx;
+        }
+
+        @Override
+        public void run() {
+            process();
+        }
+
+        /** Routes TASK and TASKOUTPUT messages to the experiment's interpreter; ignores the rest. */
+        private void process() {
+            if (msgCtx.getType() == MessageType.TASK) {
+                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                SimpleWorkflowInterpreter simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+                // A missing interpreter happens when Task status messages come after the Taskoutput
+                // messages; as the output changes were already handled it is ok to ignore this.
+                if (simpleWorkflowInterpreter != null) {
+                    simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event);
+                }
+                log.debug("Received task status change event , expId : " + taskIdentifier.getExperimentId()
+                        + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId());
+            } else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                SimpleWorkflowInterpreter simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+                if (simpleWorkflowInterpreter == null) {
+                    throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " +
+                            "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId());
+                }
+                simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+                if (simpleWorkflowInterpreter.isAllDone()) {
+                    workflowMap.remove(taskIdentifier.getExperimentId());
+                }
+                log.debug("Received task output change event , expId : " + taskIdentifier.getExperimentId()
+                        + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId());
+            }
+            // any other message type is of no interest here and is ignored
+        }
+
+        private SimpleWorkflowInterpreter getInterpreter(String experimentId){
+            return workflowMap.get(experimentId);
+        }
+    }
+
+    /** JVM shutdown hook that stops the status consumer registered in the constructor. */
+    private class EnactmentShutDownHook extends Thread {
+        private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class);
+        @Override
+        public void run() {
+            super.run();
+            try {
+                statusConsumer.stopListen(consumerId);
+                log.info("Successfully un-binded task status consumer");
+            } catch (AiravataException e) {
+                log.error("Error while un-bind enactment status consumer", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
new file mode 100644
index 0000000..ee89dd9
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+/**
+ * Factory for {@link WorkflowParser} instances. Every class that implements this interface
+ * should be either abstract or a singleton.
+ */
+public interface WorkflowFactory {
+    /** Returns a {@link WorkflowParser} for the given experiment id and credential token. */
+    WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
new file mode 100644
index 0000000..1df2084
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowFactoryImpl.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.workflow.core.parser.AiravataWorkflowParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+/** Singleton {@link WorkflowFactory}; only one instance can exist at runtime. */
+public class WorkflowFactoryImpl implements WorkflowFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkflowFactoryImpl.class);
+    // volatile so the double-checked locking in getInstance() publishes the instance safely
+    private static volatile WorkflowFactoryImpl workflowFactoryImpl;
+
+    private WorkflowFactoryImpl() {
+    }
+
+    /** Returns the shared instance, creating it lazily on the first call. */
+    public static WorkflowFactoryImpl getInstance() {
+        if (workflowFactoryImpl == null) {
+            synchronized (WorkflowFactoryImpl.class) {
+                if (workflowFactoryImpl == null) {
+                    workflowFactoryImpl = new WorkflowFactoryImpl();
+                }
+            }
+        }
+        return workflowFactoryImpl;
+    }
+
+    /**
+     * Instantiates the parser class named by {@code ServerSettings.getWorkflowParser()} via its
+     * (String, String) constructor; on ApplicationSettingsException uses {@link AiravataWorkflowParser}.
+     */
+    @Override
+    public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) throws Exception {
+        WorkflowParser workflowParser = null;
+        try {
+            String wfParserClassName = ServerSettings.getWorkflowParser();
+            Class<?> aClass = Class.forName(wfParserClassName);
+            Constructor<?> constructor = aClass.getConstructor(String.class, String.class);
+            workflowParser = (WorkflowParser) constructor.newInstance(experimentId, credentialToken);
+        } catch (ApplicationSettingsException e) {
+            log.info("A custom workflow parser is not defined, Use default Airavata workflow parser");
+        }
+        if (workflowParser == null) {
+            workflowParser = new AiravataWorkflowParser(experimentId, credentialToken);
+        }
+        return workflowParser;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
new file mode 100644
index 0000000..8d284dd
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowParser.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core;
+
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
+
+import java.util.List;
+
+/** Contract for workflow parsers; {@link #parse()} yields a list of workflow input nodes. */
+public interface WorkflowParser {
+    /** Returns the {@link WorkflowInputNode}s produced by parsing; declared to throw Exception. */
+    List<WorkflowInputNode> parse() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
new file mode 100644
index 0000000..91118cc
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/DirectedEdge.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.edge;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+
+/** Mutable {@link Edge} implementation that stores its two end ports in plain fields. */
+public class DirectedEdge implements Edge {
+    private OutPort fromPort;
+    private InPort toPort;
+
+    @Override
+    public OutPort getFromPort() {
+        return this.fromPort;
+    }
+
+    @Override
+    public void setFromPort(OutPort outPort) {
+        fromPort = outPort;
+    }
+
+    @Override
+    public InPort getToPort() {
+        return this.toPort;
+    }
+
+    @Override
+    public void setToPort(InPort inPort) {
+        toPort = inPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
new file mode 100644
index 0000000..ee11371
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/edge/Edge.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.edge;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+/**
+ * An edge links one workflow node to another: it starts at the outPort of a workflow node
+ * and ends at the inPort of a workflow node.
+ */
+public interface Edge {
+    /** Returns the inPort at which this edge ends. */
+    InPort getToPort();
+
+    /** Sets the inPort at which this edge ends. */
+    void setToPort(InPort inPort);
+
+    /** Returns the outPort at which this edge starts. */
+    OutPort getFromPort();
+
+    /** Sets the outPort at which this edge starts. */
+    void setFromPort(OutPort outPort);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
new file mode 100644
index 0000000..d775bf4
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNode.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+import java.util.List;
+
+/** A {@link WorkflowNode} that carries an application id plus lists of input and output ports. */
+public interface ApplicationNode extends WorkflowNode {
+    /** Returns the application id held by this node. */
+    String getApplicationId();
+
+    /** Adds an input port to this node; {@link #getInputPorts()} returns the ports added so far. */
+    void addInPort(InPort inPort);
+    List<InPort> getInputPorts();
+
+    /** Adds an output port to this node; {@link #getOutputPorts()} returns the ports added so far. */
+    void addOutPort(OutPort outPort);
+    List<OutPort> getOutputPorts();
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
new file mode 100644
index 0000000..ad7bd63
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/ApplicationNodeImpl.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.workflow.core.dag.port.InPort;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link ApplicationNode} that keeps its ports in in-memory lists. A node starts in
+ * {@link NodeState#WAITING} and its state may only move to a strictly higher level.
+ */
+public class ApplicationNodeImpl implements ApplicationNode {
+    private final String nodeId;
+    private final String applicationName;
+    private final String applicationId;
+    private final List<InPort> inPorts = new ArrayList<InPort>();
+    private final List<OutPort> outPorts = new ArrayList<OutPort>();
+    private NodeState myState = NodeState.WAITING;
+
+    public ApplicationNodeImpl(String nodeId, String applicationName, String applicationId) {
+        this.nodeId = nodeId;
+        this.applicationName = applicationName;
+        this.applicationId = applicationId;
+    }
+
+    @Override
+    public String getId() {
+        return nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return this.applicationName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.APPLICATION;
+    }
+
+    @Override
+    public NodeState getState() {
+        return this.myState;
+    }
+
+    /** Moves to {@code newState}; throws IllegalStateException unless its level is strictly higher. */
+    @Override
+    public void setState(NodeState newState) {
+        if (newState.getLevel() <= myState.getLevel()) {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
+        myState = newState;
+    }
+
+    /** Returns true when every input port reports ready; also true when there are no input ports. */
+    @Override
+    public boolean isReady() {
+        boolean everyPortReady = true;
+        for (InPort port : getInputPorts()) {
+            if (!port.isReady()) {
+                everyPortReady = false;
+                break;
+            }
+        }
+        return everyPortReady;
+    }
+
+    @Override
+    public String getApplicationId() {
+        return applicationId;
+    }
+
+    @Override
+    public void addInPort(InPort inPort) {
+        inPorts.add(inPort);
+    }
+
+    /** Returns the internal input port list itself, not a copy. */
+    @Override
+    public List<InPort> getInputPorts() {
+        return inPorts;
+    }
+
+    @Override
+    public void addOutPort(OutPort outPort) {
+        outPorts.add(outPort);
+    }
+
+    /** Returns the internal output port list itself, not a copy. */
+    @Override
+    public List<OutPort> getOutputPorts() {
+        return outPorts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
new file mode 100644
index 0000000..df2c87a
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeState.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+/** States of a workflow node; each state carries an integer level that orders the states. */
+public enum NodeState {
+    WAITING(0),         // waiting on inputs
+    READY(1),           // all inputs are available and ready to execute
+    QUEUED(2),
+    PRE_PROCESSING(3),
+    EXECUTING(4),       // task has been submitted, not yet finished
+    EXECUTED(5),        // task executed
+    POST_PROCESSING(6),
+    FAILED(7),
+    COMPLETE(8);        // all work done
+
+    private final int level;
+    NodeState(int stateLevel) {
+        level = stateLevel;
+    }
+    /** Returns the level of this state; node implementations compare levels in setState. */
+    public int getLevel() {
+        return this.level;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
new file mode 100644
index 0000000..04e4c07
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/NodeType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+public enum NodeType { // kinds of workflow node, as returned by a node's getType()
+    APPLICATION, // returned by ApplicationNodeImpl.getType()
+    WORKFLOW_INPUT, // returned by WorkflowInputNodeImpl.getType()
+    WORKFLOW_OUTPUT // presumably returned by the workflow output node implementation - TODO confirm
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
new file mode 100644
index 0000000..83bcacf
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+/** A {@link WorkflowNode} that carries one input data object and a single out port. */
+public interface WorkflowInputNode extends WorkflowNode {
+
+    /** Accessors for the {@link InputDataObjectType} held by this node. */
+    InputDataObjectType getInputObject();
+    void setInputObject(InputDataObjectType inputObject);
+
+    /** Accessors for the {@link OutPort} of this node. */
+    OutPort getOutPort();
+    void setOutPort(OutPort outPort);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
new file mode 100644
index 0000000..2364b03
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowInputNodeImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.OutPort;
+
+/**
+ * Default {@link WorkflowInputNode} implementation.
+ *
+ * <p>A new node starts in {@link NodeState#READY}. The input object and the out
+ * port are supplied after construction through their setters and are
+ * {@code null} until then.
+ */
+public class WorkflowInputNodeImpl implements WorkflowInputNode {
+
+    private NodeState myState = NodeState.READY;
+    private final String nodeId;
+    private final String nodeName;
+    private OutPort outPort;
+    private InputDataObjectType inputDataObjectType;
+
+    /**
+     * Creates an input node without a name.
+     *
+     * @param nodeId identifier returned by {@link #getId()}
+     */
+    public WorkflowInputNodeImpl(String nodeId) {
+        this(nodeId, null);
+    }
+
+    /**
+     * Creates an input node.
+     *
+     * @param nodeId   identifier returned by {@link #getId()}
+     * @param nodeName name returned by {@link #getName()}; may be {@code null}
+     */
+    public WorkflowInputNodeImpl(String nodeId, String nodeName) {
+        this.nodeId = nodeId;
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public String getId() {
+        return this.nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return this.nodeName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.WORKFLOW_INPUT;
+    }
+
+    @Override
+    public NodeState getState() {
+        return myState;
+    }
+
+    /**
+     * Moves this node to {@code newState}.
+     *
+     * @param newState the state to move to; its level must be strictly greater
+     *                 than the level of the current state
+     * @throws IllegalStateException if the level of {@code newState} is not
+     *                               greater than the current state's level
+     */
+    @Override
+    public void setState(NodeState newState) {
+        if (newState.getLevel() > myState.getLevel()) {
+            myState = newState;
+        } else {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
+    }
+
+    /**
+     * Tells whether this node can provide its input.
+     *
+     * @return {@code false} while no input object has been set; otherwise
+     *         {@code true} when the input object has a non-empty value or is
+     *         not required
+     */
+    @Override
+    public boolean isReady() {
+        // The input object only arrives through setInputObject(); without this
+        // guard an early readiness check would fail with a NullPointerException.
+        if (inputDataObjectType == null) {
+            return false;
+        }
+        return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
+    }
+
+    @Override
+    public InputDataObjectType getInputObject() {
+        return this.inputDataObjectType;
+    }
+
+    @Override
+    public void setInputObject(InputDataObjectType inputObject) {
+        this.inputDataObjectType = inputObject;
+    }
+
+    @Override
+    public OutPort getOutPort() {
+        return this.outPort;
+    }
+
+    @Override
+    public void setOutPort(OutPort outPort) {
+        this.outPort = outPort;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
new file mode 100644
index 0000000..e86a740
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowNode.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+/**
+ * Common contract for every node type in this package: identity, type,
+ * state and a readiness check.
+ */
+public interface WorkflowNode {
+
+    /**
+     * @return the identifier of this node
+     */
+    String getId();
+
+    /**
+     * @return the name of this node
+     */
+    String getName();
+
+    /**
+     * @return the {@link NodeType} of this node
+     */
+    NodeType getType();
+
+    /**
+     * @return the current {@link NodeState} of this node
+     */
+    NodeState getState();
+
+    /**
+     * @param newState the state this node should move to
+     */
+    void setState(NodeState newState);
+
+    /**
+     * @return {@code true} if this node reports itself as ready
+     */
+    boolean isReady();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
new file mode 100644
index 0000000..340ac30
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNode.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+
+/**
+ * A {@link WorkflowNode} that exposes an {@link OutputDataObjectType} and the
+ * {@link InPort} associated with it.
+ */
+public interface WorkflowOutputNode extends WorkflowNode {
+
+    /**
+     * @return the output data object currently held by this node
+     */
+    OutputDataObjectType getOutputObject();
+
+    /**
+     * @param outputObject the output data object this node should hold
+     */
+    void setOutputObject(OutputDataObjectType outputObject);
+
+    /**
+     * @return the in port currently associated with this node
+     */
+    InPort getInPort();
+
+    /**
+     * @param inPort the in port to associate with this node
+     */
+    void setInPort(InPort inPort);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
new file mode 100644
index 0000000..294109f
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/nodes/WorkflowOutputNodeImpl.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.nodes;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.port.InPort;
+
+/**
+ * Default {@link WorkflowOutputNode} implementation.
+ *
+ * <p>A new node starts in {@link NodeState#WAITING}. The output object and the
+ * in port are supplied after construction through their setters and are
+ * {@code null} until then.
+ */
+public class WorkflowOutputNodeImpl implements WorkflowOutputNode {
+
+    private NodeState myState = NodeState.WAITING;
+    private final String nodeId;
+    private final String nodeName;
+    private OutputDataObjectType outputDataObjectType;
+    private InPort inPort;
+
+    /**
+     * Creates an output node without a name.
+     *
+     * @param nodeId identifier returned by {@link #getId()}
+     */
+    public WorkflowOutputNodeImpl(String nodeId) {
+        this(nodeId, null);
+    }
+
+    /**
+     * Creates an output node.
+     *
+     * @param nodeId   identifier returned by {@link #getId()}
+     * @param nodeName name returned by {@link #getName()}; may be {@code null}
+     */
+    public WorkflowOutputNodeImpl(String nodeId, String nodeName) {
+        this.nodeId = nodeId;
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public String getId() {
+        return this.nodeId;
+    }
+
+    @Override
+    public String getName() {
+        return this.nodeName;
+    }
+
+    @Override
+    public NodeType getType() {
+        return NodeType.WORKFLOW_OUTPUT;
+    }
+
+    @Override
+    public NodeState getState() {
+        return myState;
+    }
+
+    /**
+     * Moves this node to {@code newState}.
+     *
+     * @param newState the state to move to; its level must be strictly greater
+     *                 than the level of the current state
+     * @throws IllegalStateException if the level of {@code newState} is not
+     *                               greater than the current state's level
+     */
+    @Override
+    public void setState(NodeState newState) {
+        if (newState.getLevel() > myState.getLevel()) {
+            myState = newState;
+        } else {
+            throw new IllegalStateException("Node state can't be reversed. currentState : " + myState.toString() + " , newState " + newState.toString());
+        }
+    }
+
+    /**
+     * Tells whether a value has arrived on this node's in port.
+     *
+     * @return {@code true} only when an in port is set, it holds an input
+     *         object, and that object's value is neither {@code null} nor empty
+     */
+    @Override
+    public boolean isReady() {
+        // The in port only arrives through setInPort(); without the first check
+        // an early readiness check would fail with a NullPointerException.
+        return inPort != null
+                && inPort.getInputObject() != null
+                && inPort.getInputObject().getValue() != null
+                && !inPort.getInputObject().getValue().equals("");
+    }
+
+    @Override
+    public OutputDataObjectType getOutputObject() {
+        return this.outputDataObjectType;
+    }
+
+    @Override
+    public void setOutputObject(OutputDataObjectType outputObject) {
+        this.outputDataObjectType = outputObject;
+    }
+
+    @Override
+    public InPort getInPort() {
+        return this.inPort;
+    }
+
+    @Override
+    public void setInPort(InPort inPort) {
+        this.inPort = inPort;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
new file mode 100644
index 0000000..fb8dfa7
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InPort.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+
+/**
+ * A {@link Port} on the receiving side: it holds an
+ * {@link InputDataObjectType}, a single {@link Edge} and a default value.
+ */
+public interface InPort extends Port {
+
+    /**
+     * @param inputObject the input data object this port should hold
+     */
+    void setInputObject(InputDataObjectType inputObject);
+
+    /**
+     * @return the input data object currently held by this port
+     */
+    InputDataObjectType getInputObject();
+
+    /**
+     * @return the edge attached to this port
+     */
+    Edge getEdge();
+
+    /**
+     * @param edge the edge to attach to this port
+     */
+    void addEdge(Edge edge);
+
+    /**
+     * @return the default value configured for this port
+     */
+    String getDefaultValue();
+
+    /**
+     * @param defaultValue the default value to configure for this port
+     */
+    void setDefaultValue(String defaultValue);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
new file mode 100644
index 0000000..43a41be
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/InputPortIml.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+/**
+ * Default {@link InPort} implementation.
+ *
+ * <p>The port keeps a single edge; {@link #addEdge(Edge)} replaces any edge set
+ * earlier. Readiness is always computed from the current input object in
+ * {@link #isReady()} rather than cached.
+ */
+public class InputPortIml implements InPort {
+
+    private InputDataObjectType inputDataObjectType;
+    private final String portId;
+    private Edge edge;
+    private WorkflowNode node;
+    private String defaultValue;
+
+    /**
+     * @param portId identifier returned by {@link #getId()}
+     */
+    public InputPortIml(String portId) {
+        this.portId = portId;
+    }
+
+    /**
+     * Stores the input object for this port.
+     *
+     * @param inputObject the input object; {@code null} is accepted and makes
+     *                    {@link #isReady()} return {@code false}
+     */
+    @Override
+    public void setInputObject(InputDataObjectType inputObject) {
+        // No readiness flag is cached here: the previous cached value was never
+        // read, and computing it made a null argument fail with a
+        // NullPointerException even though isReady() handles null.
+        this.inputDataObjectType = inputObject;
+    }
+
+    @Override
+    public InputDataObjectType getInputObject() {
+        return this.inputDataObjectType;
+    }
+
+    @Override
+    public Edge getEdge() {
+        return this.edge;
+    }
+
+    @Override
+    public void addEdge(Edge edge) {
+        this.edge = edge;
+    }
+
+    @Override
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    @Override
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    /**
+     * @return {@code true} when an input object is set and it is either not
+     *         required or carries a non-empty value
+     */
+    @Override
+    public boolean isReady() {
+        return inputDataObjectType != null && (!inputDataObjectType.isIsRequired() ||
+                (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("")));
+    }
+
+    @Override
+    public WorkflowNode getNode() {
+        return this.node;
+    }
+
+    @Override
+    public void setNode(WorkflowNode workflowNode) {
+        this.node = workflowNode;
+    }
+
+    @Override
+    public String getId() {
+        return this.portId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
new file mode 100644
index 0000000..d9aa004
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPort.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+
+import java.util.List;
+
+/**
+ * A {@link Port} on the producing side: it holds an
+ * {@link OutputDataObjectType} and a list of outgoing {@link Edge}s.
+ */
+public interface OutPort extends Port {
+
+    /**
+     * @param outputObject the output data object this port should hold
+     */
+    void setOutputObject(OutputDataObjectType outputObject);
+
+    /**
+     * @return the output data object currently held by this port
+     */
+    OutputDataObjectType getOutputObject();
+
+    /**
+     * @return the edges leaving this port
+     */
+    List<Edge> getOutEdges();
+
+    /**
+     * @param edge an edge to add to the edges leaving this port
+     */
+    void addEdge(Edge edge);
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
new file mode 100644
index 0000000..9c3f86a
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/OutPortImpl.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.workflow.core.dag.edge.Edge;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default {@link OutPort} implementation.
+ *
+ * <p>Outgoing edges are collected in insertion order. The output object is
+ * supplied after construction through {@link #setOutputObject} and is
+ * {@code null} until then.
+ */
+public class OutPortImpl implements OutPort {
+
+    private OutputDataObjectType outputDataObjectType;
+    private final List<Edge> outEdges = new ArrayList<Edge>();
+    private final String portId;
+    private WorkflowNode node;
+
+    /**
+     * @param portId identifier returned by {@link #getId()}
+     */
+    public OutPortImpl(String portId) {
+        this.portId = portId;
+    }
+
+    @Override
+    public void setOutputObject(OutputDataObjectType outputObject) {
+        this.outputDataObjectType = outputObject;
+    }
+
+    @Override
+    public OutputDataObjectType getOutputObject() {
+        return this.outputDataObjectType;
+    }
+
+    /**
+     * @return the internal list of outgoing edges (not a copy)
+     */
+    @Override
+    public List<Edge> getOutEdges() {
+        return this.outEdges;
+    }
+
+    @Override
+    public void addEdge(Edge edge) {
+        this.outEdges.add(edge);
+    }
+
+    /**
+     * @return {@code true} only when an output object is set and its value is
+     *         neither {@code null} nor empty
+     */
+    @Override
+    public boolean isReady() {
+        // The output object only arrives through setOutputObject(); without the
+        // first check an early readiness check would fail with a
+        // NullPointerException.
+        return this.outputDataObjectType != null
+                && this.outputDataObjectType.getValue() != null
+                && !this.outputDataObjectType.getValue().equals("");
+    }
+
+    @Override
+    public WorkflowNode getNode() {
+        return this.node;
+    }
+
+    @Override
+    public void setNode(WorkflowNode workflowNode) {
+        this.node = workflowNode;
+    }
+
+    @Override
+    public String getId() {
+        return portId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/509f2037/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
new file mode 100644
index 0000000..e3756cf
--- /dev/null
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/dag/port/Port.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.core.dag.port;
+
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
+
+/**
+ * Common contract for ports: an identifier, the {@link WorkflowNode} the port
+ * is associated with, and a readiness check.
+ */
+public interface Port {
+
+    /**
+     * @return {@code true} if this port reports itself as ready
+     */
+    boolean isReady();
+
+    /**
+     * @return the workflow node associated with this port
+     */
+    WorkflowNode getNode();
+
+    /**
+     * @param workflowNode the workflow node to associate with this port
+     */
+    void setNode(WorkflowNode workflowNode);
+
+    /**
+     * @return the identifier of this port
+     */
+    String getId();
+
+}