You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/09 22:19:40 UTC

[2/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
deleted file mode 100644
index 561cde2..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.Consumer;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQStatusConsumer implements Consumer {
-	public static final String EXCHANGE_TYPE = "topic";
-	private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
-    private String exchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private int prefetchCount;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-
-    public RabbitMQStatusConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.exchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.exchangeDeclare(exchangeName, "topic", false);
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-                channel.queueDeclare(queueName, true, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-            for (String routingKey : keys) {
-                channel.queueBind(queueName, exchangeName, routingKey);
-            }
-
-            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-
-                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
-                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    experimentStatusChangeEvent.getState());
-                            event = experimentStatusChangeEvent;
-                            gatewayId = experimentStatusChangeEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.PROCESS)) {
-	                        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
-	                        ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
-	                        log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
-			                        "message type " + message.getMessageType() + " with status " +
-			                        processStatusChangeEvent.getState());
-	                        event = processStatusChangeEvent;
-	                        gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TASK)) {
-                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    taskStatusChangeEvent.getState());
-                            event = taskStatusChangeEvent;
-                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
-                        }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
-                            TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
-                            event = taskOutputChangeEvent;
-                            gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.JOB)) {
-                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    jobStatusChangeEvent.getState());
-                            event = jobStatusChangeEvent;
-                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
-                            event = taskSubmitEvent;
-                            gatewayId = taskSubmitEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
-                            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
-                            event = taskTerminateEvent;
-                            gatewayId = null;
-                        }
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-	                    messageContext.setIsRedeliver(envelope.isRedeliver());
-                        handler.onMessage(messageContext);
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-            });
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
-                }
-                channel.queueDelete(details.getQueueName(), true, true);
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 60cb7a0..f5c4d2a 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.messaging.core.*;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -59,7 +63,14 @@ import org.apache.airavata.orchestrator.util.OrchestratorUtils;
 import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractResource;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.registry.cpi.ReplicaCatalogException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -72,7 +83,12 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -83,7 +99,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private String airavataUserName;
 	private String gatewayName;
 	private Publisher publisher;
-	private RabbitMQStatusConsumer statusConsumer;
+	private Subscriber statusSubscribe;
 	private CuratorFramework curatorClient;
 
     /**
@@ -110,10 +126,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			appCatalog = RegistryFactory.getAppCatalog();
 			orchestrator.initialize();
 			orchestrator.getOrchestratorContext().setPublisher(this.publisher);
-			String brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-			String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-			statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-			statusConsumer.listen(new ProcessStatusHandler());
+			List<String> routingKeys = new ArrayList<>();
+//			routingKeys.add("*"); // listen for gateway level messages
+//			routingKeys.add("*.*"); // listen for gateway/experiment level messages
+			routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages
+			statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS);
 			startCurator();
 		} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
 			log.error(e.getMessage(), e);
@@ -481,18 +498,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
     }
 
 	private class ProcessStatusHandler implements MessageHandler {
-
-		@Override
-		public Map<String, Object> getProperties() {
-			Map<String, Object> props = new HashMap<>();
-			List<String> routingKeys = new ArrayList<>();
-//			routingKeys.add("*"); // listen for gateway level messages
-//			routingKeys.add("*.*"); // listen for gateway/experiment level messages
-			routingKeys.add("*.*.*"); // listern for gateway/experiment/process level messages
-			props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-			return props;
-		}
-
 		/**
 		 * This method only handle MessageType.PROCESS type messages.
 		 * @param message

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
index 49af1ce..fa4c3de 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
@@ -24,9 +24,8 @@ package org.apache.airavata.testsuite.multitenantedairavata;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
@@ -56,7 +55,11 @@ import java.io.File;
 import java.io.PrintWriter;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class ExperimentExecution {
     private Airavata.Client airavata;
@@ -92,7 +95,7 @@ public class ExperimentExecution {
         String resultFileName = resultFileLocation + getResultFileName();
 
         File resultFolder = new File(resultFileLocation);
-        if (!resultFolder.exists()){
+        if (!resultFolder.exists()) {
             resultFolder.mkdir();
         }
         File resultFile = new File(resultFileName);
@@ -109,11 +112,11 @@ public class ExperimentExecution {
         this.resultWriter = resultWriter;
     }
 
-    protected Map<String, Map<String, String>> getApplicationMap (Map<String, String> tokenMap) throws  Exception{
+    protected Map<String, Map<String, String>> getApplicationMap(Map<String, String> tokenMap) throws Exception {
         appInterfaceMap = new HashMap<String, Map<String, String>>();
         try {
-            if (tokenMap != null && !tokenMap.isEmpty()){
-                for (String gatewayId : tokenMap.keySet()){
+            if (tokenMap != null && !tokenMap.isEmpty()) {
+                for (String gatewayId : tokenMap.keySet()) {
                     Map<String, String> allApplicationInterfaceNames = airavata.getAllApplicationInterfaceNames(authzToken, gatewayId);
                     appInterfaceMap.put(gatewayId, allApplicationInterfaceNames);
                 }
@@ -134,19 +137,19 @@ public class ExperimentExecution {
         return appInterfaceMap;
     }
 
-    protected Map<String, List<Project>> getProjects (Map<String, String> tokenMap) throws Exception{
+    protected Map<String, List<Project>> getProjects(Map<String, String> tokenMap) throws Exception {
         projectsMap = new HashMap<String, List<Project>>();
         try {
-            if (tokenMap != null && !tokenMap.isEmpty()){
-                for (String gatewayId : tokenMap.keySet()){
+            if (tokenMap != null && !tokenMap.isEmpty()) {
+                for (String gatewayId : tokenMap.keySet()) {
                     boolean isgatewayValid = true;
-                    for (String ovoidGateway : gatewaysToAvoid){
-                        if (gatewayId.equals(ovoidGateway)){
+                    for (String ovoidGateway : gatewaysToAvoid) {
+                        if (gatewayId.equals(ovoidGateway)) {
                             isgatewayValid = false;
                             break;
                         }
                     }
-                    if (isgatewayValid){
+                    if (isgatewayValid) {
                         List<Project> allUserProjects = airavata.getUserProjects(authzToken, gatewayId, testUser, 5, 0);
                         projectsMap.put(gatewayId, allUserProjects);
                     }
@@ -168,127 +171,119 @@ public class ExperimentExecution {
         return projectsMap;
     }
 
-    public void launchExperiments () throws Exception {
+    public void launchExperiments() throws Exception {
         try {
-            for (String expId : experimentsWithTokens.keySet()){
+            for (String expId : experimentsWithTokens.keySet()) {
                 airavata.launchExperiment(authzToken, expId, experimentsWithTokens.get(expId));
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while launching experiment", e);
             throw new Exception("Error while launching experiment", e);
         }
     }
 
-    public void monitorExperiments () throws Exception {
+    public void monitorExperiments() throws Exception {
 
         String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT);
         System.out.println("broker url " + brokerUrl);
         final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT);
-        RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-
-        consumer.listen(new MessageHandler() {
-            @Override
-            public Map<String, Object> getProperties() {
-                Map<String, Object> props = new HashMap<String, Object>();
-                List<String> routingKeys = new ArrayList<String>();
-                for (String expId : experimentsWithGateway.keySet()) {
-                    String gatewayId = experimentsWithGateway.get(expId);
-                    System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
-
-                    routingKeys.add(gatewayId);
-                    routingKeys.add(gatewayId + "." + expId);
-                    routingKeys.add(gatewayId + "." + expId + ".*");
-                    routingKeys.add(gatewayId + "." + expId + ".*.*");
-                    routingKeys.add(gatewayId + "." + expId + ".*.*.*");
-                }
-                props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                return props;
-            }
+        Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS);
+    }
 
-            @Override
-            public void onMessage(MessageContext message) {
-
-                if (message.getType().equals(MessageType.EXPERIMENT)) {
-                    try {
-                        ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                        TBase messageEvent = message.getEvent();
-                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                        ThriftUtils.createThriftFromBytes(bytes, event);
-                        ExperimentState expState = event.getState();
-                        String expId = event.getExperimentId();
-                        String gatewayId = event.getGatewayId();
-
-                        if (expState.equals(ExperimentState.COMPLETED)) {
-                            resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
-                            resultWriter.println("=====================================================================");
-                            resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
-                            // check file transfers
-                            List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
-                            int i = 1;
-                            for (OutputDataObjectType output : experimentOutputs) {
-                                System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
-                                System.out.println("Output " + i + " : " + output.getValue());
-                                resultWriter.println("Output " + i + " : " + output.getValue());
-                                i++;
-                            }
-                            resultWriter.println("End of Results for Experiment : " + expId );
-                            resultWriter.println("=====================================================================");
-                        } else if (expState.equals(ExperimentState.FAILED)) {
-                            resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
-                            resultWriter.println("=====================================================================");
-                            int j = 1;
-                            resultWriter.println("Status : " + ExperimentState.FAILED.toString());
-                            System.out.println("################ Experiment : " + expId + " FAILED ###################");
-                            ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
-                            List<ErrorModel> errors = experiment.getErrors();
-                            if (errors != null && !errors.isEmpty()){
-                                for (ErrorModel errorDetails : errors) {
-                                    System.out.println(errorDetails.getActualErrorMessage());
-                                    resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
-                                    resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
-                                }
-                            }
+    private List<String> getRoutingKeys() {
+        List<String> routingKeys = new ArrayList<String>();
+        for (String expId : experimentsWithGateway.keySet()) {
+            String gatewayId = experimentsWithGateway.get(expId);
+            System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
+            routingKeys.add(gatewayId);
+            routingKeys.add(gatewayId + "." + expId);
+            routingKeys.add(gatewayId + "." + expId + ".*");
+            routingKeys.add(gatewayId + "." + expId + ".*.*");
+            routingKeys.add(gatewayId + "." + expId + ".*.*.*");
+        }
+        return routingKeys;
+    }
 
-                            resultWriter.println("End of Results for Experiment : " + expId );
-                            resultWriter.println("=====================================================================");
+    private void processMessage(MessageContext message) {
+        if (message.getType().equals(MessageType.EXPERIMENT)) {
+            try {
+                ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                TBase messageEvent = message.getEvent();
+                byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
+                ExperimentState expState = event.getState();
+                String expId = event.getExperimentId();
+                String gatewayId = event.getGatewayId();
+
+                if (expState.equals(ExperimentState.COMPLETED)) {
+                    resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+                    resultWriter.println("=====================================================================");
+                    resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
+                    // check file transfers
+                    List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
+                    int i = 1;
+                    for (OutputDataObjectType output : experimentOutputs) {
+                        System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
+                        System.out.println("Output " + i + " : " + output.getValue());
+                        resultWriter.println("Output " + i + " : " + output.getValue());
+                        i++;
+                    }
+                    resultWriter.println("End of Results for Experiment : " + expId);
+                    resultWriter.println("=====================================================================");
+                } else if (expState.equals(ExperimentState.FAILED)) {
+                    resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+                    resultWriter.println("=====================================================================");
+                    int j = 1;
+                    resultWriter.println("Status : " + ExperimentState.FAILED.toString());
+                    System.out.println("################ Experiment : " + expId + " FAILED ###################");
+                    ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
+                    List<ErrorModel> errors = experiment.getErrors();
+                    if (errors != null && !errors.isEmpty()) {
+                        for (ErrorModel errorDetails : errors) {
+                            System.out.println(errorDetails.getActualErrorMessage());
+                            resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
+                            resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
                         }
+                    }
+
+                    resultWriter.println("End of Results for Experiment : " + expId);
+                    resultWriter.println("=====================================================================");
+                }
 //                        System.out.println(" Experiment Id : '" + expId
 //                                + "' with state : '" + event.getState().toString() +
 //                                " for Gateway " + event.getGatewayId());
-                    } catch (TException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                } else if (message.getType().equals(MessageType.JOB)) {
-                    try {
-                        JobStatusChangeEvent event = new JobStatusChangeEvent();
-                        TBase messageEvent = message.getEvent();
-                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                        ThriftUtils.createThriftFromBytes(bytes, event);
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
+            }
+        } else if (message.getType().equals(MessageType.JOB)) {
+            try {
+                JobStatusChangeEvent event = new JobStatusChangeEvent();
+                TBase messageEvent = message.getEvent();
+                byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
 //                        System.out.println(" Job ID : '" + event.getJobIdentity().getJobId()
 //                                + "' with state : '" + event.getState().toString() +
 //                                " for Gateway " + event.getJobIdentity().getGatewayId());
 //                        resultWriter.println("Job Status : " + event.getState().toString());
 
-                    } catch (TException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                }
-            resultWriter.flush();
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
             }
-        });
+        }
+        resultWriter.flush();
     }
 
-    private String getResultFileName (){
+    private String getResultFileName() {
         DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HHmmss");
         Calendar cal = Calendar.getInstance();
         return dateFormat.format(cal.getTime());
     }
 
-    public void createAmberWithErrorInputs (String gatewayId,
-                                            String token,
-                                            String projectId,
-                                            String hostId,
-                                            String appId) throws Exception {
+    public void createAmberWithErrorInputs(String gatewayId,
+                                           String token,
+                                           String projectId,
+                                           String hostId,
+                                           String appId) throws Exception {
         try {
             List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
             List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
@@ -352,11 +347,11 @@ public class ExperimentExecution {
         }
     }
 
-    public void createAmberWithErrorUserConfig (String gatewayId,
-                                            String token,
-                                            String projectId,
-                                            String hostId,
-                                            String appId) throws Exception {
+    public void createAmberWithErrorUserConfig(String gatewayId,
+                                               String token,
+                                               String projectId,
+                                               String hostId,
+                                               String appId) throws Exception {
         try {
 
             TestFrameworkProps.Error[] errors = properties.getErrors();
@@ -422,25 +417,25 @@ public class ExperimentExecution {
         }
     }
 
-    public void createAmberExperiment () throws Exception{
+    public void createAmberExperiment() throws Exception {
         try {
             TestFrameworkProps.Application[] applications = properties.getApplications();
             Map<String, String> userGivenAmberInputs = new HashMap<>();
-            for (TestFrameworkProps.Application application : applications){
-                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+            for (TestFrameworkProps.Application application : applications) {
+                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
                     userGivenAmberInputs = application.getInputs();
                 }
             }
 
-            for (String gatewayId : csTokens.keySet()){
+            for (String gatewayId : csTokens.keySet()) {
                 String token = csTokens.get(gatewayId);
                 Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                for (String appId : appsWithNames.keySet()){
+                for (String appId : appsWithNames.keySet()) {
                     String appName = appsWithNames.get(appId);
-                    if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+                    if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
                         List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
                         List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                        for (String inputName : userGivenAmberInputs.keySet()){
+                        for (String inputName : userGivenAmberInputs.keySet()) {
                             for (InputDataObjectType inputDataObjectType : applicationInputs) {
                                 if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
                                     inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -449,7 +444,7 @@ public class ExperimentExecution {
                         }
                         List<Project> projectsPerGateway = projectsMap.get(gatewayId);
                         String projectID = null;
-                        if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+                        if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
                             projectID = projectsPerGateway.get(0).getProjectID();
                         }
                         ExperimentModel simpleExperiment =
@@ -470,7 +465,7 @@ public class ExperimentExecution {
                                     experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                     experimentsWithTokens.put(experimentId, token);
                                     experimentsWithGateway.put(experimentId, gatewayId);
-                                }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+                                } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
                                     ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
                                     UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                     userConfigurationData.setAiravataAutoSchedule(false);
@@ -498,33 +493,33 @@ public class ExperimentExecution {
                     }
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating AMBEr experiment", e);
             throw new Exception("Error while creating AMBER experiment", e);
         }
     }
 
-    public void createUltrascanExperiment () throws Exception{
+    public void createUltrascanExperiment() throws Exception {
         try {
             TestFrameworkProps.Application[] applications = properties.getApplications();
             int numberOfIterations = properties.getNumberOfIterations();
             Map<String, String> userGivenAmberInputs = new HashMap<>();
-            for (TestFrameworkProps.Application application : applications){
-                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+            for (TestFrameworkProps.Application application : applications) {
+                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
                     userGivenAmberInputs = application.getInputs();
                 }
             }
 
-            for (int i=0; i < numberOfIterations; i++){
-                for (String gatewayId : csTokens.keySet()){
+            for (int i = 0; i < numberOfIterations; i++) {
+                for (String gatewayId : csTokens.keySet()) {
                     String token = csTokens.get(gatewayId);
                     Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                    for (String appId : appsWithNames.keySet()){
+                    for (String appId : appsWithNames.keySet()) {
                         String appName = appsWithNames.get(appId);
-                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
                             List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
                             List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                            for (String inputName : userGivenAmberInputs.keySet()){
+                            for (String inputName : userGivenAmberInputs.keySet()) {
                                 for (InputDataObjectType inputDataObjectType : applicationInputs) {
                                     if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
                                         inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -533,7 +528,7 @@ public class ExperimentExecution {
                             }
                             List<Project> projectsPerGateway = projectsMap.get(gatewayId);
                             String projectID = null;
-                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
                                 projectID = projectsPerGateway.get(0).getProjectID();
                             }
                             ExperimentModel simpleExperiment =
@@ -554,7 +549,7 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "batch", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -564,7 +559,7 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -574,17 +569,17 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "compute", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
                                         userConfigurationData.setOverrideManualScheduledParams(false);
                                         userConfigurationData.setComputationalResourceScheduling(scheduling);
                                         simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                        experimentId = airavata.createExperiment(authzToken,gatewayId, simpleExperiment);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -602,89 +597,89 @@ public class ExperimentExecution {
                 }
             }
 
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating Ultrascan experiment", e);
             throw new Exception("Error while creating Ultrascan experiment", e);
         }
     }
 
 
-    public void createEchoExperiment () throws Exception{
+    public void createEchoExperiment() throws Exception {
         try {
             for (String gatewayId : csTokens.keySet()) {
-                    boolean isgatewayValid = true;
-                    for (String ovoidGateway : gatewaysToAvoid){
-                        if (gatewayId.equals(ovoidGateway)){
-                            isgatewayValid = false;
-                            break;
-                        }
+                boolean isgatewayValid = true;
+                for (String ovoidGateway : gatewaysToAvoid) {
+                    if (gatewayId.equals(ovoidGateway)) {
+                        isgatewayValid = false;
+                        break;
                     }
-                    if (isgatewayValid) {
-                        String token = csTokens.get(gatewayId);
-                        Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                        for (String appId : appsWithNames.keySet()) {
-                            String appName = appsWithNames.get(appId);
-                            if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
-                                List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
-                                List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                                for (InputDataObjectType inputDataObjectType : applicationInputs) {
-                                    if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
-                                        inputDataObjectType.setValue("Hello World !!!");
-                                    }
+                }
+                if (isgatewayValid) {
+                    String token = csTokens.get(gatewayId);
+                    Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
+                    for (String appId : appsWithNames.keySet()) {
+                        String appName = appsWithNames.get(appId);
+                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
+                            List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
+                            List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
+                            for (InputDataObjectType inputDataObjectType : applicationInputs) {
+                                if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
+                                    inputDataObjectType.setValue("Hello World !!!");
                                 }
+                            }
 
-                                List<Project> projectsPerGateway = projectsMap.get(gatewayId);
-                                String projectID = null;
-                                if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
-                                    projectID = projectsPerGateway.get(0).getProjectID();
-                                }
-                                ExperimentModel simpleExperiment =
-                                        ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
-                                simpleExperiment.setExperimentOutputs(appOutputs);
-                                String experimentId;
-                                Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
-                                if (computeResources != null && computeResources.size() != 0) {
-                                    for (String id : computeResources.keySet()) {
-                                        String resourceName = computeResources.get(id);
-                                        if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        }
+                            List<Project> projectsPerGateway = projectsMap.get(gatewayId);
+                            String projectID = null;
+                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
+                                projectID = projectsPerGateway.get(0).getProjectID();
+                            }
+                            ExperimentModel simpleExperiment =
+                                    ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
+                            simpleExperiment.setExperimentOutputs(appOutputs);
+                            String experimentId;
+                            Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
+                            if (computeResources != null && computeResources.size() != 0) {
+                                for (String id : computeResources.keySet()) {
+                                    String resourceName = computeResources.get(id);
+                                    if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
                                     }
                                 }
                             }
                         }
                     }
+                }
 
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating Echo experiment", e);
             throw new Exception("Error while creating Echo experiment", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index 8339aea..a492ef2 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -24,16 +24,18 @@ package org.apache.airavata.workflow.core;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,16 +45,17 @@ import java.util.concurrent.Executors;
 public class WorkflowEnactmentService {
 
     private static WorkflowEnactmentService workflowEnactmentService;
-    private final RabbitMQStatusConsumer statusConsumer;
+    private final Subscriber statusSubscriber;
     private String consumerId;
     private ExecutorService executor;
     private Map<String,WorkflowInterpreter> workflowMap;
 
     private WorkflowEnactmentService () throws AiravataException {
         executor = Executors.newFixedThreadPool(getThreadPoolSize());
-        workflowMap = new ConcurrentHashMap<String, WorkflowInterpreter>();
-        statusConsumer = new RabbitMQStatusConsumer();
-        consumerId = statusConsumer.listen(new TaskMessageHandler());
+        workflowMap = new ConcurrentHashMap<>();
+        statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))),
+                                                           getRoutingKeys(),
+                                                           Subscriber.Type.STATUS);
         // register the shutdown hook to un-bind status consumer.
         Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
     }
@@ -80,33 +83,20 @@ public class WorkflowEnactmentService {
 
     }
 
-    private int getThreadPoolSize() {
-        return ServerSettings.getEnactmentThreadPoolSize();
-    }
-
-    private class TaskMessageHandler implements MessageHandler {
-
-        @Override
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            String gatewayId = "*";
-            String experimentId = "*";
-            List<String> routingKeys = new ArrayList<String>();
-            routingKeys.add(gatewayId);
-            routingKeys.add(gatewayId + "." + experimentId);
-            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-            return props;
-        }
-
-        @Override
-        public void onMessage(MessageContext msgCtx) {
-            StatusHandler statusHandler = new StatusHandler(msgCtx);
-            executor.execute(statusHandler);
-        }
 
+    public List<String> getRoutingKeys() {
+        String gatewayId = "*";
+        String experimentId = "*";
+        List<String> routingKeys = new ArrayList<String>();
+        routingKeys.add(gatewayId);
+        routingKeys.add(gatewayId + "." + experimentId);
+        routingKeys.add(gatewayId + "." + experimentId+ ".*");
+        routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+        return routingKeys;
+    }
 
+    private int getThreadPoolSize() {
+        return ServerSettings.getEnactmentThreadPoolSize();
     }
 
     private class StatusHandler implements Runnable{
@@ -169,7 +159,7 @@ public class WorkflowEnactmentService {
         public void run() {
             super.run();
             try {
-                statusConsumer.stopListen(consumerId);
+                statusSubscriber.stopListen(consumerId);
                 log.info("Successfully un-binded task status consumer");
             } catch (AiravataException e) {
                 log.error("Error while un-bind enactment status consumer", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
index b42e7ac..ecfdeea 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.ComponentState;
 import org.apache.airavata.model.ComponentStatus;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -32,14 +31,27 @@ import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.WorkflowCatalog;
+import org.apache.airavata.registry.cpi.WorkflowCatalogException;
 import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.*;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.InputNode;
+import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
 import org.apache.airavata.workflow.core.parser.WorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -64,7 +76,6 @@ class WorkflowInterpreter {
     private Registry registry;
     private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
     private RabbitMQProcessLaunchPublisher publisher;
-    private RabbitMQStatusConsumer statusConsumer;
     private String consumerId;
     private boolean continueWorkflow = true;