You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/09 22:19:41 UTC

[3/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers

Refactored messaging module to remove duplicate code and support multiple subscribers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e247b00d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e247b00d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e247b00d

Branch: refs/heads/develop
Commit: e247b00d0b4d65a684f5e5551549c329241492d7
Parents: a6670f8
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 9 18:19:09 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 9 18:19:09 2016 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  42 +-
 .../main/resources/airavata-server.properties   |  12 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |  26 +-
 .../apache/airavata/gfac/impl/GFacWorker.java   |   3 +-
 .../airavata/gfac/server/GfacServerHandler.java |  82 +--
 .../messaging/client/RabbitMQListener.java      | 258 +++++----
 .../airavata/messaging/core/Consumer.java       |  40 --
 .../airavata/messaging/core/MessageHandler.java |   4 +-
 .../messaging/core/MessagingConstants.java      |   3 +-
 .../airavata/messaging/core/Metadata.java       |  25 -
 .../airavata/messaging/core/Publisher.java      |   1 +
 .../airavata/messaging/core/TestClient.java     |  56 +-
 .../impl/RabbitMQProcessLaunchConsumer.java     | 574 +++++++++----------
 .../impl/RabbitMQProcessLaunchPublisher.java    |   2 +-
 .../core/impl/RabbitMQStatusConsumer.java       | 286 ---------
 .../server/OrchestratorServerHandler.java       |  47 +-
 .../ExperimentExecution.java                    | 391 +++++++------
 .../workflow/core/WorkflowEnactmentService.java |  60 +-
 .../workflow/core/WorkflowInterpreter.java      |  21 +-
 19 files changed, 785 insertions(+), 1148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 5699e9a..2459658 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -84,8 +84,15 @@ public class ServerSettings extends ApplicationSettings {
     public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
     public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
     public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
-    public static final String LAUNCH_QUEUE_NAME = "launch.queue.name";
-    public static final String CANCEL_QUEUE_NAME = "cancel.queue.name";
+
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
+    public static final String RABBITMQ_PROCESS_EXCHANGE_NAME = "rabbitmq.process.exchange.name";
+    public static final String RABBITMQ_EXPERIMENT_EXCHANGE_NAME = "rabbitmq.experiment.exchange.name";
+    public static final String RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME = "process.launch.queue.name";
+    public static final String RABBITMQ_EXPERIMENT_LAUNCH_QUEUE_NAME = "experiment.launch.queue.name";
+    public static final String RABBITMQ_DURABLE_QUEUE="durable.queue";
+    public static final String RABBITMQ_PREFETCH_COUNT="prefetch.count";
 
 
     //    Workflow Enactment Service component configuration.
@@ -110,13 +117,36 @@ public class ServerSettings extends ApplicationSettings {
         return getSetting(DEFAULT_USER);
     }
 
-    public static String getLaunchQueueName() {
-        return getSetting(LAUNCH_QUEUE_NAME, "launch.queue");
+    public static String getRabbitmqProcessLaunchQueueName() {
+        return getSetting(RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME, "process.launch.queue");
+    }
+
+    public static String getRabbitmqExperimentLaunchQueueName() {
+        return getSetting(RABBITMQ_EXPERIMENT_EXCHANGE_NAME, "experiment.launch.queue");
+    }
+
+    public static String getRabbitmqBrokerUrl() throws ApplicationSettingsException {
+        return getSetting(RABBITMQ_BROKER_URL);
+    }
+
+    public static String getRabbitmqStatusExchangeName(){
+        return getSetting(RABBITMQ_STATUS_EXCHANGE_NAME, "status_exchange");
+    }
+
+    public static String getRabbitmqProcessExchangeName(){
+        return getSetting(RABBITMQ_PROCESS_EXCHANGE_NAME, "process_exchange");
     }
 
+    public static String getRabbitmqExperimentExchangeName() {
+        return getSetting(RABBITMQ_EXPERIMENT_EXCHANGE_NAME, "experiment_exchange");
+    }
+
+    public static boolean getRabbitmqDurableQueue(){
+        return Boolean.valueOf(getSetting(RABBITMQ_DURABLE_QUEUE, "false"));
+    }
 
-    public static String getCancelQueueName() {
-        return getSetting(CANCEL_QUEUE_NAME, "cancel.queue");
+    public static int getRabbitmqPrefetchCount(){
+        return Integer.valueOf(getSetting(RABBITMQ_PREFETCH_COUNT, "200"));
     }
 
     public static String getDefaultUserPassword() throws ApplicationSettingsException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 4fcdd03..29b256f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -246,16 +246,14 @@ rabbitmq.broker.url=amqp://localhost:5672
 #for production scenarios, give url as amqp://userName:password@hostName:portNumber/virtualHost, create user, virtualhost
 # and give permissions, refer: http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html
 #rabbitmq.broker.url=amqp://airavata:airavata@localhost:5672/messaging
-status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher
-rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
-rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
+rabbitmq.status.exchange.name=status_exchange
+rabbitmq.process.exchange.name=process_exchange
+rabbitmq.experiment.exchange.name=experiment_exchange
 durable.queue=false
 prefetch.count=200
-launch.queue.name=launch.queue
-cancel.queue.name=cancel.queue
+process.launch.queue.name=process.launch.queue
+experiment.launch.queue.name=experiment.launch.queue
 activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
 
 ###########################################################################
 # Zookeeper Server Configuration

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbeb1d8..d105c18 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.gfac.core.cluster.OutputParser;
@@ -55,15 +54,23 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
-import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.job.ForkJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
@@ -81,6 +88,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -106,7 +114,7 @@ public abstract class Factory {
 	private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
 	private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
 	private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
-	private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
+	private static Subscriber processLaunchSubscriber;
 	private static Map<String, Session> sessionMap = new HashMap<>();
 
 	public static GFacEngine getGFacEngine() throws GFacException {
@@ -159,11 +167,11 @@ public abstract class Factory {
 		return curatorClient;
 	}
 
-	public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException {
-		if (processLaunchConsumer == null) {
-			processLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+	public static synchronized  Subscriber getProcessLaunchSubscriber() throws AiravataException {
+		if (processLaunchSubscriber == null) {
+			processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH);
 		}
-		return processLaunchConsumer;
+		return processLaunchSubscriber;
 	}
 
 	public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 6dc4c1d..001b679 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,7 +29,6 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,7 +239,7 @@ public class GFacWorker implements Runnable {
 			try {
                 long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
                         processContext.getExperimentId(), processId);
-                Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+                Factory.getProcessLaunchSubscriber().sendAck(processDeliveryTag);
                 processContext.setAcknowledge(true);
                 log.info("expId: {}, processId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
                         processId, processDeliveryTag);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c59e199..a490d91 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,7 +28,6 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
@@ -37,11 +36,15 @@ import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -60,16 +63,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class GfacServerHandler implements GfacService.Iface {
     private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
-    private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
+    private Subscriber processLaunchSubscriber;
     private static int requestCount=0;
     private ExperimentCatalog experimentCatalog;
     private AppCatalog appCatalog;
@@ -88,7 +89,6 @@ public class GfacServerHandler implements GfacService.Iface {
             initZkDataStructure();
             initAMQPClient();
 	        executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
-            startStatusUpdators(experimentCatalog, curatorClient, statusPublisher, rabbitMQProcessLaunchConsumer);
         } catch (Exception e) {
             throw new AiravataStartupException("Gfac Server Initialization error ", e);
         }
@@ -96,9 +96,10 @@ public class GfacServerHandler implements GfacService.Iface {
 
     private void initAMQPClient() throws AiravataException {
 	    // init process consumer
-        rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
-        rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler());
-	    // init status publisher
+        List<String> routingKeys = new ArrayList<>();
+        routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
+        processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH);
+        // init status publisher
 	    statusPublisher = new RabbitMQStatusPublisher();
     }
 
@@ -173,25 +174,6 @@ public class GfacServerHandler implements GfacService.Iface {
         return false;
     }
 
-    public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, Publisher publisher,
-
-                                           RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
-       /* try {
-            String[] listenerClassList = ServerSettings.getActivityListeners();
-            Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-            for (String listenerClass : listenerClassList) {
-                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
-                AbstractActivityListener abstractActivityListener = aClass.newInstance();
-                activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(statusPublisher, experimentCatalog, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
-                log.info("Registering listener: " + listenerClass);
-                statusPublisher.registerListener(abstractActivityListener);
-            }
-        } catch (Exception e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        }*/
-    }
-
     private class ProcessLaunchMessageHandler implements MessageHandler {
         private String experimentNode;
         private String gfacServerName;
@@ -201,15 +183,6 @@ public class GfacServerHandler implements GfacService.Iface {
             gfacServerName = ServerSettings.getGFacServerName();
         }
 
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            ArrayList<String> keys = new ArrayList<String>();
-            keys.add(ServerSettings.getLaunchQueueName());
-            keys.add(ServerSettings.getCancelQueueName());
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
-            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
-            return props;
-        }
 
         public void onMessage(MessageContext message) {
             log.info(" Message Received with message id '" + message.getMessageId()
@@ -232,7 +205,7 @@ public class GfacServerHandler implements GfacService.Iface {
 			                } catch (Exception e) {
 				                log.error("Error while updating delivery tag for redelivery message , messageId : " +
 						                message.getMessageId(), e);
-				                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+				                processLaunchSubscriber.sendAck(message.getDeliveryTag());
 			                }
 		                } else {
 			                // read process status from registry
@@ -264,7 +237,7 @@ public class GfacServerHandler implements GfacService.Iface {
                                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                 Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                                 publishProcessStatus(event, status);
-                                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                                processLaunchSubscriber.sendAck(message.getDeliveryTag());
                                 return;
                             } else {
                                 setCancelData(event.getExperimentId(),event.getProcessId());
@@ -273,7 +246,7 @@ public class GfacServerHandler implements GfacService.Iface {
                         submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
-                        rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                        processLaunchSubscriber.sendAck(message.getDeliveryTag());
                     }
                 } catch (TException e) {
                     log.error(e.getMessage(), e); //nobody is listening so nothing to throw
@@ -283,31 +256,6 @@ public class GfacServerHandler implements GfacService.Iface {
 	                log.error("Error while publishing process status", e);
                 }
             }
-            // TODO - Now there is no process termination type messages, use zookeeper instead of rabbitmq to do that. it is safe to remove this else part.
-            else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
-                ProcessTerminateEvent event = new ProcessTerminateEvent();
-                TBase messageEvent = message.getEvent();
-                try {
-                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                    ThriftUtils.createThriftFromBytes(bytes, event);
-	                boolean success = GFacUtils.setExperimentCancelRequest(event.getProcessId(), curatorClient,
-			                message.getDeliveryTag());
-	                if (success) {
-		                log.info("processId:{} - Process cancel request save successfully", event.getProcessId());
-	                }
-                } catch (Exception e) {
-	                log.error("processId:" + event.getProcessId() + " - Process cancel reqeust failed", e);
-                }finally {
-	                try {
-		                if (!rabbitMQProcessLaunchConsumer.isOpen()) {
-			                rabbitMQProcessLaunchConsumer.reconnect();
-		                }
-		                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
-	                } catch (AiravataException e) {
-		                log.error("processId: " + event.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
-	                }
-                }
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
index 301934b..984aa59 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -22,25 +22,32 @@
 package org.apache.airavata.messaging.client;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.commons.cli.*;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 
 public class RabbitMQListener {
@@ -48,12 +55,9 @@ public class RabbitMQListener {
     public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
     private final static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
     private static String gatewayId = "*";
-    private static boolean gatewayLevelMessages = false;
-    private static boolean experimentLevelMessages = false;
-    private static boolean jobLevelMessages = false;
     private static String experimentId = "*";
     private static String jobId = "*";
-    private static boolean allMessages = false;
+    private static LEVEL level = LEVEL.ALL;
 
     public static void main(String[] args) {
         File file = new File("/tmp/latency_client");
@@ -64,66 +68,39 @@ public class RabbitMQListener {
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             System.out.println("broker url " + brokerUrl);
             final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-            consumer.listen(new MessageHandler() {
-                @Override
-                public Map<String, Object> getProperties() {
-                    Map<String, Object> props = new HashMap<String, Object>();
-                    List<String> routingKeys = new ArrayList<String>();
-                    if (allMessages){
-                        routingKeys.add("*");
-                        routingKeys.add("*.*");
-                        routingKeys.add("*.*.*");
-                        routingKeys.add("*.*.*.*");
-                        routingKeys.add("*.*.*.*.*");
-                    }else {
-                        if (gatewayLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + ".*");
-                            routingKeys.add(gatewayId + ".*.*");
-                            routingKeys.add(gatewayId + ".*.*.*");
-                            routingKeys.add(gatewayId + ".*.*.*.*");
-                        }else if (experimentLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + "." + experimentId);
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*");
-                        }else if  (jobLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + "." + experimentId);
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId);
-                        }
-                    }
-                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                    return props;
-                }
+            List<String> routingKeys = getRoutingKeys(level);
+            Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error reading airavata server properties", e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
 
-                @Override
-                public void onMessage(MessageContext message) {
-                    try {
-                        long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
-                        bw.write(message.getMessageId() + " :" + latency);
-                        bw.newLine();
-                        bw.flush();
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                    if (message.getType().equals(MessageType.EXPERIMENT)){
-                        try {
-                            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                       " for Gateway " + event.getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }else if (message.getType().equals(MessageType.PROCESS)){
+    }
+
+    private static MessageHandler getMessageHandler(final BufferedWriter bw) {
+        return message -> {
+            try {
+                long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
+                bw.write(message.getMessageId() + " :" + latency);
+                bw.newLine();
+                bw.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (message.getType().equals(MessageType.EXPERIMENT)) {
+                try {
+                    ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            } else if (message.getType().equals(MessageType.PROCESS)) {
                         /*try {
                             WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent();
                             TBase messageEvent = message.getEvent();
@@ -135,93 +112,132 @@ public class RabbitMQListener {
                         } catch (TException e) {
                             logger.error(e.getMessage(), e);
                         }*/
-                    }else if (message.getType().equals(MessageType.TASK)){
-                        try {
-                            TaskStatusChangeEvent event = new TaskStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                    " for Gateway " + event.getTaskIdentity().getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }else if (message.getType().equals(MessageType.JOB)){
-                        try {
-                            JobStatusChangeEvent event = new JobStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                    " for Gateway " + event.getJobIdentity().getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
+            } else if (message.getType().equals(MessageType.TASK)) {
+                try {
+                    TaskStatusChangeEvent event = new TaskStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getTaskIdentity().getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
                 }
-            });
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error reading airavata server properties", e);
-        }catch (Exception e) {
-           logger.error(e.getMessage(), e);
-        }
+            } else if (message.getType().equals(MessageType.JOB)) {
+                try {
+                    JobStatusChangeEvent event = new JobStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getJobIdentity().getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        };
 
     }
 
+    private static List<String> getRoutingKeys(LEVEL level) {
+        List<String> routingKeys = new ArrayList<String>();
+        switch (level) {
+            case ALL:
+                routingKeys.add("*");
+                routingKeys.add("*.*");
+                routingKeys.add("*.*.*");
+                routingKeys.add("*.*.*.*");
+                routingKeys.add("*.*.*.*.*");
+                break;
+            case GATEWAY:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + ".*");
+                routingKeys.add(gatewayId + ".*.*");
+                routingKeys.add(gatewayId + ".*.*.*");
+                routingKeys.add(gatewayId + ".*.*.*.*");
+                break;
+            case EXPERIMENT:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + "." + experimentId);
+                routingKeys.add(gatewayId + "." + experimentId + ".*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*.*");
+                break;
+            case JOB:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + "." + experimentId);
+                routingKeys.add(gatewayId + "." + experimentId + ".*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*." + jobId);
+                break;
+            default:
+                break;
+        }
+        return routingKeys;
+    }
+
     public static void parseArguments(String[] args) {
-        try{
+        try {
             Options options = new Options();
 
-            options.addOption("gId", true , "Gateway ID");
+            options.addOption("gId", true, "Gateway ID");
             options.addOption("eId", true, "Experiment ID");
             options.addOption("jId", true, "Job ID");
             options.addOption("a", false, "All Notifications");
 
             CommandLineParser parser = new PosixParser();
-            CommandLine cmd = parser.parse( options, args);
-            if (cmd.getOptions() == null || cmd.getOptions().length == 0){
+            CommandLine cmd = parser.parse(options, args);
+            if (cmd.getOptions() == null || cmd.getOptions().length == 0) {
                 logger.info("You have not specified any options. We assume you need to listen to all the messages...");
-                allMessages = true;
+                level = LEVEL.ALL;
                 gatewayId = "*";
             }
-            if (cmd.hasOption("a")){
+            if (cmd.hasOption("a")) {
                 logger.info("Listening to all the messages...");
-                allMessages = true;
+                level = LEVEL.ALL;
                 gatewayId = "*";
-            }else {
+            } else {
                 gatewayId = cmd.getOptionValue("gId");
-                if (gatewayId == null){
+                if (gatewayId == null) {
                     gatewayId = "*";
                     logger.info("You have not specified a gateway id. We assume you need to listen to all the messages...");
                 } else {
-                    gatewayLevelMessages = true;
+                    level = LEVEL.GATEWAY;
                 }
                 experimentId = cmd.getOptionValue("eId");
-                if (experimentId == null && !gatewayId.equals("*")){
+                if (experimentId == null && !gatewayId.equals("*")) {
                     experimentId = "*";
                     logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId);
                 } else if (experimentId == null && gatewayId.equals("*")) {
                     experimentId = "*";
                     logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages...");
-                }else {
-                    experimentLevelMessages = true;
+                } else {
+                    level = LEVEL.EXPERIMENT;
                 }
                 jobId = cmd.getOptionValue("jId");
-                if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){
+                if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")) {
                     jobId = "*";
                     logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId
-                            + " with experiment id : " + experimentId );
+                            + " with experiment id : " + experimentId);
                 } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) {
                     jobId = "*";
                     logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages...");
-                }else {
-                    jobLevelMessages = true;
+                } else {
+                    level = LEVEL.JOB;
                 }
             }
         } catch (ParseException e) {
-            logger.error("Error while reading command line parameters" , e);
+            logger.error("Error while reading command line parameters", e);
         }
     }
+
+    private enum LEVEL {
+        ALL,
+        GATEWAY,
+        EXPERIMENT,
+        JOB;
+    }
 }
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
deleted file mode 100644
index eb557ac..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-import org.apache.airavata.common.exception.AiravataException;
-
-/**
- * This is the basic consumer
- */
-public interface Consumer {
-    /**
-     * Start listening for messages, The binding properties are specified in the handler.
-     * Returns and unique id to this Consumer. This id can be used to stop the listening
-     * @param handler
-     * @return
-     * @throws AiravataException
-     */
-    public String listen(MessageHandler handler) throws AiravataException;
-
-    public void stopListen(final String id) throws AiravataException;
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index c5f8b3d..23646da 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -21,10 +21,8 @@
 
 package org.apache.airavata.messaging.core;
 
-import java.util.Map;
-
+@FunctionalInterface
 public interface MessageHandler {
-    Map<String, Object> getProperties();
 
     void onMessage(MessageContext message);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index 7fb77a9..a64d18d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -21,10 +21,11 @@
 
 package org.apache.airavata.messaging.core;
 
+@Deprecated
 public abstract class MessagingConstants {
     public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
     public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
-    public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name";
+    public static final String RABBITMQ_TASK_EXCHANGE_NAME = "rabbitmq.task.exchange.name";
 
     public static final String RABBIT_ROUTING_KEY = "routingKey";
     public static final String RABBIT_QUEUE= "queue";

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
deleted file mode 100644
index cbf41c1..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-public class Metadata {
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index dea8b00..b8b586c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.AiravataException;
 /**
  * This is the basic publisher interface.
  */
+@FunctionalInterface
 public interface Publisher {
 
     /**

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 89fe7ce..daa9886 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -22,10 +22,8 @@
 package org.apache.airavata.messaging.core;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.thrift.TBase;
@@ -34,9 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 
 public class TestClient {
@@ -47,36 +43,10 @@ public class TestClient {
 
     public static void main(String[] args) {
         try {
-            String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
-            final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-            consumer.listen(new MessageHandler() {
-                @Override
-                public Map<String, Object> getProperties() {
-                    Map<String, Object> props = new HashMap<String, Object>();
-                    List<String> routingKeys = new ArrayList<String>();
-                    routingKeys.add(experimentId);
-                    routingKeys.add(experimentId + ".*");
-                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                    return props;
-                }
-
-                @Override
-                public void onMessage(MessageContext message) {
-                    if (message.getType().equals(MessageType.EXPERIMENT)){
-                        try {
-                            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
-                }
-            });
+            List<String> routingKeys = new ArrayList<>();
+            routingKeys.add(experimentId);
+            routingKeys.add(experimentId + ".*");
+            MessagingFactory.getSubscriber(getMessageHandler(),routingKeys,  Subscriber.Type.STATUS);
         } catch (ApplicationSettingsException e) {
             logger.error("Error reading airavata server properties", e);
         }catch (Exception e) {
@@ -84,4 +54,22 @@ public class TestClient {
         }
 
     }
+
+
+    private static MessageHandler getMessageHandler(){
+        return message -> {
+                if (message.getType().equals(MessageType.EXPERIMENT)){
+                    try {
+                        ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                        TBase messageEvent = message.getEvent();
+                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                        ThriftUtils.createThriftFromBytes(bytes, event);
+                        System.out.println(" Message Received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
+                    } catch (TException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            };
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index 855ae27..456ca07 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -1,288 +1,288 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQProcessLaunchConsumer {
-    private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
-    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
-    private String taskLaunchExchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-    private boolean durableQueue;
-    private MessageHandler messageHandler;
-    private int prefetchCount;
-
-
-    public RabbitMQProcessLaunchConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
-            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.taskLaunchExchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-
-//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void reconnect() throws AiravataException{
-        if(messageHandler!=null) {
-            try {
-                listen(messageHandler);
-            } catch (AiravataException e) {
-                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-                log.error(msg);
-                throw new AiravataException(msg, e);
-
-            }
-        }
-    }
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            messageHandler = handler;
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.basicQos(prefetchCount);
-//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-
-                channel.queueDeclare(queueName, durableQueue, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-//            for (String routingKey : keys) {
-//                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.messaging.core.impl;
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.AiravataUtils;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessageHandler;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TBase;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//public class RabbitMQProcessLaunchConsumer {
+//    private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
+//    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
+//
+//    private String taskLaunchExchangeName;
+//    private String url;
+//    private Connection connection;
+//    private Channel channel;
+//    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+//    private boolean durableQueue;
+//    private MessageHandler messageHandler;
+//    private int prefetchCount;
+//
+//
+//    public RabbitMQProcessLaunchConsumer() throws AiravataException {
+//        try {
+//            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
+//            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+//            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+//            createConnection();
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//    }
+//
+//    public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+//        this.taskLaunchExchangeName = exchangeName;
+//        this.url = brokerUrl;
+//
+//        createConnection();
+//    }
+//
+//    private void createConnection() throws AiravataException {
+//        try {
+//            ConnectionFactory connectionFactory = new ConnectionFactory();
+//            connectionFactory.setUri(url);
+//            connectionFactory.setAutomaticRecoveryEnabled(true);
+//            connection = connectionFactory.newConnection();
+//            connection.addShutdownListener(new ShutdownListener() {
+//                public void shutdownCompleted(ShutdownSignalException cause) {
+//                }
+//            });
+//            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+//
+//            channel = connection.createChannel();
+//            channel.basicQos(prefetchCount);
+//
+////            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void reconnect() throws AiravataException{
+//        if(messageHandler!=null) {
+//            try {
+//                listen(messageHandler);
+//            } catch (AiravataException e) {
+//                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//                log.error(msg);
+//                throw new AiravataException(msg, e);
+//
 //            }
-            // autoAck=false, we will ack after task is done
-            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-                        long deliveryTag = envelope.getDeliveryTag();
-	                    if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-		                    ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
-		                    ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
-		                    log.debug(" Message Received with message id '" + message.getMessageId()
-				                    + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
-				                    " " +
-				                    processSubmitEvent.getProcessId());
-		                    event = processSubmitEvent;
-		                    gatewayId = processSubmitEvent.getGatewayId();
-		                    MessageContext messageContext = new MessageContext(event, message.getMessageType(),
-				                    message.getMessageId(), gatewayId, deliveryTag);
-		                    messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-		                    messageContext.setIsRedeliver(envelope.isRedeliver());
-		                    handler.onMessage(messageContext);
-	                    } else {
-		                    log.error("{} message type is not handle in ProcessLaunch Consumer. Sending ack for " +
-				                    "delivery tag {} ", message.getMessageType().name(), deliveryTag);
-		                    sendAck(deliveryTag);
-	                    }
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-
-                @Override
-                public void handleCancel(String consumerTag) throws IOException {
-                    super.handleCancel(consumerTag);
-                    log.info("Consumer cancelled : " + consumerTag);
-                }
-            });
-
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
-                }
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-    public boolean isOpen(){
-        if(connection!=null){
-            return connection.isOpen();
-        }
-        return false;
-    }
-
-    public void sendAck(long deliveryTag){
-        try {
-            if (channel.isOpen()){
-                channel.basicAck(deliveryTag,false);
-            }else {
-                channel = connection.createChannel();
-                channel.basicQos(prefetchCount);
-                channel.basicAck(deliveryTag, false);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-}
+//        }
+//    }
+//    public String listen(final MessageHandler handler) throws AiravataException {
+//        try {
+//            messageHandler = handler;
+//            Map<String, Object> props = handler.getProperties();
+//            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+//            if (routing == null) {
+//                throw new IllegalArgumentException("The routing key must be present");
+//            }
+//            List<String> keys = new ArrayList<String>();
+//            if (routing instanceof List) {
+//                for (Object o : (List)routing) {
+//                    keys.add(o.toString());
+//                }
+//            } else if (routing instanceof String) {
+//                keys.add((String) routing);
+//            }
+//
+//            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+//            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+//            if (queueName == null) {
+//                if (!channel.isOpen()) {
+//                    channel = connection.createChannel();
+//                    channel.basicQos(prefetchCount);
+////                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//                }
+//                queueName = channel.queueDeclare().getQueue();
+//            } else {
+//
+//                channel.queueDeclare(queueName, durableQueue, false, false, null);
+//            }
+//
+//            final String id = getId(keys, queueName);
+//            if (queueDetailsMap.containsKey(id)) {
+//                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+//                        "cannot define the same subscriber twice");
+//            }
+//
+//            if (consumerTag == null) {
+//                consumerTag = "default";
+//            }
+//
+//            // bind all the routing keys
+////            for (String routingKey : keys) {
+////                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+////            }
+//            // autoAck=false, we will ack after task is done
+//            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
+//                @Override
+//                public void handleDelivery(String consumerTag,
+//                                           Envelope envelope,
+//                                           AMQP.BasicProperties properties,
+//                                           byte[] body) {
+//                    Message message = new Message();
+//
+//                    try {
+//                        ThriftUtils.createThriftFromBytes(body, message);
+//                        TBase event = null;
+//                        String gatewayId = null;
+//                        long deliveryTag = envelope.getDeliveryTag();
+//	                    if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+//		                    ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+//		                    ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+//		                    log.debug(" Message Received with message id '" + message.getMessageId()
+//				                    + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
+//				                    " " +
+//				                    processSubmitEvent.getProcessId());
+//		                    event = processSubmitEvent;
+//		                    gatewayId = processSubmitEvent.getGatewayId();
+//		                    MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+//				                    message.getMessageId(), gatewayId, deliveryTag);
+//		                    messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+//		                    messageContext.setIsRedeliver(envelope.isRedeliver());
+//		                    handler.onMessage(messageContext);
+//	                    } else {
+//		                    log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+//				                    "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+//		                    sendAck(deliveryTag);
+//	                    }
+//                    } catch (TException e) {
+//                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+//                        log.warn(msg, e);
+//                    }
+//                }
+//
+//                @Override
+//                public void handleCancel(String consumerTag) throws IOException {
+//                    super.handleCancel(consumerTag);
+//                    log.info("Subscriber cancelled : " + consumerTag);
+//                }
+//            });
+//
+//            // save the name for deleting the queue
+//            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+//            return id;
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void stopListen(final String id) throws AiravataException {
+//        QueueDetails details = queueDetailsMap.get(id);
+//        if (details != null) {
+//            try {
+//                for (String key : details.getRoutingKeys()) {
+//                    channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+//                }
+//            } catch (IOException e) {
+//                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+//                log.debug(msg);
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Private class for holding some information about the consumers registered
+//     */
+//    private class QueueDetails {
+//        String queueName;
+//
+//        List<String> routingKeys;
+//
+//        private QueueDetails(String queueName, List<String> routingKeys) {
+//            this.queueName = queueName;
+//            this.routingKeys = routingKeys;
+//        }
+//
+//        public String getQueueName() {
+//            return queueName;
+//        }
+//
+//        public List<String> getRoutingKeys() {
+//            return routingKeys;
+//        }
+//    }
+//
+//    private String getId(List<String> routingKeys, String queueName) {
+//        String id = "";
+//        for (String key : routingKeys) {
+//            id = id + "_" + key;
+//        }
+//        return id + "_" + queueName;
+//    }
+//
+//    public void close() {
+//        if (connection != null) {
+//            try {
+//                connection.close();
+//            } catch (IOException ignore) {
+//            }
+//        }
+//    }
+//    public boolean isOpen(){
+//        if(connection!=null){
+//            return connection.isOpen();
+//        }
+//        return false;
+//    }
+//
+//    public void sendAck(long deliveryTag){
+//        try {
+//            if (channel.isOpen()){
+//                channel.basicAck(deliveryTag,false);
+//            }else {
+//                channel = connection.createChannel();
+//                channel.basicQos(prefetchCount);
+//                channel.basicAck(deliveryTag, false);
+//            }
+//        } catch (IOException e) {
+//            logger.error(e.getMessage(), e);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
index e488f26..5cf960e 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -42,7 +42,7 @@ public class RabbitMQProcessLaunchPublisher implements Publisher{
         String brokerUrl;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            launchTask = ServerSettings.getLaunchQueueName();
+            launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
             log.error(message, e);