You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:30 UTC

[03/15] airavata git commit: adding consumers to gfac without leader election

adding consumers to gfac without leader election


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0149c1af
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0149c1af
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0149c1af

Branch: refs/heads/master
Commit: 0149c1afd477ab1d86d19374bcb9824520d3a3bc
Parents: 88d27d9
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Feb 11 15:48:40 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Feb 11 15:48:40 2015 -0500

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  21 +-
 .../main/resources/airavata-server.properties   |   7 +-
 .../main/resources/airavata-server.properties   |   5 +-
 .../airavata/gfac/server/GfacServerHandler.java |  75 ++++-
 .../messaging/core/MessagingConstants.java      |   3 +-
 .../messaging/core/PublisherFactory.java        |   2 +-
 .../airavata/messaging/core/TestClient.java     |   5 +-
 .../messaging/core/impl/RabbitMQConsumer.java   | 258 -----------------
 .../core/impl/RabbitMQStatusConsumer.java       | 274 +++++++++++++++++++
 .../core/impl/RabbitMQStatusPublisher.java      |   2 +-
 .../core/impl/RabbitMQTaskLaunchConsumer.java   | 239 ++++++++++++++++
 .../core/impl/RabbitMQTaskLaunchPublisher.java  |  12 +-
 .../core/utils/OrchestratorConstants.java       |   1 -
 .../airavata/xbaya/messaging/Monitor.java       |   5 +-
 14 files changed, 614 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 4ea0b44..b076e6a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -28,7 +28,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 
 public class ServerSettings extends ApplicationSettings {
 
-	private static final String DEFAULT_USER = "default.registry.user";
+    private static final String DEFAULT_USER = "default.registry.user";
     private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
     private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
 
@@ -51,13 +51,15 @@ public class ServerSettings extends ApplicationSettings {
     private static final String MY_PROXY_USER = "myproxy.user";
     private static final String MY_PROXY_PASSWORD = "myproxy.password";
     private static final String MY_PROXY_LIFETIME = "myproxy.life";
-    private static final String ACTIVITY_PUBLISHER = "activity.publisher";
+    private static final String STATUS_PUBLISHER = "status.publisher";
     private static final String TASK_LAUNCH_PUBLISHER = "task.launch.publisher";
     private static final String ACTIVITY_LISTENERS = "activity.listeners";
     public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
     public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
     public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
     public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
+    public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is disabled
+
 
     private static boolean stopAllThreads = false;
 
@@ -73,7 +75,7 @@ public class ServerSettings extends ApplicationSettings {
         return getSetting(DEFAULT_USER_GATEWAY);
     }
 
-   public static String getServerContextRoot() {
+    public static String getServerContextRoot() {
         return getSetting(SERVER_CONTEXT_ROOT, "axis2");
     }
 
@@ -151,19 +153,24 @@ public class ServerSettings extends ApplicationSettings {
         return getSetting(ACTIVITY_LISTENERS).split(",");
     }
 
-    public static String getActivityPublisher() throws ApplicationSettingsException{
-        return getSetting(ACTIVITY_PUBLISHER);
+    public static String getStatusPublisher() throws ApplicationSettingsException {
+        return getSetting(STATUS_PUBLISHER);
     }
 
-    public static String getTaskLaunchPublisher() throws ApplicationSettingsException{
+    public static String getTaskLaunchPublisher() throws ApplicationSettingsException {
         return getSetting(TASK_LAUNCH_PUBLISHER);
     }
 
-    public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{
+    public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException {
         String setting = getSetting(PUBLISH_RABBITMQ);
         return Boolean.parseBoolean(setting);
     }
 
+    public static boolean isGFacPassiveMode()throws ApplicationSettingsException {
+        String setting = getSetting(GFAC_PASSIVE);
+        return Boolean.parseBoolean(setting);
+    }
+
     public static boolean isEmbeddedZK() {
         return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index c90fab1..e309901 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -137,6 +137,7 @@ myproxy.password=
 myproxy.life=3600
 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
 trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
+gfac.passive=false
 # SSH PKI key pair or ssh password can be used SSH based authentication is used.
 # if user specify both password authentication gets the higher preference
 
@@ -215,10 +216,12 @@ connection.name=xsede
 #publisher
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
 publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
 task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
 rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
+
 
 ###########################################################################
 # Orchestrator module Configuration

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
index d6be51a..2ecdeb6 100644
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
@@ -200,10 +200,11 @@ connection.name=xsede
 #publisher
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
 publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
 task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
 rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
 
 ###########################################################################
 # Orchestrator module Configuration

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 583ec07..d428d9c 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -27,28 +27,34 @@ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.common.utils.*;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.Future;
 
 
@@ -80,6 +86,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
     private List<Future> inHandlerFutures;
 
+    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+
     public GfacServerHandler() throws Exception{
         // registering with zk
         try {
@@ -102,7 +110,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             BetterGfacImpl.startDaemonHandlers();
             BetterGfacImpl.startStatusUpdators(registry, zk, publisher);
             inHandlerFutures = new ArrayList<Future>();
-        } catch (ApplicationSettingsException e) {
+
+            if(ServerSettings.isGFacPassiveMode()) {
+                rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+            }
+
+
+            } catch (ApplicationSettingsException e) {
             logger.error("Error initialising GFAC", e);
             throw new Exception("Error initialising GFAC", e);
         } catch (InterruptedException e) {
@@ -277,4 +291,49 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         }
     }
 
+    private class NotificationMessageHandler implements MessageHandler {
+        private String experimentId;
+
+        private NotificationMessageHandler(String experimentId) {
+            this.experimentId = experimentId;
+        }
+
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            List<String> routingKeys = new ArrayList<String>();
+            routingKeys.add(experimentId);
+            routingKeys.add(experimentId + ".*");
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            return props;
+        }
+
+        public void onMessage(MessageContext message) {
+            if (message.getType().equals(MessageType.LAUNCHTASK)){
+                try {
+                    TaskSubmitEvent event = new TaskSubmitEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+                }
+            }else if(message.getType().equals(MessageType.TERMINATETASK)){
+                try {
+                    TaskTerminateEvent event = new TaskTerminateEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    cancelJob(event.getExperimentId(), event.getTaskId());
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+                }
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index 7458d81..07b39e7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -23,7 +23,8 @@ package org.apache.airavata.messaging.core;
 
 public abstract class MessagingConstants {
     public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
-    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+    public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
+    public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name";
 
     public static final String RABBIT_ROUTING_KEY = "routingKey";
     public static final String RABBIT_QUEUE= "queue";

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
index 59cdbdf..2e560a3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -30,7 +30,7 @@ public class PublisherFactory {
     private static Logger log = LoggerFactory.getLogger(PublisherFactory.class);
 
     public static Publisher createActivityPublisher() throws AiravataException {
-        String activityPublisher = ServerSettings.getActivityPublisher();
+        String activityPublisher = ServerSettings.getStatusPublisher();
 
         if (activityPublisher == null) {
             String s = "Activity publisher is not specified";

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 362f3f2..aea561f 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -25,9 +25,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.Message;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -51,7 +50,7 @@ public class TestClient {
             AiravataUtils.setExecutionAsServer();
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
+            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
             consumer.listen(new MessageHandler() {
                 @Override
                 public Map<String, Object> getProperties() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
deleted file mode 100644
index 1f13496..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.Consumer;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQConsumer implements Consumer {
-    private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);
-
-    private String exchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-
-    public RabbitMQConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
-
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.exchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-
-            channel = connection.createChannel();
-            channel.exchangeDeclare(exchangeName, "topic", false);
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.exchangeDeclare(exchangeName, "topic", false);
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-                channel.queueDeclare(queueName, true, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-            for (String routingKey : keys) {
-                channel.queueBind(queueName, exchangeName, routingKey);
-            }
-
-            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
-                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    experimentStatusChangeEvent.getState());
-                            event = experimentStatusChangeEvent;
-                            gatewayId = experimentStatusChangeEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
-                            WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    wfnStatusChangeEvent.getState());
-                            event = wfnStatusChangeEvent;
-                            gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TASK)) {
-                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    taskStatusChangeEvent.getState());
-                            event = taskStatusChangeEvent;
-                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.JOB)) {
-                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    jobStatusChangeEvent.getState());
-                            event = jobStatusChangeEvent;
-                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
-                        }
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-                        handler.onMessage(messageContext);
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-            });
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
-                }
-                channel.queueDelete(details.getQueueName(), true, true);
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
new file mode 100644
index 0000000..d5e8c72
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -0,0 +1,383 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumer for the messages published on the Airavata status exchange.
+ *
+ * <p>Every {@link #listen(MessageHandler)} call declares a queue, binds it to the exchange with the handler's
+ * routing keys and hands each de-serialized message to the handler. The id returned by {@code listen}
+ * identifies the subscription for {@link #stopListen(String)}.
+ */
+public class RabbitMQStatusConsumer implements Consumer {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
+
+    /** Type of the exchange declared by this consumer. */
+    private static final String EXCHANGE_TYPE = "topic";
+
+    private String exchangeName;
+    private String url;
+    private Connection connection;
+    private Channel channel;
+    /** Active subscriptions, keyed by the id returned from {@link #listen(MessageHandler)}. */
+    private final Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+    /**
+     * Creates a consumer from the broker URL and status exchange name found in the server settings.
+     *
+     * @throws AiravataException if a setting cannot be read or the connection cannot be established
+     */
+    public RabbitMQStatusConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
+    /**
+     * Creates a consumer for an explicitly given broker and exchange.
+     *
+     * @param brokerUrl    AMQP URI of the broker
+     * @param exchangeName name of the exchange to declare and consume from
+     * @throws AiravataException if the connection cannot be established
+     */
+    public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+        this.exchangeName = exchangeName;
+        this.url = brokerUrl;
+
+        createConnection();
+    }
+
+    /**
+     * Opens the connection and the channel and declares the exchange.
+     *
+     * @throws AiravataException if any step fails; a connection opened before the failure is closed again
+     */
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                    log.info("rabbitmq connection for " + exchangeName + " was shut down: " + cause.getMessage());
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+
+            channel = connection.createChannel();
+            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + exchangeName;
+            log.error(msg, e);
+            // the caller never receives this instance, so release the connection here instead of leaking it
+            if (connection != null && connection.isOpen()) {
+                close();
+            }
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Subscribes the handler to the exchange.
+     *
+     * <p>The handler's properties supply the routing key(s) (mandatory, a {@code String} or a {@code List}) and
+     * optionally a queue name and a consumer tag. Without a queue name a server-named queue is declared.
+     *
+     * @param handler receives every message delivered to the queue
+     * @return id of the subscription, to be passed to {@link #stopListen(String)}
+     * @throws AiravataException if the properties are invalid, the same subscription already exists or a
+     *                           broker operation fails
+     */
+    public String listen(final MessageHandler handler) throws AiravataException {
+        try {
+            Map<String, Object> props = handler.getProperties();
+            List<String> keys = toRoutingKeys(props.get(MessagingConstants.RABBIT_ROUTING_KEY));
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+
+            // a named queue needs a usable channel just as much as a server-named one
+            if (!channel.isOpen()) {
+                channel = connection.createChannel();
+                channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
+            }
+            if (queueName == null) {
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+
+            final String id = getId(keys, queueName);
+            if (queueDetailsMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+                        "cannot define the same subscriber twice");
+            }
+
+            if (consumerTag == null) {
+                consumerTag = "default";
+            }
+
+            // bind all the routing keys
+            for (String routingKey : keys) {
+                channel.queueBind(queueName, exchangeName, routingKey);
+            }
+
+            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    processDelivery(body, handler, id);
+                }
+            });
+            // save the name for deleting the queue
+            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+            return id;
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + exchangeName;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Normalizes the routing-key property into a list of keys.
+     *
+     * @param routing value of the routing-key property
+     * @return the routing keys, in the order given
+     * @throws IllegalArgumentException if the value is missing or is neither a {@code String} nor a {@code List}
+     */
+    private static List<String> toRoutingKeys(Object routing) {
+        if (routing == null) {
+            throw new IllegalArgumentException("The routing key must be present");
+        }
+
+        List<String> keys = new ArrayList<String>();
+        if (routing instanceof List) {
+            for (Object o : (List<?>) routing) {
+                keys.add(o.toString());
+            }
+        } else if (routing instanceof String) {
+            keys.add((String) routing);
+        } else {
+            // accepting any other type would leave the queue without a single binding to the exchange
+            throw new IllegalArgumentException("The routing key must be a String or a List but was "
+                    + routing.getClass().getName());
+        }
+        return keys;
+    }
+
+    /**
+     * De-serializes a delivery and passes it to the handler. Failures are logged rather than propagated.
+     *
+     * @param body    raw message body as delivered by the broker
+     * @param handler handler of the subscription
+     * @param id      subscription id, used in the log messages
+     */
+    private void processDelivery(byte[] body, MessageHandler handler, String id) {
+        try {
+            Message message = new Message();
+            ThriftUtils.createThriftFromBytes(body, message);
+
+            MessageContext messageContext = toMessageContext(message);
+            if (messageContext == null) {
+                log.warn("Ignoring message with message id '" + message.getMessageId()
+                        + "' and unsupported message type '" + message.getMessageType()
+                        + "', from routing keys and queueName " + id);
+                return;
+            }
+            handler.onMessage(messageContext);
+        } catch (TException e) {
+            String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+            log.warn(msg, e);
+        } catch (RuntimeException e) {
+            // e.g. an event without its identity struct; keep one bad message from escaping the broker callback
+            String msg = "Failed to process the message, from routing keys and queueName " + id;
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * Decodes the event carried by a message and wraps it in a {@link MessageContext}.
+     *
+     * @param message envelope read from the queue
+     * @return the context for the handler, or {@code null} if the message type is not handled here
+     * @throws TException if the event cannot be de-serialized
+     */
+    private static MessageContext toMessageContext(Message message) throws TException {
+        MessageType type = message.getMessageType();
+        String received = " Message Received with message id '" + message.getMessageId()
+                + "' and with message type '" + type + "'";
+        TBase event;
+        String gatewayId;
+        if (MessageType.EXPERIMENT.equals(type)) {
+            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+            log.debug(received + "  with status " + experimentStatusChangeEvent.getState());
+            event = experimentStatusChangeEvent;
+            gatewayId = experimentStatusChangeEvent.getGatewayId();
+        } else if (MessageType.WORKFLOWNODE.equals(type)) {
+            WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
+            log.debug(received + "  with status " + wfnStatusChangeEvent.getState());
+            event = wfnStatusChangeEvent;
+            gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId();
+        } else if (MessageType.TASK.equals(type)) {
+            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+            log.debug(received + "  with status " + taskStatusChangeEvent.getState());
+            event = taskStatusChangeEvent;
+            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+        } else if (MessageType.JOB.equals(type)) {
+            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+            log.debug(received + "  with status " + jobStatusChangeEvent.getState());
+            event = jobStatusChangeEvent;
+            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+        } else if (MessageType.LAUNCHTASK.equals(type)) {
+            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+            log.debug(received + "  for experimentId: " + taskSubmitEvent.getExperimentId()
+                    + " and taskId: " + taskSubmitEvent.getTaskId());
+            event = taskSubmitEvent;
+            gatewayId = taskSubmitEvent.getGatewayId();
+        } else if (MessageType.TERMINATETASK.equals(type)) {
+            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+            log.debug(received + "  for experimentId: " + taskTerminateEvent.getExperimentId()
+                    + " and taskId: " + taskTerminateEvent.getTaskId());
+            event = taskTerminateEvent;
+            gatewayId = null;
+        } else {
+            return null;
+        }
+
+        MessageContext messageContext = new MessageContext(event, type, message.getMessageId(), gatewayId);
+        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+        return messageContext;
+    }
+
+    /**
+     * Ends a subscription: un-binds its routing keys and asks the broker to delete the queue if it is unused
+     * and empty. Unknown ids are ignored.
+     *
+     * @param id subscription id returned by {@link #listen(MessageHandler)}
+     * @throws AiravataException never thrown by this implementation; broker failures are only logged
+     */
+    public void stopListen(final String id) throws AiravataException {
+        // forget the subscription first so that the same queue and keys can be subscribed again later
+        QueueDetails details = queueDetailsMap.remove(id);
+        if (details == null) {
+            return;
+        }
+        try {
+            for (String key : details.getRoutingKeys()) {
+                channel.queueUnbind(details.getQueueName(), exchangeName, key);
+            }
+            // NOTE(review): the consumer registered by listen() is not cancelled before this ifUnused/ifEmpty
+            // delete - confirm against the broker whether the queue is actually removed
+            channel.queueDelete(details.getQueueName(), true, true);
+        } catch (IOException e) {
+            String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+            log.warn(msg, e);
+        }
+    }
+
+    /**
+     * Private class for holding some information about the consumers registered
+     */
+    private static final class QueueDetails {
+        private final String queueName;
+        private final List<String> routingKeys;
+
+        private QueueDetails(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        public List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+
+    /**
+     * Builds the subscription id: every routing key and finally the queue name, each prefixed with "_".
+     *
+     * @param routingKeys routing keys of the subscription
+     * @param queueName   queue of the subscription
+     * @return the subscription id
+     */
+    private String getId(List<String> routingKeys, String queueName) {
+        StringBuilder id = new StringBuilder();
+        for (String key : routingKeys) {
+            id.append('_').append(key);
+        }
+        return id.append('_').append(queueName).toString();
+    }
+
+    /**
+     * Closes the connection to the broker; an I/O failure while closing is logged and otherwise ignored.
+     */
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException e) {
+                log.debug("could not close the rabbitmq connection for exchange " + exchangeName, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index a4b4d1a..fe06ed7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -45,7 +45,7 @@ public class RabbitMQStatusPublisher implements Publisher {
         String exchangeName;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
             log.error(message, e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
new file mode 100644
index 0000000..056dcac
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -0,0 +1,355 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumer for the task launch and task terminate messages published on the Airavata task launch exchange.
+ *
+ * <p>Every {@link #listen(MessageHandler)} call declares a queue, binds it to the exchange with the handler's
+ * routing keys and hands each de-serialized message to the handler. The id returned by {@code listen}
+ * identifies the subscription for {@link #stopListen(String)}.
+ */
+public class RabbitMQTaskLaunchConsumer {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchConsumer.class);
+
+    /** Type of the exchange declared by this consumer. */
+    private static final String EXCHANGE_TYPE = "fanout";
+
+    private String taskLaunchExchangeName;
+    private String url;
+    private Connection connection;
+    private Channel channel;
+    /** Active subscriptions, keyed by the id returned from {@link #listen(MessageHandler)}. */
+    private final Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+    /**
+     * Creates a consumer from the broker URL and task launch exchange name found in the server settings.
+     *
+     * @throws AiravataException if a setting cannot be read or the connection cannot be established
+     */
+    public RabbitMQTaskLaunchConsumer() throws AiravataException {
+        try {
+            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+            createConnection();
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+    }
+
+    /**
+     * Creates a consumer for an explicitly given broker and exchange.
+     *
+     * @param brokerUrl    AMQP URI of the broker
+     * @param exchangeName name of the exchange to declare and consume from
+     * @throws AiravataException if the connection cannot be established
+     */
+    public RabbitMQTaskLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+        this.taskLaunchExchangeName = exchangeName;
+        this.url = brokerUrl;
+
+        createConnection();
+    }
+
+    /**
+     * Opens the connection and the channel and declares the exchange.
+     *
+     * @throws AiravataException if any step fails; a connection opened before the failure is closed again
+     */
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(url);
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                    log.info("rabbitmq connection for " + taskLaunchExchangeName + " was shut down: " + cause.getMessage());
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+
+            channel = connection.createChannel();
+            channel.exchangeDeclare(taskLaunchExchangeName, EXCHANGE_TYPE);
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+            log.error(msg, e);
+            // the caller never receives this instance, so release the connection here instead of leaking it
+            if (connection != null && connection.isOpen()) {
+                close();
+            }
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Subscribes the handler to the exchange.
+     *
+     * <p>The handler's properties supply the routing key(s) (mandatory, a {@code String} or a {@code List}) and
+     * optionally a queue name and a consumer tag. Without a queue name a server-named queue is declared.
+     *
+     * @param handler receives every message delivered to the queue
+     * @return id of the subscription, to be passed to {@link #stopListen(String)}
+     * @throws AiravataException if the properties are invalid, the same subscription already exists or a
+     *                           broker operation fails
+     */
+    public String listen(final MessageHandler handler) throws AiravataException {
+        try {
+            Map<String, Object> props = handler.getProperties();
+            List<String> keys = toRoutingKeys(props.get(MessagingConstants.RABBIT_ROUTING_KEY));
+            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+
+            // a named queue needs a usable channel just as much as a server-named one
+            if (!channel.isOpen()) {
+                channel = connection.createChannel();
+                channel.exchangeDeclare(taskLaunchExchangeName, EXCHANGE_TYPE);
+            }
+            if (queueName == null) {
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+
+            final String id = getId(keys, queueName);
+            if (queueDetailsMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+                        "cannot define the same subscriber twice");
+            }
+
+            if (consumerTag == null) {
+                consumerTag = "default";
+            }
+
+            // bind all the routing keys
+            for (String routingKey : keys) {
+                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+            }
+
+            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag,
+                                           Envelope envelope,
+                                           AMQP.BasicProperties properties,
+                                           byte[] body) {
+                    processDelivery(body, handler, id);
+                }
+            });
+            // save the name for deleting the queue
+            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+            return id;
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Normalizes the routing-key property into a list of keys.
+     *
+     * @param routing value of the routing-key property
+     * @return the routing keys, in the order given
+     * @throws IllegalArgumentException if the value is missing or is neither a {@code String} nor a {@code List}
+     */
+    private static List<String> toRoutingKeys(Object routing) {
+        if (routing == null) {
+            throw new IllegalArgumentException("The routing key must be present");
+        }
+
+        List<String> keys = new ArrayList<String>();
+        if (routing instanceof List) {
+            for (Object o : (List<?>) routing) {
+                keys.add(o.toString());
+            }
+        } else if (routing instanceof String) {
+            keys.add((String) routing);
+        } else {
+            // accepting any other type would leave the queue without a single binding to the exchange
+            throw new IllegalArgumentException("The routing key must be a String or a List but was "
+                    + routing.getClass().getName());
+        }
+        return keys;
+    }
+
+    /**
+     * De-serializes a delivery and passes it to the handler. Failures are logged rather than propagated.
+     *
+     * @param body    raw message body as delivered by the broker
+     * @param handler handler of the subscription
+     * @param id      subscription id, used in the log messages
+     */
+    private void processDelivery(byte[] body, MessageHandler handler, String id) {
+        try {
+            Message message = new Message();
+            ThriftUtils.createThriftFromBytes(body, message);
+
+            MessageContext messageContext = toMessageContext(message);
+            if (messageContext == null) {
+                log.warn("Ignoring message with message id '" + message.getMessageId()
+                        + "' and unsupported message type '" + message.getMessageType()
+                        + "', from routing keys and queueName " + id);
+                return;
+            }
+            handler.onMessage(messageContext);
+        } catch (TException e) {
+            String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+            log.warn(msg, e);
+        } catch (RuntimeException e) {
+            // keep a malformed event or a failing handler from escaping the broker callback
+            String msg = "Failed to process the message, from routing keys and queueName " + id;
+            log.error(msg, e);
+        }
+    }
+
+    /**
+     * Decodes the event carried by a message and wraps it in a {@link MessageContext}.
+     *
+     * @param message envelope read from the queue
+     * @return the context for the handler, or {@code null} if the message type is not handled here
+     * @throws TException if the event cannot be de-serialized
+     */
+    private static MessageContext toMessageContext(Message message) throws TException {
+        MessageType type = message.getMessageType();
+        String received = " Message Received with message id '" + message.getMessageId()
+                + "' and with message type '" + type + "'";
+        TBase event;
+        String gatewayId;
+        if (MessageType.LAUNCHTASK.equals(type)) {
+            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+            log.debug(received + "  for experimentId: " + taskSubmitEvent.getExperimentId()
+                    + " and taskId: " + taskSubmitEvent.getTaskId());
+            event = taskSubmitEvent;
+            gatewayId = taskSubmitEvent.getGatewayId();
+        } else if (MessageType.TERMINATETASK.equals(type)) {
+            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+            log.debug(received + "  for experimentId: " + taskTerminateEvent.getExperimentId()
+                    + " and taskId: " + taskTerminateEvent.getTaskId());
+            event = taskTerminateEvent;
+            gatewayId = null;
+        } else {
+            return null;
+        }
+
+        MessageContext messageContext = new MessageContext(event, type, message.getMessageId(), gatewayId);
+        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+        return messageContext;
+    }
+
+    /**
+     * Ends a subscription: un-binds its routing keys and asks the broker to delete the queue if it is unused
+     * and empty. Unknown ids are ignored.
+     *
+     * @param id subscription id returned by {@link #listen(MessageHandler)}
+     * @throws AiravataException never thrown by this implementation; broker failures are only logged
+     */
+    public void stopListen(final String id) throws AiravataException {
+        // forget the subscription first so that the same queue and keys can be subscribed again later
+        QueueDetails details = queueDetailsMap.remove(id);
+        if (details == null) {
+            return;
+        }
+        try {
+            for (String key : details.getRoutingKeys()) {
+                channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+            }
+            // NOTE(review): the consumer registered by listen() is not cancelled before this ifUnused/ifEmpty
+            // delete - confirm against the broker whether the queue is actually removed
+            channel.queueDelete(details.getQueueName(), true, true);
+        } catch (IOException e) {
+            String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+            log.warn(msg, e);
+        }
+    }
+
+    /**
+     * Private class for holding some information about the consumers registered
+     */
+    private static final class QueueDetails {
+        private final String queueName;
+        private final List<String> routingKeys;
+
+        private QueueDetails(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        public List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+
+    /**
+     * Builds the subscription id: every routing key and finally the queue name, each prefixed with "_".
+     *
+     * @param routingKeys routing keys of the subscription
+     * @param queueName   queue of the subscription
+     * @return the subscription id
+     */
+    private String getId(List<String> routingKeys, String queueName) {
+        StringBuilder id = new StringBuilder();
+        for (String key : routingKeys) {
+            id.append('_').append(key);
+        }
+        return id.append('_').append(queueName).toString();
+    }
+
+    /**
+     * Closes the connection to the broker; an I/O failure while closing is logged and otherwise ignored.
+     */
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException e) {
+                log.debug("could not close the rabbitmq connection for exchange " + taskLaunchExchangeName, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 8029a0c..fe58042 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -44,7 +44,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
         String exchangeName;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
             log.error(message, e);
@@ -56,7 +56,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
 
     public void publish(MessageContext msgCtx) throws AiravataException {
         try {
-            log.info("Publishing to lauch queue ...");
+            log.info("Publishing to launch queue ...");
             byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
             Message message = new Message();
             message.setEvent(body);
@@ -65,13 +65,9 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
             String routingKey = null;
             if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){
-                TaskSubmitEvent event = (TaskSubmitEvent) msgCtx.getEvent();
-                routingKey = LAUNCH_TASK + "."+event.getExperimentId() + "." +
-                        event.getTaskId() + "." + event.getGatewayId();
+                routingKey = LAUNCH_TASK;
             }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){
-                TaskTerminateEvent event = (TaskTerminateEvent) msgCtx.getEvent();
-                routingKey = TERMINATE_TASK + "."+event.getExperimentId() + "." +
-                        event.getTaskId();
+                routingKey = TERMINATE_TASK;
             }
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
             rabbitMQProducer.send(messageBody, routingKey);

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
index 0e0e425..97b85bc 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
@@ -25,7 +25,6 @@ package org.apache.airavata.orchestrator.core.utils;
  *
  */
 public class OrchestratorConstants {
-    private static final String SUBMITTER_PROPERTY = "job.submitter";
     public static final String AIRAVATA_PROPERTIES = "airavata-server.properties";
     public static final int hotUpdateInterval=1000;
     public static final String JOB_SUBMITTER = "job.submitter";

http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
index 896b248..6ee1111 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
@@ -22,12 +22,11 @@
 package org.apache.airavata.xbaya.messaging;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.messaging.core.Consumer;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.workflow.model.exceptions.WorkflowException;
@@ -101,7 +100,7 @@ public class Monitor extends EventProducer {
     	getEventDataRepository().triggerListenerForPreMonitorStart();
         try {
 //            AiravataUtils.setExecutionAsServer();
-            this.messageClient = new RabbitMQConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange");
+            this.messageClient = new RabbitMQStatusConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange");
         } catch (AiravataException e) {
             String msg = "Failed to start the consumer";
             logger.error(msg, e);