You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/03/06 22:19:02 UTC

[2/2] airavata git commit: Fixed AIRAVATA-1620.

Fixed AIRAVATA-1620.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/774b092d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/774b092d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/774b092d

Branch: refs/heads/new-workflow-design-rabbitmq
Commit: 774b092d31e41919b39ca3ce9f1edd5af1c30669
Parents: 44d89ee
Author: shamrath <sh...@gmail.com>
Authored: Fri Mar 6 16:18:44 2015 -0500
Committer: shamrath <sh...@gmail.com>
Committed: Fri Mar 6 16:18:44 2015 -0500

----------------------------------------------------------------------
 .../server/OrchestratorServerHandler.java       |  31 +-
 .../engine/SimpleWorkflowInterpreter.java       | 280 ++++++++-----------
 .../engine/WorkflowEnactmentService.java        | 129 ++++++++-
 3 files changed, 259 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index d168c26..fe306d7 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -52,11 +52,20 @@ import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
 import org.apache.airavata.model.util.ExecutionType;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.TaskStatus;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
 import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
+import org.apache.airavata.orchestrator.util.DataModelUtils;
 import org.apache.airavata.orchestrator.util.OrchestratorServerThreadPoolExecutor;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
@@ -64,17 +73,25 @@ import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
 import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
-import org.apache.airavata.orchestrator.util.DataModelUtils;
-import org.apache.airavata.simple.workflow.engine.SimpleWorkflowInterpreter;
 import org.apache.airavata.simple.workflow.engine.WorkflowEnactmentService;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface,
 		Watcher {
@@ -656,10 +673,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
         try {
             WorkflowEnactmentService.getInstance().
                     submitWorkflow(experimentId, airavataCredStoreToken, getGatewayName(), getRabbitMQProcessPublisher());
-        } catch (RegistryException e) {
-            log.error("Error while launching workflow", e);
         } catch (Exception e) {
-            log.error("Error while initializing rabbit mq process publisher");
+            log.error("Error while launching workflow", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index a052e5c..ee7ff6b 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -24,8 +24,6 @@ package org.apache.airavata.simple.workflow.engine;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
@@ -66,7 +64,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class SimpleWorkflowInterpreter implements Runnable{
+/**
+ * Package-Private class
+ */
+class SimpleWorkflowInterpreter{
 
     private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
     private List<WorkflowInputNode> workflowInputNodes;
@@ -102,8 +103,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
         this.publisher = publisher;
     }
 
-
-    public void launchWorkflow() throws Exception {
+    /**
+     * Package-Private method.
+     * @throws Exception
+     */
+    void launchWorkflow() throws Exception {
         WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
         WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
         log.debug("Initialized workflow parser");
@@ -111,15 +115,16 @@ public class SimpleWorkflowInterpreter implements Runnable{
         log.debug("Parsed the workflow and got the workflow input nodes");
         // process workflow input nodes
         processWorkflowInputNodes(getWorkflowInputNodes());
-        // initialize the rabbitmq status consumer
-        statusConsumer = new RabbitMQStatusConsumer();
-        consumerId = statusConsumer.listen(new TaskMessageHandler());
-
         processReadyList();
     }
 
     // try to remove synchronization tag
-    private synchronized void processReadyList() throws RegistryException, AiravataException {
+    /**
+     * Package-Private method.
+     * @throws RegistryException
+     * @throws AiravataException
+     */
+    void processReadyList() throws RegistryException, AiravataException {
         for (WorkflowNode readyNode : readyList.values()) {
             if (readyNode instanceof WorkflowOutputNode) {
                 WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
@@ -232,10 +237,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
     }
 
     /**
+     * Package-Private method.
      * Remove the workflow node from waiting queue and add it to the ready queue.
      * @param workflowNode - Workflow Node
      */
-    private synchronized void addToReadyQueue(WorkflowNode workflowNode) {
+    synchronized void addToReadyQueue(WorkflowNode workflowNode) {
         waitingList.remove(workflowNode.getId());
         readyList.put(workflowNode.getId(), workflowNode);
     }
@@ -265,31 +271,8 @@ public class SimpleWorkflowInterpreter implements Runnable{
         readyList.remove(wfOutputNode.getId());
     }
 
-    @Override
-    public void run() {
-        try {
-            log.debug("Launching workflow");
-            launchWorkflow();
-            while (continueWorkflow && !(waitingList.isEmpty() && readyList.isEmpty())) {
-//                processReadyList();
-                Thread.sleep(1000);
-            }
-            if (continueWorkflow) {
-                log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
-            } else if (!(waitingList.isEmpty() || readyList.isEmpty())) {
-                log.error("Workflow couldn't execute all workflow nodes due to an error");
-            }
-        } catch (Exception e) {
-            log.error("Error launching workflow", e);
-        } finally {
-            try {
-                statusConsumer.stopListen(consumerId);
-                log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
-            } catch (AiravataException e) {
-                log.error("Error while un-binding status consumer: " + consumerId + " for experiment "
-                        + getExperiment().getExperimentID());
-            }
-        }
+    boolean isAllDone() {
+        return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty());
     }
 
     private void setExperiment(String experimentId) throws RegistryException {
@@ -297,147 +280,108 @@ public class SimpleWorkflowInterpreter implements Runnable{
         log.debug("Retrieve Experiment for experiment id : " + experimentId);
     }
 
-    class TaskMessageHandler implements MessageHandler{
-
-        @Override
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            String gatewayId = "*";
-            String experimentId = getExperiment().getExperimentID();
-            List<String> routingKeys = new ArrayList<String>();
-//            routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*");
-            routingKeys.add(gatewayId);
-            routingKeys.add(gatewayId + "." + experimentId);
-            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-            return props;
-        }
-
-        @Override
-        public void onMessage(MessageContext msgCtx) {
-            String message;
-            if (msgCtx.getType() == MessageType.TASK) {
-                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
-                TaskIdentifier taskIdentifier = event.getTaskIdentity();
-                handleTaskStatusChangeEvent(event);
-                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
-                log.debug(message);
-            }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
-                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
-                TaskIdentifier taskIdentifier = event.getTaskIdentity();
-                handleTaskOutputChangeEvent(event);
-                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
-                log.debug(message);
-            } else {
-                // not interesting, ignores
-            }
-        }
-
-        private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
-
-            String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
-            log.debug("Task Output changed event received for workflow node : " +
-                    taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
-            ProcessContext processContext = processingQueue.get(taskId);
-            Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
-            if (processContext != null) {
-                WorkflowNode workflowNode = processContext.getWorkflowNode();
-                if (workflowNode instanceof ApplicationNode) {
-                    ApplicationNode applicationNode = (ApplicationNode) workflowNode;
-                    // Workflow node can have one to many output ports and each output port can have one to many links
-                    for (OutPort outPort : applicationNode.getOutputPorts()) {
-                        for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
-                            if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
-                                outPort.getOutputObject().setValue(outputDataObjectType.getValue());
-                                break;
-                            }
+    synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+        String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+        log.debug("Task Output changed event received for workflow node : " +
+                taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+        ProcessContext processContext = processingQueue.get(taskId);
+        Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+        if (processContext != null) {
+            WorkflowNode workflowNode = processContext.getWorkflowNode();
+            if (workflowNode instanceof ApplicationNode) {
+                ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+                // Workflow node can have one to many output ports and each output port can have one to many links
+                for (OutPort outPort : applicationNode.getOutputPorts()) {
+                    for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+                        if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                            outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                            break;
                         }
-                        for (Edge edge : outPort.getOutEdges()) {
-                            edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
-                            if (edge.getToPort().getNode().isReady()) {
-                                addToReadyQueue(edge.getToPort().getNode());
-                            }
+                    }
+                    for (Edge edge : outPort.getOutEdges()) {
+                        edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+                        if (edge.getToPort().getNode().isReady()) {
+                            addToReadyQueue(edge.getToPort().getNode());
                         }
                     }
                 }
-                addToCompleteQueue(processContext);
-                log.debug("removed task from processing queue : " + taskId);
-                try {
-                    processReadyList();
-                } catch (Exception e) {
-                    log.error("Error while processing ready workflow nodes", e);
-                    continueWorkflow = false;
-                }
+            }
+            addToCompleteQueue(processContext);
+            log.debug("removed task from processing queue : " + taskId);
+            try {
+                processReadyList();
+            } catch (Exception e) {
+                log.error("Error while processing ready workflow nodes", e);
+                continueWorkflow = false;
             }
         }
+    }
 
-        private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
-            TaskState taskState = taskStatusChangeEvent.getState();
-            TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
-            String taskId = taskIdentity.getTaskId();
-            ProcessContext processContext = processingQueue.get(taskId);
-            if (processContext != null) {
-                WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
-                switch (taskState) {
-                    case WAITING:
-                        break;
-                    case STARTED:
-                        break;
-                    case PRE_PROCESSING:
-                        wfNodeState = WorkflowNodeState.INVOKED;
-                        processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                        break;
-                    case INPUT_DATA_STAGING:
-                        wfNodeState = WorkflowNodeState.INVOKED;
-                        processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                        break;
-                    case EXECUTING:
-                        wfNodeState = WorkflowNodeState.EXECUTING;
-                        processContext.getWorkflowNode().setState(NodeState.EXECUTING);
-                        break;
-                    case OUTPUT_DATA_STAGING:
-                        wfNodeState = WorkflowNodeState.COMPLETED;
-                        processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
-                        break;
-                    case POST_PROCESSING:
-                        wfNodeState = WorkflowNodeState.COMPLETED;
-                        processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
-                        break;
-                    case COMPLETED:
-                        wfNodeState = WorkflowNodeState.COMPLETED;
-                        processContext.getWorkflowNode().setState(NodeState.EXECUTED);
-                        break;
-                    case FAILED:
-                        wfNodeState = WorkflowNodeState.FAILED;
-                        processContext.getWorkflowNode().setState(NodeState.FAILED);
-                        break;
-                    case UNKNOWN:
-                        wfNodeState = WorkflowNodeState.UNKNOWN;
-                        break;
-                    case CONFIGURING_WORKSPACE:
-                        wfNodeState = WorkflowNodeState.COMPLETED;
-                        break;
-                    case CANCELED:
-                    case CANCELING:
-                        wfNodeState = WorkflowNodeState.CANCELED;
-                        processContext.getWorkflowNode().setState(NodeState.FAILED);
-                        break;
-                    default:
-                        break;
-                }
-                if (wfNodeState != WorkflowNodeState.UNKNOWN) {
-                    try {
-                        updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
-                    } catch (RegistryException e) {
-                        log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
-                                + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
-                                + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
-                    }
+    void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+        TaskState taskState = taskStatusChangeEvent.getState();
+        TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
+        String taskId = taskIdentity.getTaskId();
+        ProcessContext processContext = processingQueue.get(taskId);
+        if (processContext != null) {
+            WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
+            switch (taskState) {
+                case WAITING:
+                    break;
+                case STARTED:
+                    break;
+                case PRE_PROCESSING:
+                    wfNodeState = WorkflowNodeState.INVOKED;
+                    processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                    break;
+                case INPUT_DATA_STAGING:
+                    wfNodeState = WorkflowNodeState.INVOKED;
+                    processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                    break;
+                case EXECUTING:
+                    wfNodeState = WorkflowNodeState.EXECUTING;
+                    processContext.getWorkflowNode().setState(NodeState.EXECUTING);
+                    break;
+                case OUTPUT_DATA_STAGING:
+                    wfNodeState = WorkflowNodeState.COMPLETED;
+                    processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                    break;
+                case POST_PROCESSING:
+                    wfNodeState = WorkflowNodeState.COMPLETED;
+                    processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                    break;
+                case COMPLETED:
+                    wfNodeState = WorkflowNodeState.COMPLETED;
+                    processContext.getWorkflowNode().setState(NodeState.EXECUTED);
+                    break;
+                case FAILED:
+                    wfNodeState = WorkflowNodeState.FAILED;
+                    processContext.getWorkflowNode().setState(NodeState.FAILED);
+                    break;
+                case UNKNOWN:
+                    wfNodeState = WorkflowNodeState.UNKNOWN;
+                    break;
+                case CONFIGURING_WORKSPACE:
+                    wfNodeState = WorkflowNodeState.COMPLETED;
+                    break;
+                case CANCELED:
+                case CANCELING:
+                    wfNodeState = WorkflowNodeState.CANCELED;
+                    processContext.getWorkflowNode().setState(NodeState.FAILED);
+                    break;
+                default:
+                    break;
+            }
+            if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+                try {
+                    updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
+                } catch (RegistryException e) {
+                    log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+                            + processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+                            + processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
                 }
             }
-
         }
-    }
 
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/774b092d/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
index ec5acfa..c7ab7b9 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowEnactmentService.java
@@ -21,23 +21,46 @@
 
 package org.apache.airavata.simple.workflow.engine;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
-import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class WorkflowEnactmentService {
 
     private static WorkflowEnactmentService workflowEnactmentService;
+    private final RabbitMQStatusConsumer statusConsumer;
+    private String consumerId;
     private ExecutorService executor;
+    private Map<String,SimpleWorkflowInterpreter> workflowMap;
 
-    private WorkflowEnactmentService () {
+    private WorkflowEnactmentService () throws AiravataException {
         executor = Executors.newFixedThreadPool(getThreadPoolSize());
+        workflowMap = new ConcurrentHashMap<String, SimpleWorkflowInterpreter>();
+        statusConsumer = new RabbitMQStatusConsumer();
+        consumerId = statusConsumer.listen(new TaskMessageHandler());
+        // register the shutdown hook to un-bind status consumer.
+        Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
     }
 
-    public static WorkflowEnactmentService getInstance(){
+    public static WorkflowEnactmentService getInstance() throws AiravataException {
         if (workflowEnactmentService == null) {
             synchronized (WorkflowEnactmentService.class) {
                 if (workflowEnactmentService == null) {
@@ -51,14 +74,110 @@ public class WorkflowEnactmentService {
     public void submitWorkflow(String experimentId,
                                   String credentialToken,
                                   String gatewayName,
-                                  RabbitMQProcessPublisher publisher) throws RegistryException {
+                                  RabbitMQProcessPublisher publisher) throws Exception {
 
         SimpleWorkflowInterpreter simpleWorkflowInterpreter = new SimpleWorkflowInterpreter(
                 experimentId, credentialToken,gatewayName, publisher);
-        executor.execute(simpleWorkflowInterpreter);
+        workflowMap.put(experimentId, simpleWorkflowInterpreter);
+        simpleWorkflowInterpreter.launchWorkflow();
+
     }
 
     private int getThreadPoolSize() {
         return ServerSettings.getEnactmentThreadPoolSize();
     }
+
+    private class TaskMessageHandler implements MessageHandler {
+
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            String gatewayId = "*";
+            String experimentId = "*";
+            List<String> routingKeys = new ArrayList<String>();
+            routingKeys.add(gatewayId);
+            routingKeys.add(gatewayId + "." + experimentId);
+            routingKeys.add(gatewayId + "." + experimentId+ ".*");
+            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            return props;
+        }
+
+        @Override
+        public void onMessage(MessageContext msgCtx) {
+            StatusHandler statusHandler = new StatusHandler(msgCtx);
+            executor.execute(statusHandler);
+        }
+
+
+    }
+
+    private class StatusHandler implements Runnable{
+        private final Logger log = LoggerFactory.getLogger(StatusHandler.class);
+
+        private final MessageContext msgCtx;
+
+        public StatusHandler(MessageContext msgCtx) {
+            this.msgCtx = msgCtx;
+        }
+
+        @Override
+        public void run() {
+            process();
+        }
+
+        private void process() {
+            String message;
+            SimpleWorkflowInterpreter simpleWorkflowInterpreter;
+            if (msgCtx.getType() == MessageType.TASK) {
+                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+                if (simpleWorkflowInterpreter != null) {
+                    simpleWorkflowInterpreter.handleTaskStatusChangeEvent(event);
+                } else {
+                    // this happens when Task status messages comes after the Taskoutput messages,as we have worked on
+                    // output changes it is ok to ignore this.
+                }
+                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+                log.debug(message);
+            }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                simpleWorkflowInterpreter = getInterpreter(taskIdentifier.getExperimentId());
+                if (simpleWorkflowInterpreter != null) {
+                    simpleWorkflowInterpreter.handleTaskOutputChangeEvent(event);
+                    if (simpleWorkflowInterpreter.isAllDone()) {
+                        workflowMap.remove(taskIdentifier.getExperimentId());
+                    }
+                } else {
+                    throw new IllegalArgumentException("Error while processing TaskOutputChangeEvent, " +
+                            "There is no registered workflow for experiment Id : " + taskIdentifier.getExperimentId());
+                }
+                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+                log.debug(message);
+            } else {
+                // not interested, ignores
+            }
+        }
+
+        private SimpleWorkflowInterpreter getInterpreter(String experimentId){
+            return workflowMap.get(experimentId);
+        }
+    }
+
+
+    private class EnactmentShutDownHook extends Thread {
+        private final Logger log = LoggerFactory.getLogger(EnactmentShutDownHook.class);
+        @Override
+        public void run() {
+            super.run();
+            try {
+                statusConsumer.stopListen(consumerId);
+                log.info("Successfully un-binded task status consumer");
+            } catch (AiravataException e) {
+                log.error("Error while un-bind enactment status consumer", e);
+            }
+        }
+    }
 }