You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/26 18:08:10 UTC

[11/50] [abbrv] airavata git commit: Added temporary publisher to publish task status change events and output change events, refactored workflow interpreter code and improved it to launch and iterate the workflow

Added temporary publisher to publish task status change events and output change events, refactored workflow interpreter code and improved it to launch and iterate the workflow


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/27f6f1b1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/27f6f1b1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/27f6f1b1

Branch: refs/heads/master
Commit: 27f6f1b12c83448b702064df5f90fb4103ec36c4
Parents: 20d6817
Author: shamrath <sh...@gmail.com>
Authored: Fri Feb 20 20:29:18 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Fri Feb 20 20:29:18 2015 -0500

----------------------------------------------------------------------
 .../simple/workflow/engine/ProcessPack.java     |  62 ++++++
 .../engine/SimpleWorkflowInterpreter.java       | 204 ++++++++++++++-----
 .../simple/workflow/engine/WfNodeContainer.java |  51 -----
 .../simple/workflow/engine/WorkflowUtil.java    |  10 +
 .../engine/dag/nodes/WorkflowInputNodeImpl.java |   3 +-
 5 files changed, 228 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
new file mode 100644
index 0000000..ab8b724
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/ProcessPack.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.ariavata.simple.workflow.engine;
+
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+
+public class ProcessPack {
+    private WorkflowNode workflowNode;
+    private WorkflowNodeDetails wfNodeDetails;
+    private TaskDetails taskDetails;
+
+    public ProcessPack(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails, TaskDetails taskDetails) {
+        this.workflowNode = workflowNode;
+        this.wfNodeDetails = wfNodeDetails;
+        this.taskDetails = taskDetails;
+    }
+
+    public WorkflowNode getWorkflowNode() {
+        return workflowNode;
+    }
+
+    public void setWorkflowNode(WorkflowNode workflowNode) {
+        this.workflowNode = workflowNode;
+    }
+
+    public WorkflowNodeDetails getWfNodeDetails() {
+        return wfNodeDetails;
+    }
+
+    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
+        this.wfNodeDetails = wfNodeDetails;
+    }
+
+    public TaskDetails getTaskDetails() {
+        return taskDetails;
+    }
+
+    public void setTaskDetails(TaskDetails taskDetails) {
+        this.taskDetails = taskDetails;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index b4ec3cb..e122fa6 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -21,13 +21,18 @@
 
 package org.apache.ariavata.simple.workflow.engine;
 
+import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
 import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -36,6 +41,7 @@ import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.ariavata.simple.workflow.engine.dag.edge.Edge;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.ApplicationNode;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.NodeState;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
@@ -43,6 +49,8 @@ import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
 import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.ariavata.simple.workflow.engine.dag.port.InPort;
 import org.apache.ariavata.simple.workflow.engine.dag.port.OutPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -53,6 +61,7 @@ import java.util.Set;
 
 public class SimpleWorkflowInterpreter implements Runnable{
 
+    private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
 
     private List<WorkflowInputNode> workflowInputNodes;
 
@@ -60,11 +69,12 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     private String credentialToken;
 
-    private List<WorkflowNode> readList = new ArrayList<WorkflowNode>();
-    private List<WorkflowNode> waitingList = new ArrayList<WorkflowNode>();
-    private Map<String,WfNodeContainer> processingQueue = new HashMap<String, WfNodeContainer>();
-    private List<WorkflowNode> completeList = new ArrayList<WorkflowNode>();
+    private Map<String, WorkflowNode> readList = new HashMap<String, WorkflowNode>();
+    private Map<String, WorkflowNode> waitingList = new HashMap<String, WorkflowNode>();
+    private Map<String, ProcessPack> processingQueue = new HashMap<String, ProcessPack>();
+    private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
     private Registry registry;
+    private EventBus eventBus = new EventBus();
 
     public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
         // read the workflow file and build the topology to a DAG. Then execute that dag
@@ -77,6 +87,9 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     public void launchWorkflow() throws Exception {
         // process workflow input nodes
+        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
+        WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
+        setWorkflowInputNodes(workflowParser.parse());
         processWorkflowInputNodes(getWorkflowInputNodes());
         processReadyList();
         // process workflow application nodes
@@ -85,11 +98,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     // try to remove synchronization tag
     private synchronized void processReadyList() {
-        for (WorkflowNode readyNode : readList) {
+        for (WorkflowNode readyNode : readList.values()) {
             try {
                 WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
                 TaskDetails process = getProcess(workflowNodeDetails);
-                processingQueue.put(process.getTaskID(), new WfNodeContainer(readyNode, workflowNodeDetails));
+                addToProcessingQueue(new ProcessPack(readyNode, workflowNodeDetails, process));
                 publishToProcessQueue(process);
             } catch (RegistryException e) {
                 // FIXME : handle this exception
@@ -98,6 +111,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
     }
 
     private void publishToProcessQueue(TaskDetails process) {
+        Thread thread = new Thread(new TempPublisher(process, eventBus));
+        thread.start();
         //TODO: publish to process queue.
     }
 
@@ -150,31 +165,21 @@ public class SimpleWorkflowInterpreter implements Runnable{
         Set<WorkflowNode> tempNodeSet = new HashSet<WorkflowNode>();
         for (WorkflowInputNode wfInputNode : wfInputNodes) {
             if (wfInputNode.isReady()) {
-
-//                for (Edge edge : wfInputNode.getOutputLinks()) {
-//                    WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject());
-//                    tempNodeSet.add(edge.getToPort().getNode());
-//                }
-            }
-        }
-        for (WorkflowNode workflowNode : tempNodeSet) {
-            if (workflowNode.isReady()) {
-                readList.add(workflowNode);
-            } else {
-                waitingList.add(workflowNode);
+                for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
+                    edge.getToPort().setInputObject(
+                            WorkflowUtil.copyValues(wfInputNode.getInputObject(), edge.getToPort().getInputObject()));
+                    if (edge.getToPort().getNode().isReady()) {
+                        addToReadyQueue(edge.getToPort().getNode());
+                    } else {
+                        addToWaitingQueue(edge.getToPort().getNode());
+                    }
+                }
             }
         }
     }
 
 
     public List<WorkflowInputNode> getWorkflowInputNodes() throws Exception {
-        if (workflowInputNodes == null) {
-            // read workflow description from registry and parse it
-            WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
-            List<WorkflowInputNode> wfInputNodes = wfFactory.getWorkflowParser(experiment.getExperimentID(),
-                    credentialToken).parse();
-            setWorkflowInputNodes(wfInputNodes);
-        }
         return workflowInputNodes;
     }
 
@@ -208,29 +213,29 @@ public class SimpleWorkflowInterpreter implements Runnable{
     @Subscribe
     public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
         String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
-        WfNodeContainer wfNodeContainer = processingQueue.get(taskId);
+        ProcessPack processPack = processingQueue.get(taskId);
         Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
-        if (wfNodeContainer != null) {
-            WorkflowNode workflowNode = wfNodeContainer.getWorkflowNode();
+        if (processPack != null) {
+            WorkflowNode workflowNode = processPack.getWorkflowNode();
             if (workflowNode instanceof ApplicationNode) {
                 ApplicationNode applicationNode = (ApplicationNode) workflowNode;
                 // Workflow node can have one to many output ports and each output port can have one to many links
                 for (OutPort outPort : applicationNode.getOutputPorts()) {
-//                    for (Edge edge : outPort.getOutputLinks()) {
-//                        WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
-//                        tempWfNodeSet.add(edge.getToPort().getNode());
-//                    }
-                }
-
-                for (WorkflowNode node : tempWfNodeSet) {
-                    if (node.isReady()) {
-                        waitingList.remove(node);
-                        readList.add(node);
+                    for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) {
+                        if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                            outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                            break;
+                        }
+                    }
+                    for (Edge edge : outPort.getOutEdges()) {
+                        WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
+                        if (edge.getToPort().getNode().isReady()) {
+                            addToReadyQueue(edge.getToPort().getNode());
+                        }
                     }
                 }
             }
             processingQueue.remove(taskId);
-            processReadyList();
         }
 
     }
@@ -238,8 +243,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
     @Subscribe
     public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
         String taskId = taskStatus.getTaskIdentity().getTaskId();
-        WfNodeContainer wfNodeContainer = processingQueue.get(taskId);
-        if (wfNodeContainer != null) {
+        ProcessPack processPack = processingQueue.get(taskId);
+        if (processPack != null) {
             WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
             switch (taskStatus.getState()) {
                 case WAITING:
@@ -247,25 +252,25 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 case STARTED:
                     break;
                 case PRE_PROCESSING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
                     break;
                 case INPUT_DATA_STAGING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
+                    processPack.getWorkflowNode().setNodeState(NodeState.PRE_PROCESSING);
                     break;
                 case OUTPUT_DATA_STAGING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
                     break;
                 case EXECUTING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTING);
+                    processPack.getWorkflowNode().setNodeState(NodeState.EXECUTING);
                     break;
                 case POST_PROCESSING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
+                    processPack.getWorkflowNode().setNodeState(NodeState.POST_PROCESSING);
                     break;
                 case COMPLETED:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.EXECUTED);
+                    processPack.getWorkflowNode().setNodeState(NodeState.EXECUTED);
                     break;
                 case FAILED:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
                     break;
                 case UNKNOWN:
                     break;
@@ -273,14 +278,14 @@ public class SimpleWorkflowInterpreter implements Runnable{
                     break;
                 case CANCELED:
                 case CANCELING:
-                    wfNodeContainer.getWorkflowNode().setNodeState(NodeState.FAILED);
+                    processPack.getWorkflowNode().setNodeState(NodeState.FAILED);
                     break;
                 default:
                     break;
             }
             if (wfNodeState != WorkflowNodeState.UNKNOWN) {
                 try {
-                    updateWorkflowNodeStatus(wfNodeContainer.getWfNodeDetails(), wfNodeState);
+                    updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState);
                 } catch (RegistryException e) {
                     // TODO: handle this.
                 }
@@ -289,8 +294,107 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     }
 
+    /**
+     * Remove the workflow node from waiting queue and add it to the ready queue.
+     * @param workflowNode - Workflow Node
+     */
+    private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+        waitingList.remove(workflowNode.getNodeId());
+        readList.put(workflowNode.getNodeId(), workflowNode);
+    }
+
+    private void addToWaitingQueue(WorkflowNode workflowNode) {
+        waitingList.put(workflowNode.getNodeId(), workflowNode);
+    }
+
+    /**
+     * First remove the node from the ready list and then add the ProcessPack to the processing queue.
+     * Note that the underlying data structure of the processing queue is a Map.
+     * @param processPack - holds the workflow node and its corresponding WorkflowNodeDetails and TaskDetails
+     */
+    private synchronized void addToProcessingQueue(ProcessPack processPack) {
+        readList.remove(processPack.getWorkflowNode().getNodeId());
+        processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
+    }
+
+    private synchronized void addToCompleteQueue(ProcessPack processPack) {
+        processingQueue.remove(processPack.getTaskDetails().getTaskID());
+        completeList.put(processPack.getTaskDetails().getTaskID(), processPack);
+    }
+
+
     @Override
     public void run() {
         // TODO: Auto generated method body.
+        try {
+            launchWorkflow();
+            while (!(waitingList.isEmpty() && readList.isEmpty())) {
+                processReadyList();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    class TempPublisher implements Runnable {
+        private TaskDetails tempTaskDetails;
+        private EventBus tempEventBus;
+
+        public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) {
+            this.tempTaskDetails = tempTaskDetails;
+            this.tempEventBus = tempEventBus;
+        }
+
+        @Override
+        public void run() {
+            try {
+                TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
+                TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+                statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
+                tempEventBus.post(statusChangeEvent);
+                Thread.sleep(1000);
+
+                List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs();
+                List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs();
+                log.info("**************   Task output change event fired for application id :" + tempTaskDetails.getApplicationId());
+                if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) +
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if (tempTaskDetails.getApplicationId().equals("Subtract")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) -
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                } else if (tempTaskDetails.getApplicationId().equals("Multiply")) {
+                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) *
+                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+                }
+                TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier);
+                eventBus.post(taskOutputChangeEvent);
+
+            } catch (InterruptedException e) {
+                log.error("Thread was interrupted while sleeping");
+            }
+
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
deleted file mode 100644
index e0cebd6..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WfNodeContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.ariavata.simple.workflow.engine;
-
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
-import org.apache.ariavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-
-public class WfNodeContainer {
-    private WorkflowNode workflowNode;
-    private WorkflowNodeDetails wfNodeDetails;
-
-    public WfNodeContainer(WorkflowNode workflowNode, WorkflowNodeDetails wfNodeDetails) {
-        this.workflowNode = workflowNode;
-        this.wfNodeDetails = wfNodeDetails;
-    }
-
-    public WorkflowNode getWorkflowNode() {
-        return workflowNode;
-    }
-
-    public void setWorkflowNode(WorkflowNode workflowNode) {
-        this.workflowNode = workflowNode;
-    }
-
-    public WorkflowNodeDetails getWfNodeDetails() {
-        return wfNodeDetails;
-    }
-
-    public void setWfNodeDetails(WorkflowNodeDetails wfNodeDetails) {
-        this.wfNodeDetails = wfNodeDetails;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
index 71d0288..d4bbad3 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/WorkflowUtil.java
@@ -21,12 +21,21 @@
 
 package org.apache.ariavata.simple.workflow.engine;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
 
 public class WorkflowUtil {
 
     public static InputDataObjectType copyValues(InputDataObjectType fromInputObj, InputDataObjectType toInputObj){
+        if (toInputObj == null) {
+            // TODO : throw an error
+        }
         toInputObj.setValue(fromInputObj.getValue());
         if (fromInputObj.getApplicationArgument() != null
                 && !fromInputObj.getApplicationArgument().trim().equals("")) {
@@ -40,4 +49,5 @@ public class WorkflowUtil {
         return inputData;
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/27f6f1b1/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
index 2f912b3..f419ae2 100644
--- a/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/ariavata/simple/workflow/engine/dag/nodes/WorkflowInputNodeImpl.java
@@ -67,7 +67,8 @@ public class WorkflowInputNodeImpl implements WorkflowInputNode {
 
     @Override
     public boolean isReady() {
-        return inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals("");
+        return (inputDataObjectType.getValue() != null && !inputDataObjectType.getValue().equals(""))
+                || !inputDataObjectType.isIsRequired();
     }
 
     @Override