You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/09 22:19:39 UTC

[1/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers

Repository: airavata
Updated Branches:
  refs/heads/develop 4157065cc -> 20b3d251a


Refactored messaging module to remove duplicate code and support multiple subscribers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a6670f88
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a6670f88
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a6670f88

Branch: refs/heads/develop
Commit: a6670f884c8c4d2aaa5a1dff37fbe3f4ab447285
Parents: 0e14f97
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 9 18:17:33 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 9 18:17:33 2016 -0400

----------------------------------------------------------------------
 .../messaging/core/MessagingFactory.java        |  88 ++++++
 .../airavata/messaging/core/Subscriber.java     |  58 ++++
 .../messaging/core/SubscriberProperties.java    | 125 ++++++++
 .../messaging/core/impl/ExperimentConsumer.java |  42 +++
 .../messaging/core/impl/ProcessConsumer.java    | 114 ++++++++
 .../core/impl/RabbitMQStatusSubscriber.java     | 287 +++++++++++++++++++
 .../messaging/core/impl/RabbitMQSubscriber.java | 189 ++++++++++++
 .../messaging/core/impl/StatusConsumer.java     | 143 +++++++++
 8 files changed, 1046 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
new file mode 100644
index 0000000..ee68d0c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.impl.ProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber;
+import org.apache.airavata.messaging.core.impl.StatusConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory that builds a {@link Subscriber} for a given {@link Subscriber.Type} and immediately
+ * starts it listening with the consumer that matches that type.
+ */
+public class MessagingFactory {
+
+    /**
+     * Creates a subscriber for the requested type and registers a consumer on it.
+     *
+     * @param messageHandler handler passed to the consumer that is created for each listener
+     * @param routingKeys routing keys forwarded to {@link Subscriber#listen}
+     * @param type which kind of subscriber to build
+     * @return the listening subscriber for {@code PROCESS_LAUNCH} and {@code STATUS};
+     *         {@code null} for {@code EXPERIMENT_LAUNCH}, for which no subscriber is created
+     * @throws AiravataException if the subscriber cannot be created or cannot start listening
+     */
+    public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Subscriber.Type type) throws AiravataException {
+        // The shared defaults are built up front, before the type is inspected.
+        SubscriberProperties properties = getSubscriberProperties();
+
+        switch (type) {
+            case PROCESS_LAUNCH: {
+                Subscriber processSubscriber = getProcessSubscriber(properties);
+                // No explicit queue name is passed to listen(); only the routing keys are.
+                processSubscriber.listen(
+                        (conn, ch) -> new ProcessConsumer(messageHandler, conn, ch), null, routingKeys);
+                return processSubscriber;
+            }
+            case STATUS: {
+                Subscriber statusSubscriber = getStatusSubscriber(properties);
+                statusSubscriber.listen(
+                        (conn, ch) -> new StatusConsumer(messageHandler, conn, ch), null, routingKeys);
+                return statusSubscriber;
+            }
+            case EXPERIMENT_LAUNCH:
+            default:
+                // Nothing is created for these types; callers receive null.
+                return null;
+        }
+    }
+
+    /**
+     * Builds the properties common to every subscriber: topic exchange, auto recovery enabled,
+     * consumer tag "default", and durability / prefetch count read from {@link ServerSettings}.
+     */
+    private static SubscriberProperties getSubscriberProperties() {
+        // NOTE(review): the ServerSettings.RABBITMQ_BROKER_URL constant is used directly as the
+        // broker URL - confirm it holds the URL itself and not a property key.
+        SubscriberProperties defaults = new SubscriberProperties();
+        defaults.setBrokerUrl(ServerSettings.RABBITMQ_BROKER_URL);
+        defaults.setDurable(ServerSettings.getRabbitmqDurableQueue());
+        defaults.setPrefetchCount(ServerSettings.getRabbitmqPrefetchCount());
+        defaults.setAutoRecoveryEnable(true);
+        defaults.setConsumerTag("default");
+        defaults.setExchangeType(SubscriberProperties.EXCHANGE_TYPE.TOPIC);
+        return defaults;
+    }
+
+    /**
+     * Creates the status subscriber: status exchange from {@link ServerSettings}, auto-ack on,
+     * no queue name set.
+     */
+    private static RabbitMQSubscriber getStatusSubscriber(SubscriberProperties sp) throws AiravataException {
+        return new RabbitMQSubscriber(
+                sp.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName())
+                        .setAutoAck(true));
+    }
+
+    /**
+     * Creates the process-launch subscriber: process exchange and process-launch queue from
+     * {@link ServerSettings}, auto-ack off.
+     */
+    private static RabbitMQSubscriber getProcessSubscriber(SubscriberProperties sp) throws AiravataException {
+        return new RabbitMQSubscriber(
+                sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
+                        .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
+                        .setAutoAck(false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
new file mode 100644
index 0000000..7952cb3
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import org.apache.airavata.common.exception.AiravataException;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Basic subscriber contract: start a listener whose RabbitMQ {@link Consumer} is produced by a
+ * caller-supplied function, stop it again by id, and acknowledge deliveries.
+ */
+public interface Subscriber {
+    /**
+     * Starts listening for messages.
+     * Returns a unique id for this listener; the id can be used to stop the listening.
+     *
+     * @param supplier given the {@link Connection} and {@link Channel}, returns the RabbitMQ
+     *                 {@link Consumer} to use
+     * @param queueName name of the queue; MessagingFactory passes {@code null} here -
+     *                  NOTE(review): presumably the implementation then chooses the queue itself, confirm
+     * @param routingKeys routing keys for this listener -
+     *                    NOTE(review): presumably bound to the queue by the implementation, confirm
+     * @return string id
+     * @throws AiravataException if listening cannot be started
+     */
+    String listen(BiFunction<Connection, Channel, Consumer>  supplier,
+                  String queueName,
+                  List<String> routingKeys) throws AiravataException;
+
+    /**
+     * Stops the listener identified by {@code id}.
+     *
+     * @param id listener id - presumably the value returned by {@link #listen}; confirm against the implementation
+     * @throws AiravataException if the listener cannot be stopped
+     */
+    void stopListen(final String id) throws AiravataException;
+
+    /**
+     * Sends an acknowledgement for the delivery with the given tag.
+     *
+     * @param deliveryTag tag of the delivery to acknowledge
+     */
+    void sendAck(long deliveryTag);
+
+    /** Kinds of subscriber that can be requested from MessagingFactory. */
+    enum Type {
+        EXPERIMENT_LAUNCH,
+        PROCESS_LAUNCH,
+        STATUS
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
new file mode 100644
index 0000000..025e93b
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+/**
+ * Mutable, fluent holder for the settings a subscriber is created with.
+ * Every setter stores its argument and returns {@code this} so calls can be chained.
+ * Unset reference properties are {@code null}, except the consumer tag, which defaults to "default".
+ */
+public class SubscriberProperties {
+    private String brokerUrl;
+    private EXCHANGE_TYPE exchangeType;
+    private String exchangeName;
+    private int prefetchCount;
+    private boolean durable;
+    private String queueName;
+    private String consumerTag = "default";
+    private boolean autoRecoveryEnable;
+    private boolean autoAck;
+
+    /** @return the broker URL, or {@code null} if not set */
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+        return this;
+    }
+
+    /** @return the durable flag; {@code false} if not set */
+    public boolean isDurable() {
+        return durable;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setDurable(boolean durable) {
+        this.durable = durable;
+        return this;
+    }
+
+    /** @return the exchange name, or {@code null} if not set */
+    public String getExchangeName() {
+        return exchangeName;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setExchangeName(String exchangeName) {
+        this.exchangeName = exchangeName;
+        return this;
+    }
+
+    /** @return the prefetch count; {@code 0} if not set */
+    public int getPrefetchCount() {
+        return prefetchCount;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setPrefetchCount(int prefetchCount) {
+        this.prefetchCount = prefetchCount;
+        return this;
+    }
+
+    /** @return the queue name, or {@code null} if not set */
+    public String getQueueName() {
+        return queueName;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setQueueName(String queueName) {
+        this.queueName = queueName;
+        return this;
+    }
+
+    /** @return the consumer tag; "default" unless changed */
+    public String getConsumerTag() {
+        return consumerTag;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setConsumerTag(String consumerTag) {
+        this.consumerTag = consumerTag;
+        return this;
+    }
+
+    /** @return the auto-recovery flag; {@code false} if not set */
+    public boolean isAutoRecoveryEnable() {
+        return autoRecoveryEnable;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
+        this.autoRecoveryEnable = autoRecoveryEnable;
+        return this;
+    }
+
+    /**
+     * Returns the string form of the configured exchange type.
+     *
+     * @return "topic" or "fanout", or {@code null} when no exchange type has been set
+     */
+    public String getExchangeType() {
+        // Guard the unset case so this getter returns null like the other getters
+        // instead of failing with a NullPointerException.
+        return exchangeType == null ? null : exchangeType.type;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
+        this.exchangeType = exchangeType;
+        return this;
+    }
+
+    /** @return the auto-ack flag; {@code false} if not set */
+    public boolean isAutoAck() {
+        return autoAck;
+    }
+
+    /** @return this instance, for chaining */
+    public SubscriberProperties setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+        return this;
+    }
+
+    /** Supported exchange types together with the string returned by {@link #getExchangeType()}. */
+    public enum EXCHANGE_TYPE{
+        TOPIC("topic"),
+        FANOUT("fanout");
+
+        // Never reassigned after construction, so it is final.
+        private final String type;
+
+        EXCHANGE_TYPE(String type) {
+            this.type = type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
new file mode 100644
index 0000000..058b99e
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+
+import java.io.IOException;
+
+/**
+ * Consumer stub for experiment messages; its delivery callback currently has an empty body.
+ */
+public class ExperimentConsumer extends QueueingConsumer {
+
+    /**
+     * @param channel channel handed to the {@link QueueingConsumer} constructor
+     */
+    public ExperimentConsumer(Channel channel) {
+        super(channel);
+    }
+
+    /**
+     * Does nothing with the delivery.
+     * NOTE(review): looks like a placeholder until experiment handling is implemented - confirm.
+     */
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) throws IOException {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
new file mode 100644
index 0000000..368c100
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Consumer for process-launch messages.
+ * Each delivery is de-serialized into a {@link Message}; {@code LAUNCHPROCESS} messages are turned
+ * into a {@link MessageContext} and handed to the {@link MessageHandler}. Messages of any other
+ * type, and messages that cannot be de-serialized, are acknowledged here because they can never
+ * be processed by this consumer.
+ */
+public class ProcessConsumer extends QueueingConsumer {
+    private static final Logger log = LoggerFactory.getLogger(ProcessConsumer.class);
+
+    private final MessageHandler handler;
+    private final Channel channel;
+
+    /**
+     * @param messageHandler receives a {@link MessageContext} for every {@code LAUNCHPROCESS} message
+     * @param connection connection the channel was created from; kept in the signature for existing
+     *                   callers, no longer used (acks must go out on the receiving channel)
+     * @param channel channel the deliveries arrive on; acknowledgements are sent on this channel
+     */
+    public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel) {
+        super(channel);
+        this.handler = messageHandler;
+        this.channel = channel;
+    }
+
+    /**
+     * De-serializes the delivery and dispatches it to the handler.
+     * The handler is given the delivery tag through the {@link MessageContext}; this method itself
+     * only acknowledges deliveries it cannot hand over.
+     */
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties basicProperties,
+                               byte[] body) throws IOException {
+        final long deliveryTag = envelope.getDeliveryTag();
+        final Message message = new Message();
+
+        try {
+            ThriftUtils.createThriftFromBytes(body, message);
+
+            // Reference comparison is null-safe: a message without a type takes the unsupported branch
+            // instead of failing with a NullPointerException.
+            if (MessageType.LAUNCHPROCESS != message.getMessageType()) {
+                log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+                        "delivery tag {} ", message.getMessageType(), deliveryTag);
+                sendAck(deliveryTag);
+                return;
+            }
+
+            ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+            ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+            log.debug("Message Received with message id '{}' and with message type '{}' for processId: {}",
+                    message.getMessageId(), message.getMessageType(), processSubmitEvent.getProcessId());
+
+            MessageContext messageContext = new MessageContext(processSubmitEvent, message.getMessageType(),
+                    message.getMessageId(), processSubmitEvent.getGatewayId(), deliveryTag);
+            messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+            messageContext.setIsRedeliver(envelope.isRedeliver());
+            handler.onMessage(messageContext);
+        } catch (TException e) {
+            log.warn("Failed to de-serialize the thrift message, from routing keys:" + envelope.getRoutingKey()
+                    + ". Sending ack for delivery tag " + deliveryTag, e);
+            // A payload that cannot be de-serialized will never succeed; ack it like an unsupported
+            // message type so it does not stay unacknowledged forever.
+            sendAck(deliveryTag);
+        }
+    }
+
+    /**
+     * Acknowledges a single delivery on the channel it was received on.
+     * If that channel is closed the ack is skipped: a delivery tag is only valid on the channel
+     * that delivered the message, so acknowledging it on a newly created channel would be rejected
+     * by the broker as an unknown delivery tag.
+     */
+    private void sendAck(long deliveryTag) {
+        if (!channel.isOpen()) {
+            log.warn("Channel is closed, skipping ack for delivery tag {}", deliveryTag);
+            return;
+        }
+        try {
+            channel.basicAck(deliveryTag, false);
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
new file mode 100644
index 0000000..62c48d5
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusSubscriber.java
@@ -0,0 +1,287 @@
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.AiravataUtils;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.Subscriber;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessageHandler;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TBase;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import javax.annotation.Nonnull;
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//public class RabbitMQStatusSubscriber implements Subscriber {
+//	public static final String EXCHANGE_TYPE = "topic";
+//	private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
+//
+//    private String exchangeName;
+//    private String url;
+//    private Connection connection;
+//    private Channel channel;
+//    private int prefetchCount;
+//    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+//
+//    public RabbitMQStatusSubscriber() throws AiravataException {
+//        try {
+//            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+//            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+//            createConnection();
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//    }
+//
+//    public RabbitMQStatusSubscriber(String brokerUrl, String exchangeName) throws AiravataException {
+//        this.exchangeName = exchangeName;
+//        this.url = brokerUrl;
+//
+//        createConnection();
+//    }
+//
+//    private void createConnection() throws AiravataException {
+//        try {
+//            ConnectionFactory connectionFactory = new ConnectionFactory();
+//            connectionFactory.setUri(url);
+//            connectionFactory.setAutomaticRecoveryEnabled(true);
+//            connection = connectionFactory.newConnection();
+//            connection.addShutdownListener(new ShutdownListener() {
+//                public void shutdownCompleted(ShutdownSignalException cause) {
+//                }
+//            });
+//            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+//
+//            channel = connection.createChannel();
+//            channel.basicQos(prefetchCount);
+//            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
+//
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + exchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public String listen(final MessageHandler handler) throws AiravataException {
+//        try {
+//            Map<String, Object> props = handler.getProperties();
+//            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+//            if (routing == null) {
+//                throw new IllegalArgumentException("The routing key must be present");
+//            }
+//
+//            List<String> keys = new ArrayList<String>();
+//            if (routing instanceof List) {
+//                for (Object o : (List)routing) {
+//                    keys.add(o.toString());
+//                }
+//            } else if (routing instanceof String) {
+//                keys.add((String) routing);
+//            }
+//
+//            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+//            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+//            if (queueName == null) {
+//                if (!channel.isOpen()) {
+//                    channel = connection.createChannel();
+//                    channel.exchangeDeclare(exchangeName, "topic", false);
+//                }
+//                queueName = channel.queueDeclare().getQueue();
+//            } else {
+//                channel.queueDeclare(queueName, true, false, false, null);
+//            }
+//
+//            final String id = getId(keys, queueName);
+//            if (queueDetailsMap.containsKey(id)) {
+//                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+//                        "cannot define the same subscriber twice");
+//            }
+//
+//            if (consumerTag == null) {
+//                consumerTag = "default";
+//            }
+//
+//            // bind all the routing keys
+//            for (String routingKey : keys) {
+//                channel.queueBind(queueName, exchangeName, routingKey);
+//            }
+//
+//            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+//                @Override
+//                public void handleDelivery(String consumerTag,
+//                                           Envelope envelope,
+//                                           AMQP.BasicProperties properties,
+//                                           byte[] body) {
+//                    Message message = new Message();
+//
+//                    try {
+//                        ThriftUtils.createThriftFromBytes(body, message);
+//                        TBase event = null;
+//                        String gatewayId = null;
+//
+//                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+//                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    experimentStatusChangeEvent.getState());
+//                            event = experimentStatusChangeEvent;
+//                            gatewayId = experimentStatusChangeEvent.getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.PROCESS)) {
+//	                        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
+//	                        ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
+//	                        log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
+//			                        "message type " + message.getMessageType() + " with status " +
+//			                        processStatusChangeEvent.getState());
+//	                        event = processStatusChangeEvent;
+//	                        gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.TASK)) {
+//                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    taskStatusChangeEvent.getState());
+//                            event = taskStatusChangeEvent;
+//                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+//                        }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
+//                            TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+//                            event = taskOutputChangeEvent;
+//                            gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.JOB)) {
+//                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
+//                                    jobStatusChangeEvent.getState());
+//                            event = jobStatusChangeEvent;
+//                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+//                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+//                                    taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+//                            event = taskSubmitEvent;
+//                            gatewayId = taskSubmitEvent.getGatewayId();
+//                        } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+//                            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+//                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+//                            log.debug(" Message Received with message id '" + message.getMessageId()
+//                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+//                                    taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+//                            event = taskTerminateEvent;
+//                            gatewayId = null;
+//                        }
+//                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+//                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+//	                    messageContext.setIsRedeliver(envelope.isRedeliver());
+//                        handler.onMessage(messageContext);
+//                    } catch (TException e) {
+//                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+//                        log.warn(msg, e);
+//                    }
+//                }
+//            });
+//            // save the name for deleting the queue
+//            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+//            return id;
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + exchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void stopListen(final String id) throws AiravataException {
+//        QueueDetails details = queueDetailsMap.get(id);
+//        if (details != null) {
+//            try {
+//                for (String key : details.getRoutingKeys()) {
+//                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
+//                }
+//                channel.queueDelete(details.getQueueName(), true, true);
+//            } catch (IOException e) {
+//                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+//                log.debug(msg);
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Private class for holding some information about the consumers registered
+//     */
+//    private class QueueDetails {
+//        String queueName;
+//
+//        List<String> routingKeys;
+//
+//        private QueueDetails(String queueName, List<String> routingKeys) {
+//            this.queueName = queueName;
+//            this.routingKeys = routingKeys;
+//        }
+//
+//        public String getQueueName() {
+//            return queueName;
+//        }
+//
+//        public List<String> getRoutingKeys() {
+//            return routingKeys;
+//        }
+//    }
+//
+//    private String getId(List<String> routingKeys, String queueName) {
+//        String id = "";
+//        for (String key : routingKeys) {
+//            id = id + "_" + key;
+//        }
+//        return id + "_" + queueName;
+//    }
+//
+//    public void close() {
+//        if (connection != null) {
+//            try {
+//                connection.close();
+//            } catch (IOException ignore) {
+//            }
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
new file mode 100644
index 0000000..188847f
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.SubscriberProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+
+public class RabbitMQSubscriber implements Subscriber {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQSubscriber.class);
+
+    private Connection connection;
+    private Channel channel;
+    private Map<String, QueueDetail> queueDetailMap = new HashMap<>();
+    private SubscriberProperties properties;
+
+    public RabbitMQSubscriber(SubscriberProperties properties) throws AiravataException {
+        this.properties = properties;
+        createConnection();
+    }
+
+    private void createConnection() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(properties.getBrokerUrl());
+            connectionFactory.setAutomaticRecoveryEnabled(properties.isAutoRecoveryEnable());
+            connection = connectionFactory.newConnection();
+            addShutdownListener();
+            log.info("connected to rabbitmq: " + connection + " for " + properties.getExchangeName());
+            channel = connection.createChannel();
+            channel.basicQos(properties.getPrefetchCount());
+            channel.exchangeDeclare(properties.getExchangeName(),
+                    properties.getExchangeType(),
+                    false);
+        } catch (Exception e) {
+            String msg = "could not open channel for exchange " + properties.getExchangeName();
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    @Override
+    public String listen(BiFunction<Connection, Channel, Consumer> supplier,
+                         String queueName,
+                         List<String> routingKeys) throws AiravataException {
+
+        try {
+            if (!channel.isOpen()) {
+                channel = connection.createChannel();
+                channel.exchangeDeclare(properties.getExchangeName(), properties.getExchangeType(), false);
+            }
+            if (queueName == null) {
+                queueName = channel.queueDeclare().getQueue();
+            } else {
+                channel.queueDeclare(queueName, true, false, false, null);
+            }
+            final String id = getId(routingKeys, queueName);
+            if (queueDetailMap.containsKey(id)) {
+                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+                        "cannot define the same subscriber twice");
+            }
+            // bind all the routing keys
+            for (String key : routingKeys) {
+                channel.queueBind(queueName, properties.getExchangeName(), key);
+            }
+
+            channel.basicConsume(queueName,
+                    properties.isAutoAck(),
+                    properties.getConsumerTag(),
+                    supplier.apply(connection, channel));
+
+            queueDetailMap.put(id, new QueueDetail(queueName, routingKeys));
+            return id;
+        } catch (IOException e) {
+            String msg = "could not open channel for exchange " + properties.getExchangeName();
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    @Override
+    public void stopListen(String id) throws AiravataException {
+        QueueDetail details = queueDetailMap.get(id);
+        if (details != null) {
+            try {
+                for (String key : details.getRoutingKeys()) {
+                    channel.queueUnbind(details.getQueueName(), properties.getExchangeName(), key);
+                }
+                channel.queueDelete(details.getQueueName(), true, true);
+            } catch (IOException e) {
+                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + properties.getExchangeName();
+                log.debug(msg);
+            }
+        }
+    }
+
+    @Override
+    public void sendAck(long deliveryTag) {
+        try {
+            if (channel.isOpen()){
+                channel.basicAck(deliveryTag,false);
+            }else {
+                channel = connection.createChannel();
+                channel.basicQos(properties.getPrefetchCount());
+                channel.basicAck(deliveryTag, false);
+            }
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    private void addShutdownListener() {
+        connection.addShutdownListener(new ShutdownListener() {
+            public void shutdownCompleted(ShutdownSignalException cause) {
+            }
+        });
+    }
+
+
+    private String getId(List<String> routingKeys, String queueName) {
+        String id = "";
+        for (String key : routingKeys) {
+            id = id + "_" + key;
+        }
+        return id + "_" + queueName;
+    }
+
+    public void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException ignore) {
+            }
+        }
+    }
+
+
+    private class QueueDetail {
+        String queueName;
+        List<String> routingKeys;
+
+        private QueueDetail(String queueName, List<String> routingKeys) {
+            this.queueName = queueName;
+            this.routingKeys = routingKeys;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        List<String> getRoutingKeys() {
+            return routingKeys;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a6670f88/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
new file mode 100644
index 0000000..b5cae51
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/StatusConsumer.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class StatusConsumer extends DefaultConsumer {
+    private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class);
+
+    private MessageHandler handler;
+    private Connection connection;
+    private Channel channel;
+
+    public StatusConsumer(MessageHandler handler, Connection connection, Channel channel) {
+        super(channel);
+        this.handler = handler;
+        this.connection = connection;
+        this.channel = channel;
+    }
+
+    private StatusConsumer(Channel channel) {
+        super(channel);
+    }
+
+
+    @Override
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) throws IOException {
+        Message message = new Message();
+
+        try {
+            ThriftUtils.createThriftFromBytes(body, message);
+            TBase event = null;
+            String gatewayId = null;
+
+            if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+                ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        experimentStatusChangeEvent.getState());
+                event = experimentStatusChangeEvent;
+                gatewayId = experimentStatusChangeEvent.getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.PROCESS)) {
+                ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
+                log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
+                        "message type " + message.getMessageType() + " with status " +
+                        processStatusChangeEvent.getState());
+                event = processStatusChangeEvent;
+                gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.TASK)) {
+                TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        taskStatusChangeEvent.getState());
+                event = taskStatusChangeEvent;
+                gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+            } else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
+                TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
+                event = taskOutputChangeEvent;
+                gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.JOB)) {
+                JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  with status " +
+                        jobStatusChangeEvent.getState());
+                event = jobStatusChangeEvent;
+                gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+                TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+                        taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+                event = taskSubmitEvent;
+                gatewayId = taskSubmitEvent.getGatewayId();
+            } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
+                TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+                log.debug(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
+                        taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+                event = taskTerminateEvent;
+                gatewayId = null;
+            }
+            MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+            messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+            messageContext.setIsRedeliver(envelope.isRedeliver());
+            handler.onMessage(messageContext);
+        } catch (TException e) {
+            String msg = "Failed to de-serialize the thrift message, from routing keys: " + envelope.getRoutingKey();
+            log.warn(msg, e);
+        }
+    }
+}


[2/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
deleted file mode 100644
index 561cde2..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.Consumer;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQStatusConsumer implements Consumer {
-	public static final String EXCHANGE_TYPE = "topic";
-	private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
-    private String exchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private int prefetchCount;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-
-    public RabbitMQStatusConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.exchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.exchangeDeclare(exchangeName, "topic", false);
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-                channel.queueDeclare(queueName, true, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-            for (String routingKey : keys) {
-                channel.queueBind(queueName, exchangeName, routingKey);
-            }
-
-            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-
-                        if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
-                            ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    experimentStatusChangeEvent.getState());
-                            event = experimentStatusChangeEvent;
-                            gatewayId = experimentStatusChangeEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.PROCESS)) {
-	                        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
-	                        ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
-	                        log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
-			                        "message type " + message.getMessageType() + " with status " +
-			                        processStatusChangeEvent.getState());
-	                        event = processStatusChangeEvent;
-	                        gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TASK)) {
-                            TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    taskStatusChangeEvent.getState());
-                            event = taskStatusChangeEvent;
-                            gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
-                        }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
-                            TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
-                            event = taskOutputChangeEvent;
-                            gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.JOB)) {
-                            JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  with status " +
-                                    jobStatusChangeEvent.getState());
-                            event = jobStatusChangeEvent;
-                            gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-                            TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
-                            event = taskSubmitEvent;
-                            gatewayId = taskSubmitEvent.getGatewayId();
-                        } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
-                            TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
-                            event = taskTerminateEvent;
-                            gatewayId = null;
-                        }
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-	                    messageContext.setIsRedeliver(envelope.isRedeliver());
-                        handler.onMessage(messageContext);
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-            });
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), exchangeName, key);
-                }
-                channel.queueDelete(details.getQueueName(), true, true);
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 60cb7a0..f5c4d2a 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.messaging.core.*;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -59,7 +63,14 @@ import org.apache.airavata.orchestrator.util.OrchestratorUtils;
 import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractResource;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.registry.cpi.ReplicaCatalogException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -72,7 +83,12 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -83,7 +99,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private String airavataUserName;
 	private String gatewayName;
 	private Publisher publisher;
-	private RabbitMQStatusConsumer statusConsumer;
+	private Subscriber statusSubscribe;
 	private CuratorFramework curatorClient;
 
     /**
@@ -110,10 +126,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			appCatalog = RegistryFactory.getAppCatalog();
 			orchestrator.initialize();
 			orchestrator.getOrchestratorContext().setPublisher(this.publisher);
-			String brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-			String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-			statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-			statusConsumer.listen(new ProcessStatusHandler());
+			List<String> routingKeys = new ArrayList<>();
+//			routingKeys.add("*"); // listen for gateway level messages
+//			routingKeys.add("*.*"); // listen for gateway/experiment level messages
+			routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages
+			statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS);
 			startCurator();
 		} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
 			log.error(e.getMessage(), e);
@@ -481,18 +498,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
     }
 
 	private class ProcessStatusHandler implements MessageHandler {
-
-		@Override
-		public Map<String, Object> getProperties() {
-			Map<String, Object> props = new HashMap<>();
-			List<String> routingKeys = new ArrayList<>();
-//			routingKeys.add("*"); // listen for gateway level messages
-//			routingKeys.add("*.*"); // listen for gateway/experiment level messages
-			routingKeys.add("*.*.*"); // listern for gateway/experiment/process level messages
-			props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-			return props;
-		}
-
 		/**
 		 * This method only handle MessageType.PROCESS type messages.
 		 * @param message

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
index 49af1ce..fa4c3de 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
@@ -24,9 +24,8 @@ package org.apache.airavata.testsuite.multitenantedairavata;
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
@@ -56,7 +55,11 @@ import java.io.File;
 import java.io.PrintWriter;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class ExperimentExecution {
     private Airavata.Client airavata;
@@ -92,7 +95,7 @@ public class ExperimentExecution {
         String resultFileName = resultFileLocation + getResultFileName();
 
         File resultFolder = new File(resultFileLocation);
-        if (!resultFolder.exists()){
+        if (!resultFolder.exists()) {
             resultFolder.mkdir();
         }
         File resultFile = new File(resultFileName);
@@ -109,11 +112,11 @@ public class ExperimentExecution {
         this.resultWriter = resultWriter;
     }
 
-    protected Map<String, Map<String, String>> getApplicationMap (Map<String, String> tokenMap) throws  Exception{
+    protected Map<String, Map<String, String>> getApplicationMap(Map<String, String> tokenMap) throws Exception {
         appInterfaceMap = new HashMap<String, Map<String, String>>();
         try {
-            if (tokenMap != null && !tokenMap.isEmpty()){
-                for (String gatewayId : tokenMap.keySet()){
+            if (tokenMap != null && !tokenMap.isEmpty()) {
+                for (String gatewayId : tokenMap.keySet()) {
                     Map<String, String> allApplicationInterfaceNames = airavata.getAllApplicationInterfaceNames(authzToken, gatewayId);
                     appInterfaceMap.put(gatewayId, allApplicationInterfaceNames);
                 }
@@ -134,19 +137,19 @@ public class ExperimentExecution {
         return appInterfaceMap;
     }
 
-    protected Map<String, List<Project>> getProjects (Map<String, String> tokenMap) throws Exception{
+    protected Map<String, List<Project>> getProjects(Map<String, String> tokenMap) throws Exception {
         projectsMap = new HashMap<String, List<Project>>();
         try {
-            if (tokenMap != null && !tokenMap.isEmpty()){
-                for (String gatewayId : tokenMap.keySet()){
+            if (tokenMap != null && !tokenMap.isEmpty()) {
+                for (String gatewayId : tokenMap.keySet()) {
                     boolean isgatewayValid = true;
-                    for (String ovoidGateway : gatewaysToAvoid){
-                        if (gatewayId.equals(ovoidGateway)){
+                    for (String ovoidGateway : gatewaysToAvoid) {
+                        if (gatewayId.equals(ovoidGateway)) {
                             isgatewayValid = false;
                             break;
                         }
                     }
-                    if (isgatewayValid){
+                    if (isgatewayValid) {
                         List<Project> allUserProjects = airavata.getUserProjects(authzToken, gatewayId, testUser, 5, 0);
                         projectsMap.put(gatewayId, allUserProjects);
                     }
@@ -168,127 +171,119 @@ public class ExperimentExecution {
         return projectsMap;
     }
 
-    public void launchExperiments () throws Exception {
+    public void launchExperiments() throws Exception {
         try {
-            for (String expId : experimentsWithTokens.keySet()){
+            for (String expId : experimentsWithTokens.keySet()) {
                 airavata.launchExperiment(authzToken, expId, experimentsWithTokens.get(expId));
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while launching experiment", e);
             throw new Exception("Error while launching experiment", e);
         }
     }
 
-    public void monitorExperiments () throws Exception {
+    public void monitorExperiments() throws Exception {
 
         String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT);
         System.out.println("broker url " + brokerUrl);
         final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT);
-        RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-
-        consumer.listen(new MessageHandler() {
-            @Override
-            public Map<String, Object> getProperties() {
-                Map<String, Object> props = new HashMap<String, Object>();
-                List<String> routingKeys = new ArrayList<String>();
-                for (String expId : experimentsWithGateway.keySet()) {
-                    String gatewayId = experimentsWithGateway.get(expId);
-                    System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
-
-                    routingKeys.add(gatewayId);
-                    routingKeys.add(gatewayId + "." + expId);
-                    routingKeys.add(gatewayId + "." + expId + ".*");
-                    routingKeys.add(gatewayId + "." + expId + ".*.*");
-                    routingKeys.add(gatewayId + "." + expId + ".*.*.*");
-                }
-                props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                return props;
-            }
+        Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS);
+    }
 
-            @Override
-            public void onMessage(MessageContext message) {
-
-                if (message.getType().equals(MessageType.EXPERIMENT)) {
-                    try {
-                        ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                        TBase messageEvent = message.getEvent();
-                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                        ThriftUtils.createThriftFromBytes(bytes, event);
-                        ExperimentState expState = event.getState();
-                        String expId = event.getExperimentId();
-                        String gatewayId = event.getGatewayId();
-
-                        if (expState.equals(ExperimentState.COMPLETED)) {
-                            resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
-                            resultWriter.println("=====================================================================");
-                            resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
-                            // check file transfers
-                            List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
-                            int i = 1;
-                            for (OutputDataObjectType output : experimentOutputs) {
-                                System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
-                                System.out.println("Output " + i + " : " + output.getValue());
-                                resultWriter.println("Output " + i + " : " + output.getValue());
-                                i++;
-                            }
-                            resultWriter.println("End of Results for Experiment : " + expId );
-                            resultWriter.println("=====================================================================");
-                        } else if (expState.equals(ExperimentState.FAILED)) {
-                            resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
-                            resultWriter.println("=====================================================================");
-                            int j = 1;
-                            resultWriter.println("Status : " + ExperimentState.FAILED.toString());
-                            System.out.println("################ Experiment : " + expId + " FAILED ###################");
-                            ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
-                            List<ErrorModel> errors = experiment.getErrors();
-                            if (errors != null && !errors.isEmpty()){
-                                for (ErrorModel errorDetails : errors) {
-                                    System.out.println(errorDetails.getActualErrorMessage());
-                                    resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
-                                    resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
-                                }
-                            }
+    private List<String> getRoutingKeys() {
+        List<String> routingKeys = new ArrayList<String>();
+        for (String expId : experimentsWithGateway.keySet()) {
+            String gatewayId = experimentsWithGateway.get(expId);
+            System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
+            routingKeys.add(gatewayId);
+            routingKeys.add(gatewayId + "." + expId);
+            routingKeys.add(gatewayId + "." + expId + ".*");
+            routingKeys.add(gatewayId + "." + expId + ".*.*");
+            routingKeys.add(gatewayId + "." + expId + ".*.*.*");
+        }
+        return routingKeys;
+    }
 
-                            resultWriter.println("End of Results for Experiment : " + expId );
-                            resultWriter.println("=====================================================================");
+    private void processMessage(MessageContext message) {
+        if (message.getType().equals(MessageType.EXPERIMENT)) {
+            try {
+                ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                TBase messageEvent = message.getEvent();
+                byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
+                ExperimentState expState = event.getState();
+                String expId = event.getExperimentId();
+                String gatewayId = event.getGatewayId();
+
+                if (expState.equals(ExperimentState.COMPLETED)) {
+                    resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+                    resultWriter.println("=====================================================================");
+                    resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
+                    // check file transfers
+                    List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
+                    int i = 1;
+                    for (OutputDataObjectType output : experimentOutputs) {
+                        System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
+                        System.out.println("Output " + i + " : " + output.getValue());
+                        resultWriter.println("Output " + i + " : " + output.getValue());
+                        i++;
+                    }
+                    resultWriter.println("End of Results for Experiment : " + expId);
+                    resultWriter.println("=====================================================================");
+                } else if (expState.equals(ExperimentState.FAILED)) {
+                    resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+                    resultWriter.println("=====================================================================");
+                    int j = 1;
+                    resultWriter.println("Status : " + ExperimentState.FAILED.toString());
+                    System.out.println("################ Experiment : " + expId + " FAILED ###################");
+                    ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
+                    List<ErrorModel> errors = experiment.getErrors();
+                    if (errors != null && !errors.isEmpty()) {
+                        for (ErrorModel errorDetails : errors) {
+                            System.out.println(errorDetails.getActualErrorMessage());
+                            resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
+                            resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
                         }
+                    }
+
+                    resultWriter.println("End of Results for Experiment : " + expId);
+                    resultWriter.println("=====================================================================");
+                }
 //                        System.out.println(" Experiment Id : '" + expId
 //                                + "' with state : '" + event.getState().toString() +
 //                                " for Gateway " + event.getGatewayId());
-                    } catch (TException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                } else if (message.getType().equals(MessageType.JOB)) {
-                    try {
-                        JobStatusChangeEvent event = new JobStatusChangeEvent();
-                        TBase messageEvent = message.getEvent();
-                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                        ThriftUtils.createThriftFromBytes(bytes, event);
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
+            }
+        } else if (message.getType().equals(MessageType.JOB)) {
+            try {
+                JobStatusChangeEvent event = new JobStatusChangeEvent();
+                TBase messageEvent = message.getEvent();
+                byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                ThriftUtils.createThriftFromBytes(bytes, event);
 //                        System.out.println(" Job ID : '" + event.getJobIdentity().getJobId()
 //                                + "' with state : '" + event.getState().toString() +
 //                                " for Gateway " + event.getJobIdentity().getGatewayId());
 //                        resultWriter.println("Job Status : " + event.getState().toString());
 
-                    } catch (TException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-                }
-            resultWriter.flush();
+            } catch (TException e) {
+                logger.error(e.getMessage(), e);
             }
-        });
+        }
+        resultWriter.flush();
     }
 
-    private String getResultFileName (){
+    private String getResultFileName() {
         DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HHmmss");
         Calendar cal = Calendar.getInstance();
         return dateFormat.format(cal.getTime());
     }
 
-    public void createAmberWithErrorInputs (String gatewayId,
-                                            String token,
-                                            String projectId,
-                                            String hostId,
-                                            String appId) throws Exception {
+    public void createAmberWithErrorInputs(String gatewayId,
+                                           String token,
+                                           String projectId,
+                                           String hostId,
+                                           String appId) throws Exception {
         try {
             List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
             List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
@@ -352,11 +347,11 @@ public class ExperimentExecution {
         }
     }
 
-    public void createAmberWithErrorUserConfig (String gatewayId,
-                                            String token,
-                                            String projectId,
-                                            String hostId,
-                                            String appId) throws Exception {
+    public void createAmberWithErrorUserConfig(String gatewayId,
+                                               String token,
+                                               String projectId,
+                                               String hostId,
+                                               String appId) throws Exception {
         try {
 
             TestFrameworkProps.Error[] errors = properties.getErrors();
@@ -422,25 +417,25 @@ public class ExperimentExecution {
         }
     }
 
-    public void createAmberExperiment () throws Exception{
+    public void createAmberExperiment() throws Exception {
         try {
             TestFrameworkProps.Application[] applications = properties.getApplications();
             Map<String, String> userGivenAmberInputs = new HashMap<>();
-            for (TestFrameworkProps.Application application : applications){
-                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+            for (TestFrameworkProps.Application application : applications) {
+                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
                     userGivenAmberInputs = application.getInputs();
                 }
             }
 
-            for (String gatewayId : csTokens.keySet()){
+            for (String gatewayId : csTokens.keySet()) {
                 String token = csTokens.get(gatewayId);
                 Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                for (String appId : appsWithNames.keySet()){
+                for (String appId : appsWithNames.keySet()) {
                     String appName = appsWithNames.get(appId);
-                    if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+                    if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
                         List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
                         List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                        for (String inputName : userGivenAmberInputs.keySet()){
+                        for (String inputName : userGivenAmberInputs.keySet()) {
                             for (InputDataObjectType inputDataObjectType : applicationInputs) {
                                 if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
                                     inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -449,7 +444,7 @@ public class ExperimentExecution {
                         }
                         List<Project> projectsPerGateway = projectsMap.get(gatewayId);
                         String projectID = null;
-                        if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+                        if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
                             projectID = projectsPerGateway.get(0).getProjectID();
                         }
                         ExperimentModel simpleExperiment =
@@ -470,7 +465,7 @@ public class ExperimentExecution {
                                     experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                     experimentsWithTokens.put(experimentId, token);
                                     experimentsWithGateway.put(experimentId, gatewayId);
-                                }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+                                } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
                                     ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
                                     UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                     userConfigurationData.setAiravataAutoSchedule(false);
@@ -498,33 +493,33 @@ public class ExperimentExecution {
                     }
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating AMBEr experiment", e);
             throw new Exception("Error while creating AMBER experiment", e);
         }
     }
 
-    public void createUltrascanExperiment () throws Exception{
+    public void createUltrascanExperiment() throws Exception {
         try {
             TestFrameworkProps.Application[] applications = properties.getApplications();
             int numberOfIterations = properties.getNumberOfIterations();
             Map<String, String> userGivenAmberInputs = new HashMap<>();
-            for (TestFrameworkProps.Application application : applications){
-                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+            for (TestFrameworkProps.Application application : applications) {
+                if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
                     userGivenAmberInputs = application.getInputs();
                 }
             }
 
-            for (int i=0; i < numberOfIterations; i++){
-                for (String gatewayId : csTokens.keySet()){
+            for (int i = 0; i < numberOfIterations; i++) {
+                for (String gatewayId : csTokens.keySet()) {
                     String token = csTokens.get(gatewayId);
                     Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                    for (String appId : appsWithNames.keySet()){
+                    for (String appId : appsWithNames.keySet()) {
                         String appName = appsWithNames.get(appId);
-                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
                             List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
                             List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                            for (String inputName : userGivenAmberInputs.keySet()){
+                            for (String inputName : userGivenAmberInputs.keySet()) {
                                 for (InputDataObjectType inputDataObjectType : applicationInputs) {
                                     if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
                                         inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -533,7 +528,7 @@ public class ExperimentExecution {
                             }
                             List<Project> projectsPerGateway = projectsMap.get(gatewayId);
                             String projectID = null;
-                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
                                 projectID = projectsPerGateway.get(0).getProjectID();
                             }
                             ExperimentModel simpleExperiment =
@@ -554,7 +549,7 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "batch", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -564,7 +559,7 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -574,17 +569,17 @@ public class ExperimentExecution {
                                         experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "compute", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
                                         userConfigurationData.setOverrideManualScheduledParams(false);
                                         userConfigurationData.setComputationalResourceScheduling(scheduling);
                                         simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                        experimentId = airavata.createExperiment(authzToken,gatewayId, simpleExperiment);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
                                         experimentsWithTokens.put(experimentId, token);
                                         experimentsWithGateway.put(experimentId, gatewayId);
-                                    }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
                                         ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
                                         UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
                                         userConfigurationData.setAiravataAutoSchedule(false);
@@ -602,89 +597,89 @@ public class ExperimentExecution {
                 }
             }
 
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating Ultrascan experiment", e);
             throw new Exception("Error while creating Ultrascan experiment", e);
         }
     }
 
 
-    public void createEchoExperiment () throws Exception{
+    public void createEchoExperiment() throws Exception {
         try {
             for (String gatewayId : csTokens.keySet()) {
-                    boolean isgatewayValid = true;
-                    for (String ovoidGateway : gatewaysToAvoid){
-                        if (gatewayId.equals(ovoidGateway)){
-                            isgatewayValid = false;
-                            break;
-                        }
+                boolean isgatewayValid = true;
+                for (String ovoidGateway : gatewaysToAvoid) {
+                    if (gatewayId.equals(ovoidGateway)) {
+                        isgatewayValid = false;
+                        break;
                     }
-                    if (isgatewayValid) {
-                        String token = csTokens.get(gatewayId);
-                        Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
-                        for (String appId : appsWithNames.keySet()) {
-                            String appName = appsWithNames.get(appId);
-                            if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
-                                List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
-                                List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
-                                for (InputDataObjectType inputDataObjectType : applicationInputs) {
-                                    if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
-                                        inputDataObjectType.setValue("Hello World !!!");
-                                    }
+                }
+                if (isgatewayValid) {
+                    String token = csTokens.get(gatewayId);
+                    Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
+                    for (String appId : appsWithNames.keySet()) {
+                        String appName = appsWithNames.get(appId);
+                        if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
+                            List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
+                            List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
+                            for (InputDataObjectType inputDataObjectType : applicationInputs) {
+                                if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
+                                    inputDataObjectType.setValue("Hello World !!!");
                                 }
+                            }
 
-                                List<Project> projectsPerGateway = projectsMap.get(gatewayId);
-                                String projectID = null;
-                                if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
-                                    projectID = projectsPerGateway.get(0).getProjectID();
-                                }
-                                ExperimentModel simpleExperiment =
-                                        ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
-                                simpleExperiment.setExperimentOutputs(appOutputs);
-                                String experimentId;
-                                Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
-                                if (computeResources != null && computeResources.size() != 0) {
-                                    for (String id : computeResources.keySet()) {
-                                        String resourceName = computeResources.get(id);
-                                        if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
-                                            ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
-                                            UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
-                                            userConfigurationData.setAiravataAutoSchedule(false);
-                                            userConfigurationData.setOverrideManualScheduledParams(false);
-                                            userConfigurationData.setComputationalResourceScheduling(scheduling);
-                                            simpleExperiment.setUserConfigurationData(userConfigurationData);
-                                            experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
-                                            experimentsWithTokens.put(experimentId, token);
-                                            experimentsWithGateway.put(experimentId, gatewayId);
-                                        }
+                            List<Project> projectsPerGateway = projectsMap.get(gatewayId);
+                            String projectID = null;
+                            if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
+                                projectID = projectsPerGateway.get(0).getProjectID();
+                            }
+                            ExperimentModel simpleExperiment =
+                                    ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
+                            simpleExperiment.setExperimentOutputs(appOutputs);
+                            String experimentId;
+                            Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
+                            if (computeResources != null && computeResources.size() != 0) {
+                                for (String id : computeResources.keySet()) {
+                                    String resourceName = computeResources.get(id);
+                                    if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
+                                    } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
+                                        ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
+                                        UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+                                        userConfigurationData.setAiravataAutoSchedule(false);
+                                        userConfigurationData.setOverrideManualScheduledParams(false);
+                                        userConfigurationData.setComputationalResourceScheduling(scheduling);
+                                        simpleExperiment.setUserConfigurationData(userConfigurationData);
+                                        experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+                                        experimentsWithTokens.put(experimentId, token);
+                                        experimentsWithGateway.put(experimentId, gatewayId);
                                     }
                                 }
                             }
                         }
                     }
+                }
 
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("Error while creating Echo experiment", e);
             throw new Exception("Error while creating Echo experiment", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index 8339aea..a492ef2 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -24,16 +24,18 @@ package org.apache.airavata.workflow.core;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,16 +45,17 @@ import java.util.concurrent.Executors;
 public class WorkflowEnactmentService {
 
     private static WorkflowEnactmentService workflowEnactmentService;
-    private final RabbitMQStatusConsumer statusConsumer;
+    private final Subscriber statusSubscriber;
     private String consumerId;
     private ExecutorService executor;
     private Map<String,WorkflowInterpreter> workflowMap;
 
     private WorkflowEnactmentService () throws AiravataException {
         executor = Executors.newFixedThreadPool(getThreadPoolSize());
-        workflowMap = new ConcurrentHashMap<String, WorkflowInterpreter>();
-        statusConsumer = new RabbitMQStatusConsumer();
-        consumerId = statusConsumer.listen(new TaskMessageHandler());
+        workflowMap = new ConcurrentHashMap<>();
+        statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))),
+                                                           getRoutingKeys(),
+                                                           Subscriber.Type.STATUS);
         // register the shutdown hook to un-bind status consumer.
         Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
     }
@@ -80,33 +83,20 @@ public class WorkflowEnactmentService {
 
     }
 
-    private int getThreadPoolSize() {
-        return ServerSettings.getEnactmentThreadPoolSize();
-    }
-
-    private class TaskMessageHandler implements MessageHandler {
-
-        @Override
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            String gatewayId = "*";
-            String experimentId = "*";
-            List<String> routingKeys = new ArrayList<String>();
-            routingKeys.add(gatewayId);
-            routingKeys.add(gatewayId + "." + experimentId);
-            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-            return props;
-        }
-
-        @Override
-        public void onMessage(MessageContext msgCtx) {
-            StatusHandler statusHandler = new StatusHandler(msgCtx);
-            executor.execute(statusHandler);
-        }
 
+    public List<String> getRoutingKeys() {
+        String gatewayId = "*";
+        String experimentId = "*";
+        List<String> routingKeys = new ArrayList<String>();
+        routingKeys.add(gatewayId);
+        routingKeys.add(gatewayId + "." + experimentId);
+        routingKeys.add(gatewayId + "." + experimentId+ ".*");
+        routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+        return routingKeys;
+    }
 
+    private int getThreadPoolSize() {
+        return ServerSettings.getEnactmentThreadPoolSize();
     }
 
     private class StatusHandler implements Runnable{
@@ -169,7 +159,7 @@ public class WorkflowEnactmentService {
         public void run() {
             super.run();
             try {
-                statusConsumer.stopListen(consumerId);
+                statusSubscriber.stopListen(consumerId);
                 log.info("Successfully un-binded task status consumer");
             } catch (AiravataException e) {
                 log.error("Error while un-bind enactment status consumer", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
index b42e7ac..ecfdeea 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.ComponentState;
 import org.apache.airavata.model.ComponentStatus;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -32,14 +31,27 @@ import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.WorkflowCatalog;
+import org.apache.airavata.registry.cpi.WorkflowCatalogException;
 import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.*;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.InputNode;
+import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
 import org.apache.airavata.workflow.core.parser.WorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -64,7 +76,6 @@ class WorkflowInterpreter {
     private Registry registry;
     private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
     private RabbitMQProcessLaunchPublisher publisher;
-    private RabbitMQStatusConsumer statusConsumer;
     private String consumerId;
     private boolean continueWorkflow = true;
 


[3/4] airavata git commit: Refactored messaging module to remove duplicate code and support multiple subscribers

Posted by sh...@apache.org.
Refactored messaging module to remove duplicate code and support multiple subscribers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e247b00d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e247b00d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e247b00d

Branch: refs/heads/develop
Commit: e247b00d0b4d65a684f5e5551549c329241492d7
Parents: a6670f8
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 9 18:19:09 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 9 18:19:09 2016 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  42 +-
 .../main/resources/airavata-server.properties   |  12 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |  26 +-
 .../apache/airavata/gfac/impl/GFacWorker.java   |   3 +-
 .../airavata/gfac/server/GfacServerHandler.java |  82 +--
 .../messaging/client/RabbitMQListener.java      | 258 +++++----
 .../airavata/messaging/core/Consumer.java       |  40 --
 .../airavata/messaging/core/MessageHandler.java |   4 +-
 .../messaging/core/MessagingConstants.java      |   3 +-
 .../airavata/messaging/core/Metadata.java       |  25 -
 .../airavata/messaging/core/Publisher.java      |   1 +
 .../airavata/messaging/core/TestClient.java     |  56 +-
 .../impl/RabbitMQProcessLaunchConsumer.java     | 574 +++++++++----------
 .../impl/RabbitMQProcessLaunchPublisher.java    |   2 +-
 .../core/impl/RabbitMQStatusConsumer.java       | 286 ---------
 .../server/OrchestratorServerHandler.java       |  47 +-
 .../ExperimentExecution.java                    | 391 +++++++------
 .../workflow/core/WorkflowEnactmentService.java |  60 +-
 .../workflow/core/WorkflowInterpreter.java      |  21 +-
 19 files changed, 785 insertions(+), 1148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 5699e9a..2459658 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -84,8 +84,15 @@ public class ServerSettings extends ApplicationSettings {
     public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
     public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
     public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
-    public static final String LAUNCH_QUEUE_NAME = "launch.queue.name";
-    public static final String CANCEL_QUEUE_NAME = "cancel.queue.name";
+
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
+    public static final String RABBITMQ_PROCESS_EXCHANGE_NAME = "rabbitmq.process.exchange.name";
+    public static final String RABBITMQ_EXPERIMENT_EXCHANGE_NAME = "rabbitmq.experiment.exchange.name";
+    public static final String RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME = "process.launch.queue.name";
+    public static final String RABBITMQ_EXPERIMENT_LAUNCH_QUEUE_NAME = "experiment.launch.queue.name";
+    public static final String RABBITMQ_DURABLE_QUEUE="durable.queue";
+    public static final String RABBITMQ_PREFETCH_COUNT="prefetch.count";
 
 
     //    Workflow Enactment Service component configuration.
@@ -110,13 +117,36 @@ public class ServerSettings extends ApplicationSettings {
         return getSetting(DEFAULT_USER);
     }
 
-    public static String getLaunchQueueName() {
-        return getSetting(LAUNCH_QUEUE_NAME, "launch.queue");
+    public static String getRabbitmqProcessLaunchQueueName() {
+        return getSetting(RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME, "process.launch.queue");
+    }
+
+    public static String getRabbitmqExperimentLaunchQueueName() {
+        return getSetting(RABBITMQ_EXPERIMENT_LAUNCH_QUEUE_NAME, "experiment.launch.queue"); // queue-name key, not the exchange-name key
+    }
+
+    public static String getRabbitmqBrokerUrl() throws ApplicationSettingsException {
+        return getSetting(RABBITMQ_BROKER_URL);
+    }
+
+    public static String getRabbitmqStatusExchangeName(){
+        return getSetting(RABBITMQ_STATUS_EXCHANGE_NAME, "status_exchange");
+    }
+
+    public static String getRabbitmqProcessExchangeName(){
+        return getSetting(RABBITMQ_PROCESS_EXCHANGE_NAME, "process_exchange");
     }
 
+    public static String getRabbitmqExperimentExchangeName() {
+        return getSetting(RABBITMQ_EXPERIMENT_EXCHANGE_NAME, "experiment_exchange");
+    }
+
+    public static boolean getRabbitmqDurableQueue(){
+        return Boolean.parseBoolean(getSetting(RABBITMQ_DURABLE_QUEUE, "false")); // primitive result; anything but "true" (case-insensitive) is false
+    }
 
-    public static String getCancelQueueName() {
-        return getSetting(CANCEL_QUEUE_NAME, "cancel.queue");
+    public static int getRabbitmqPrefetchCount(){
+        return Integer.parseInt(getSetting(RABBITMQ_PREFETCH_COUNT, "200")); // primitive result; NumberFormatException if the value is not an integer
     }
 
     public static String getDefaultUserPassword() throws ApplicationSettingsException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 4fcdd03..29b256f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -246,16 +246,14 @@ rabbitmq.broker.url=amqp://localhost:5672
 #for production scenarios, give url as amqp://userName:password@hostName:portNumber/virtualHost, create user, virtualhost
 # and give permissions, refer: http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html
 #rabbitmq.broker.url=amqp://airavata:airavata@localhost:5672/messaging
-status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher
-rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
-rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
+rabbitmq.status.exchange.name=status_exchange
+rabbitmq.process.exchange.name=process_exchange
+rabbitmq.experiment.exchange.name=experiment_exchange
 durable.queue=false
 prefetch.count=200
-launch.queue.name=launch.queue
-cancel.queue.name=cancel.queue
+process.launch.queue.name=process.launch.queue
+experiment.launch.queue.name=experiment.launch.queue
 activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
 
 ###########################################################################
 # Zookeeper Server Configuration

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbeb1d8..d105c18 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
 import org.apache.airavata.gfac.core.cluster.OutputParser;
@@ -55,15 +54,23 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
-import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.job.ForkJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.task.ArchiveTask;
 import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
 import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
 import org.apache.airavata.model.data.movement.DataMovementProtocol;
@@ -81,6 +88,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -106,7 +114,7 @@ public abstract class Factory {
 	private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
 	private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
 	private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
-	private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
+	private static Subscriber processLaunchSubscriber;
 	private static Map<String, Session> sessionMap = new HashMap<>();
 
 	public static GFacEngine getGFacEngine() throws GFacException {
@@ -159,11 +167,11 @@ public abstract class Factory {
 		return curatorClient;
 	}
 
-	public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException {
-		if (processLaunchConsumer == null) {
-			processLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+	public static synchronized  Subscriber getProcessLaunchSubscriber() throws AiravataException { // lazily creates and caches one subscriber in the static field
+		if (processLaunchSubscriber == null) { // NOTE(review): created with a no-op handler and no routing keys; GFacWorker acks through this instance while GfacServerHandler receives on a subscriber it creates itself - confirm the ack reaches the receiving channel
+			processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH);
 		}
-		return processLaunchConsumer;
+		return processLaunchSubscriber;
 	}
 
 	public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 6dc4c1d..001b679 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,7 +29,6 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,7 +239,7 @@ public class GFacWorker implements Runnable {
 			try {
                 long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
                         processContext.getExperimentId(), processId);
-                Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+                Factory.getProcessLaunchSubscriber().sendAck(processDeliveryTag);
                 processContext.setAcknowledge(true);
                 log.info("expId: {}, processId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
                         processId, processDeliveryTag);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c59e199..a490d91 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,7 +28,6 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
@@ -37,11 +36,15 @@ import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.GFacWorker;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -60,16 +63,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class GfacServerHandler implements GfacService.Iface {
     private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
-    private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
+    private Subscriber processLaunchSubscriber;
     private static int requestCount=0;
     private ExperimentCatalog experimentCatalog;
     private AppCatalog appCatalog;
@@ -88,7 +89,6 @@ public class GfacServerHandler implements GfacService.Iface {
             initZkDataStructure();
             initAMQPClient();
 	        executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
-            startStatusUpdators(experimentCatalog, curatorClient, statusPublisher, rabbitMQProcessLaunchConsumer);
         } catch (Exception e) {
             throw new AiravataStartupException("Gfac Server Initialization error ", e);
         }
@@ -96,9 +96,10 @@ public class GfacServerHandler implements GfacService.Iface {
 
     private void initAMQPClient() throws AiravataException {
 	    // init process consumer
-        rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
-        rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler());
-	    // init status publisher
+        List<String> routingKeys = new ArrayList<>();
+        routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
+        processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH);
+        // init status publisher
 	    statusPublisher = new RabbitMQStatusPublisher();
     }
 
@@ -173,25 +174,6 @@ public class GfacServerHandler implements GfacService.Iface {
         return false;
     }
 
-    public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, Publisher publisher,
-
-                                           RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
-       /* try {
-            String[] listenerClassList = ServerSettings.getActivityListeners();
-            Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-            for (String listenerClass : listenerClassList) {
-                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
-                AbstractActivityListener abstractActivityListener = aClass.newInstance();
-                activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(statusPublisher, experimentCatalog, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
-                log.info("Registering listener: " + listenerClass);
-                statusPublisher.registerListener(abstractActivityListener);
-            }
-        } catch (Exception e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        }*/
-    }
-
     private class ProcessLaunchMessageHandler implements MessageHandler {
         private String experimentNode;
         private String gfacServerName;
@@ -201,15 +183,6 @@ public class GfacServerHandler implements GfacService.Iface {
             gfacServerName = ServerSettings.getGFacServerName();
         }
 
-        public Map<String, Object> getProperties() {
-            Map<String, Object> props = new HashMap<String, Object>();
-            ArrayList<String> keys = new ArrayList<String>();
-            keys.add(ServerSettings.getLaunchQueueName());
-            keys.add(ServerSettings.getCancelQueueName());
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
-            props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
-            return props;
-        }
 
         public void onMessage(MessageContext message) {
             log.info(" Message Received with message id '" + message.getMessageId()
@@ -232,7 +205,7 @@ public class GfacServerHandler implements GfacService.Iface {
 			                } catch (Exception e) {
 				                log.error("Error while updating delivery tag for redelivery message , messageId : " +
 						                message.getMessageId(), e);
-				                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+				                processLaunchSubscriber.sendAck(message.getDeliveryTag());
 			                }
 		                } else {
 			                // read process status from registry
@@ -264,7 +237,7 @@ public class GfacServerHandler implements GfacService.Iface {
                                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                                 Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
                                 publishProcessStatus(event, status);
-                                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                                processLaunchSubscriber.sendAck(message.getDeliveryTag());
                                 return;
                             } else {
                                 setCancelData(event.getExperimentId(),event.getProcessId());
@@ -273,7 +246,7 @@ public class GfacServerHandler implements GfacService.Iface {
                         submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
-                        rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                        processLaunchSubscriber.sendAck(message.getDeliveryTag());
                     }
                 } catch (TException e) {
                     log.error(e.getMessage(), e); //nobody is listening so nothing to throw
@@ -283,31 +256,6 @@ public class GfacServerHandler implements GfacService.Iface {
 	                log.error("Error while publishing process status", e);
                 }
             }
-            // TODO - Now there is no process termination type messages, use zookeeper instead of rabbitmq to do that. it is safe to remove this else part.
-            else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
-                ProcessTerminateEvent event = new ProcessTerminateEvent();
-                TBase messageEvent = message.getEvent();
-                try {
-                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                    ThriftUtils.createThriftFromBytes(bytes, event);
-	                boolean success = GFacUtils.setExperimentCancelRequest(event.getProcessId(), curatorClient,
-			                message.getDeliveryTag());
-	                if (success) {
-		                log.info("processId:{} - Process cancel request save successfully", event.getProcessId());
-	                }
-                } catch (Exception e) {
-	                log.error("processId:" + event.getProcessId() + " - Process cancel reqeust failed", e);
-                }finally {
-	                try {
-		                if (!rabbitMQProcessLaunchConsumer.isOpen()) {
-			                rabbitMQProcessLaunchConsumer.reconnect();
-		                }
-		                rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
-	                } catch (AiravataException e) {
-		                log.error("processId: " + event.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
-	                }
-                }
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
index 301934b..984aa59 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -22,25 +22,32 @@
 package org.apache.airavata.messaging.client;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.commons.cli.*;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 
 public class RabbitMQListener {
@@ -48,12 +55,9 @@ public class RabbitMQListener {
     public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
     private final static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
     private static String gatewayId = "*";
-    private static boolean gatewayLevelMessages = false;
-    private static boolean experimentLevelMessages = false;
-    private static boolean jobLevelMessages = false;
     private static String experimentId = "*";
     private static String jobId = "*";
-    private static boolean allMessages = false;
+    private static LEVEL level = LEVEL.ALL;
 
     public static void main(String[] args) {
         File file = new File("/tmp/latency_client");
@@ -64,66 +68,39 @@ public class RabbitMQListener {
             String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
             System.out.println("broker url " + brokerUrl);
             final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-            consumer.listen(new MessageHandler() {
-                @Override
-                public Map<String, Object> getProperties() {
-                    Map<String, Object> props = new HashMap<String, Object>();
-                    List<String> routingKeys = new ArrayList<String>();
-                    if (allMessages){
-                        routingKeys.add("*");
-                        routingKeys.add("*.*");
-                        routingKeys.add("*.*.*");
-                        routingKeys.add("*.*.*.*");
-                        routingKeys.add("*.*.*.*.*");
-                    }else {
-                        if (gatewayLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + ".*");
-                            routingKeys.add(gatewayId + ".*.*");
-                            routingKeys.add(gatewayId + ".*.*.*");
-                            routingKeys.add(gatewayId + ".*.*.*.*");
-                        }else if (experimentLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + "." + experimentId);
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*");
-                        }else if  (jobLevelMessages){
-                            routingKeys.add(gatewayId);
-                            routingKeys.add(gatewayId + "." + experimentId);
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
-                            routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId);
-                        }
-                    }
-                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                    return props;
-                }
+            List<String> routingKeys = getRoutingKeys(level);
+            Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS);
+        } catch (ApplicationSettingsException e) {
+            logger.error("Error reading airavata server properties", e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
 
-                @Override
-                public void onMessage(MessageContext message) {
-                    try {
-                        long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
-                        bw.write(message.getMessageId() + " :" + latency);
-                        bw.newLine();
-                        bw.flush();
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                    if (message.getType().equals(MessageType.EXPERIMENT)){
-                        try {
-                            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                       " for Gateway " + event.getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }else if (message.getType().equals(MessageType.PROCESS)){
+    }
+
+    private static MessageHandler getMessageHandler(final BufferedWriter bw) {
+        return message -> {
+            try {
+                long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
+                bw.write(message.getMessageId() + " :" + latency);
+                bw.newLine();
+                bw.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            if (message.getType().equals(MessageType.EXPERIMENT)) {
+                try {
+                    ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            } else if (message.getType().equals(MessageType.PROCESS)) {
                         /*try {
                             WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent();
                             TBase messageEvent = message.getEvent();
@@ -135,93 +112,132 @@ public class RabbitMQListener {
                         } catch (TException e) {
                             logger.error(e.getMessage(), e);
                         }*/
-                    }else if (message.getType().equals(MessageType.TASK)){
-                        try {
-                            TaskStatusChangeEvent event = new TaskStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                    " for Gateway " + event.getTaskIdentity().getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }else if (message.getType().equals(MessageType.JOB)){
-                        try {
-                            JobStatusChangeEvent event = new JobStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
-                                    " for Gateway " + event.getJobIdentity().getGatewayId());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
+            } else if (message.getType().equals(MessageType.TASK)) {
+                try {
+                    TaskStatusChangeEvent event = new TaskStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getTaskIdentity().getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
                 }
-            });
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error reading airavata server properties", e);
-        }catch (Exception e) {
-           logger.error(e.getMessage(), e);
-        }
+            } else if (message.getType().equals(MessageType.JOB)) {
+                try {
+                    JobStatusChangeEvent event = new JobStatusChangeEvent();
+                    TBase messageEvent = message.getEvent();
+                    byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                    ThriftUtils.createThriftFromBytes(bytes, event);
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+                            " for Gateway " + event.getJobIdentity().getGatewayId());
+                } catch (TException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        };
 
     }
 
+    private static List<String> getRoutingKeys(LEVEL level) {
+        List<String> routingKeys = new ArrayList<String>();
+        switch (level) {
+            case ALL:
+                routingKeys.add("*");
+                routingKeys.add("*.*");
+                routingKeys.add("*.*.*");
+                routingKeys.add("*.*.*.*");
+                routingKeys.add("*.*.*.*.*");
+                break;
+            case GATEWAY:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + ".*");
+                routingKeys.add(gatewayId + ".*.*");
+                routingKeys.add(gatewayId + ".*.*.*");
+                routingKeys.add(gatewayId + ".*.*.*.*");
+                break;
+            case EXPERIMENT:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + "." + experimentId);
+                routingKeys.add(gatewayId + "." + experimentId + ".*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*.*");
+                break;
+            case JOB:
+                routingKeys.add(gatewayId);
+                routingKeys.add(gatewayId + "." + experimentId);
+                routingKeys.add(gatewayId + "." + experimentId + ".*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+                routingKeys.add(gatewayId + "." + experimentId + ".*." + jobId);
+                break;
+            default:
+                break;
+        }
+        return routingKeys;
+    }
+
     public static void parseArguments(String[] args) {
-        try{
+        try {
             Options options = new Options();
 
-            options.addOption("gId", true , "Gateway ID");
+            options.addOption("gId", true, "Gateway ID");
             options.addOption("eId", true, "Experiment ID");
             options.addOption("jId", true, "Job ID");
             options.addOption("a", false, "All Notifications");
 
             CommandLineParser parser = new PosixParser();
-            CommandLine cmd = parser.parse( options, args);
-            if (cmd.getOptions() == null || cmd.getOptions().length == 0){
+            CommandLine cmd = parser.parse(options, args);
+            if (cmd.getOptions() == null || cmd.getOptions().length == 0) {
                 logger.info("You have not specified any options. We assume you need to listen to all the messages...");
-                allMessages = true;
+                level = LEVEL.ALL;
                 gatewayId = "*";
             }
-            if (cmd.hasOption("a")){
+            if (cmd.hasOption("a")) {
                 logger.info("Listening to all the messages...");
-                allMessages = true;
+                level = LEVEL.ALL;
                 gatewayId = "*";
-            }else {
+            } else {
                 gatewayId = cmd.getOptionValue("gId");
-                if (gatewayId == null){
+                if (gatewayId == null) {
                     gatewayId = "*";
                     logger.info("You have not specified a gateway id. We assume you need to listen to all the messages...");
                 } else {
-                    gatewayLevelMessages = true;
+                    level = LEVEL.GATEWAY;
                 }
                 experimentId = cmd.getOptionValue("eId");
-                if (experimentId == null && !gatewayId.equals("*")){
+                if (experimentId == null && !gatewayId.equals("*")) {
                     experimentId = "*";
                     logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId);
                 } else if (experimentId == null && gatewayId.equals("*")) {
                     experimentId = "*";
                     logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages...");
-                }else {
-                    experimentLevelMessages = true;
+                } else {
+                    level = LEVEL.EXPERIMENT;
                 }
                 jobId = cmd.getOptionValue("jId");
-                if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){
+                if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")) {
                     jobId = "*";
                     logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId
-                            + " with experiment id : " + experimentId );
+                            + " with experiment id : " + experimentId);
                 } else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) {
                     jobId = "*";
                     logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages...");
-                }else {
-                    jobLevelMessages = true;
+                } else {
+                    level = LEVEL.JOB;
                 }
             }
         } catch (ParseException e) {
-            logger.error("Error while reading command line parameters" , e);
+            logger.error("Error while reading command line parameters", e);
         }
     }
+
+    private enum LEVEL {
+        ALL,
+        GATEWAY,
+        EXPERIMENT,
+        JOB;
+    }
 }
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
deleted file mode 100644
index eb557ac..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-import org.apache.airavata.common.exception.AiravataException;
-
-/**
- * This is the basic consumer
- */
-public interface Consumer {
-    /**
-     * Start listening for messages, The binding properties are specified in the handler.
-     * Returns and unique id to this Consumer. This id can be used to stop the listening
-     * @param handler
-     * @return
-     * @throws AiravataException
-     */
-    public String listen(MessageHandler handler) throws AiravataException;
-
-    public void stopListen(final String id) throws AiravataException;
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index c5f8b3d..23646da 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -21,10 +21,8 @@
 
 package org.apache.airavata.messaging.core;
 
-import java.util.Map;
-
+@FunctionalInterface
 public interface MessageHandler {
-    Map<String, Object> getProperties();
 
     void onMessage(MessageContext message);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index 7fb77a9..a64d18d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -21,10 +21,11 @@
 
 package org.apache.airavata.messaging.core;
 
+@Deprecated
 public abstract class MessagingConstants {
     public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
     public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
-    public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name";
+    public static final String RABBITMQ_TASK_EXCHANGE_NAME = "rabbitmq.task.exchange.name";
 
     public static final String RABBIT_ROUTING_KEY = "routingKey";
     public static final String RABBIT_QUEUE= "queue";

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
deleted file mode 100644
index cbf41c1..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-public class Metadata {
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index dea8b00..b8b586c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.AiravataException;
 /**
  * This is the basic publisher interface.
  */
+@FunctionalInterface
 public interface Publisher {
 
     /**

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 89fe7ce..daa9886 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -22,10 +22,8 @@
 package org.apache.airavata.messaging.core;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.thrift.TBase;
@@ -34,9 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 
 public class TestClient {
@@ -47,36 +43,10 @@ public class TestClient {
 
     public static void main(String[] args) {
         try {
-            String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
-            final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
-            RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-            consumer.listen(new MessageHandler() {
-                @Override
-                public Map<String, Object> getProperties() {
-                    Map<String, Object> props = new HashMap<String, Object>();
-                    List<String> routingKeys = new ArrayList<String>();
-                    routingKeys.add(experimentId);
-                    routingKeys.add(experimentId + ".*");
-                    props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
-                    return props;
-                }
-
-                @Override
-                public void onMessage(MessageContext message) {
-                    if (message.getType().equals(MessageType.EXPERIMENT)){
-                        try {
-                            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
-                            TBase messageEvent = message.getEvent();
-                            byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
-                            ThriftUtils.createThriftFromBytes(bytes, event);
-                            System.out.println(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
-                        } catch (TException e) {
-                            logger.error(e.getMessage(), e);
-                        }
-                    }
-                }
-            });
+            List<String> routingKeys = new ArrayList<>();
+            routingKeys.add(experimentId);
+            routingKeys.add(experimentId + ".*");
+            MessagingFactory.getSubscriber(getMessageHandler(),routingKeys,  Subscriber.Type.STATUS);
         } catch (ApplicationSettingsException e) {
             logger.error("Error reading airavata server properties", e);
         }catch (Exception e) {
@@ -84,4 +54,22 @@ public class TestClient {
         }
 
     }
+
+
+    private static MessageHandler getMessageHandler(){
+        return message -> {
+                if (message.getType().equals(MessageType.EXPERIMENT)){
+                    try {
+                        ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+                        TBase messageEvent = message.getEvent();
+                        byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+                        ThriftUtils.createThriftFromBytes(bytes, event);
+                        System.out.println(" Message Received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
+                    } catch (TException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            };
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index 855ae27..456ca07 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -1,288 +1,288 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQProcessLaunchConsumer {
-    private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
-    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
-    private String taskLaunchExchangeName;
-    private String url;
-    private Connection connection;
-    private Channel channel;
-    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-    private boolean durableQueue;
-    private MessageHandler messageHandler;
-    private int prefetchCount;
-
-
-    public RabbitMQProcessLaunchConsumer() throws AiravataException {
-        try {
-            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
-            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
-            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
-            createConnection();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-    }
-
-    public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
-        this.taskLaunchExchangeName = exchangeName;
-        this.url = brokerUrl;
-
-        createConnection();
-    }
-
-    private void createConnection() throws AiravataException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
-
-            channel = connection.createChannel();
-            channel.basicQos(prefetchCount);
-
-//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void reconnect() throws AiravataException{
-        if(messageHandler!=null) {
-            try {
-                listen(messageHandler);
-            } catch (AiravataException e) {
-                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-                log.error(msg);
-                throw new AiravataException(msg, e);
-
-            }
-        }
-    }
-    public String listen(final MessageHandler handler) throws AiravataException {
-        try {
-            messageHandler = handler;
-            Map<String, Object> props = handler.getProperties();
-            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
-            if (routing == null) {
-                throw new IllegalArgumentException("The routing key must be present");
-            }
-            List<String> keys = new ArrayList<String>();
-            if (routing instanceof List) {
-                for (Object o : (List)routing) {
-                    keys.add(o.toString());
-                }
-            } else if (routing instanceof String) {
-                keys.add((String) routing);
-            }
-
-            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
-            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
-            if (queueName == null) {
-                if (!channel.isOpen()) {
-                    channel = connection.createChannel();
-                    channel.basicQos(prefetchCount);
-//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-                }
-                queueName = channel.queueDeclare().getQueue();
-            } else {
-
-                channel.queueDeclare(queueName, durableQueue, false, false, null);
-            }
-
-            final String id = getId(keys, queueName);
-            if (queueDetailsMap.containsKey(id)) {
-                throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
-                        "cannot define the same subscriber twice");
-            }
-
-            if (consumerTag == null) {
-                consumerTag = "default";
-            }
-
-            // bind all the routing keys
-//            for (String routingKey : keys) {
-//                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.messaging.core.impl;
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.AiravataUtils;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessageHandler;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TBase;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//public class RabbitMQProcessLaunchConsumer {
+//    private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
+//    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
+//
+//    private String taskLaunchExchangeName;
+//    private String url;
+//    private Connection connection;
+//    private Channel channel;
+//    private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+//    private boolean durableQueue;
+//    private MessageHandler messageHandler;
+//    private int prefetchCount;
+//
+//
+//    public RabbitMQProcessLaunchConsumer() throws AiravataException {
+//        try {
+//            url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
+//            taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+//            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+//            createConnection();
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//    }
+//
+//    public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+//        this.taskLaunchExchangeName = exchangeName;
+//        this.url = brokerUrl;
+//
+//        createConnection();
+//    }
+//
+//    private void createConnection() throws AiravataException {
+//        try {
+//            ConnectionFactory connectionFactory = new ConnectionFactory();
+//            connectionFactory.setUri(url);
+//            connectionFactory.setAutomaticRecoveryEnabled(true);
+//            connection = connectionFactory.newConnection();
+//            connection.addShutdownListener(new ShutdownListener() {
+//                public void shutdownCompleted(ShutdownSignalException cause) {
+//                }
+//            });
+//            log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+//
+//            channel = connection.createChannel();
+//            channel.basicQos(prefetchCount);
+//
+////            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void reconnect() throws AiravataException{
+//        if(messageHandler!=null) {
+//            try {
+//                listen(messageHandler);
+//            } catch (AiravataException e) {
+//                String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//                log.error(msg);
+//                throw new AiravataException(msg, e);
+//
 //            }
-            // autoAck=false, we will ack after task is done
-            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag,
-                                           Envelope envelope,
-                                           AMQP.BasicProperties properties,
-                                           byte[] body) {
-                    Message message = new Message();
-
-                    try {
-                        ThriftUtils.createThriftFromBytes(body, message);
-                        TBase event = null;
-                        String gatewayId = null;
-                        long deliveryTag = envelope.getDeliveryTag();
-	                    if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-		                    ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
-		                    ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
-		                    log.debug(" Message Received with message id '" + message.getMessageId()
-				                    + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
-				                    " " +
-				                    processSubmitEvent.getProcessId());
-		                    event = processSubmitEvent;
-		                    gatewayId = processSubmitEvent.getGatewayId();
-		                    MessageContext messageContext = new MessageContext(event, message.getMessageType(),
-				                    message.getMessageId(), gatewayId, deliveryTag);
-		                    messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-		                    messageContext.setIsRedeliver(envelope.isRedeliver());
-		                    handler.onMessage(messageContext);
-	                    } else {
-		                    log.error("{} message type is not handle in ProcessLaunch Consumer. Sending ack for " +
-				                    "delivery tag {} ", message.getMessageType().name(), deliveryTag);
-		                    sendAck(deliveryTag);
-	                    }
-                    } catch (TException e) {
-                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
-                        log.warn(msg, e);
-                    }
-                }
-
-                @Override
-                public void handleCancel(String consumerTag) throws IOException {
-                    super.handleCancel(consumerTag);
-                    log.info("Consumer cancelled : " + consumerTag);
-                }
-            });
-
-            // save the name for deleting the queue
-            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
-            return id;
-        } catch (Exception e) {
-            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void stopListen(final String id) throws AiravataException {
-        QueueDetails details = queueDetailsMap.get(id);
-        if (details != null) {
-            try {
-                for (String key : details.getRoutingKeys()) {
-                    channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
-                }
-            } catch (IOException e) {
-                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
-                log.debug(msg);
-            }
-        }
-    }
-
-    /**
-     * Private class for holding some information about the consumers registered
-     */
-    private class QueueDetails {
-        String queueName;
-
-        List<String> routingKeys;
-
-        private QueueDetails(String queueName, List<String> routingKeys) {
-            this.queueName = queueName;
-            this.routingKeys = routingKeys;
-        }
-
-        public String getQueueName() {
-            return queueName;
-        }
-
-        public List<String> getRoutingKeys() {
-            return routingKeys;
-        }
-    }
-
-    private String getId(List<String> routingKeys, String queueName) {
-        String id = "";
-        for (String key : routingKeys) {
-            id = id + "_" + key;
-        }
-        return id + "_" + queueName;
-    }
-
-    public void close() {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException ignore) {
-            }
-        }
-    }
-    public boolean isOpen(){
-        if(connection!=null){
-            return connection.isOpen();
-        }
-        return false;
-    }
-
-    public void sendAck(long deliveryTag){
-        try {
-            if (channel.isOpen()){
-                channel.basicAck(deliveryTag,false);
-            }else {
-                channel = connection.createChannel();
-                channel.basicQos(prefetchCount);
-                channel.basicAck(deliveryTag, false);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-}
+//        }
+//    }
+//    public String listen(final MessageHandler handler) throws AiravataException {
+//        try {
+//            messageHandler = handler;
+//            Map<String, Object> props = handler.getProperties();
+//            final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+//            if (routing == null) {
+//                throw new IllegalArgumentException("The routing key must be present");
+//            }
+//            List<String> keys = new ArrayList<String>();
+//            if (routing instanceof List) {
+//                for (Object o : (List)routing) {
+//                    keys.add(o.toString());
+//                }
+//            } else if (routing instanceof String) {
+//                keys.add((String) routing);
+//            }
+//
+//            String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+//            String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+//            if (queueName == null) {
+//                if (!channel.isOpen()) {
+//                    channel = connection.createChannel();
+//                    channel.basicQos(prefetchCount);
+////                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//                }
+//                queueName = channel.queueDeclare().getQueue();
+//            } else {
+//
+//                channel.queueDeclare(queueName, durableQueue, false, false, null);
+//            }
+//
+//            final String id = getId(keys, queueName);
+//            if (queueDetailsMap.containsKey(id)) {
+//                throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+//                        "cannot define the same subscriber twice");
+//            }
+//
+//            if (consumerTag == null) {
+//                consumerTag = "default";
+//            }
+//
+//            // bind all the routing keys
+////            for (String routingKey : keys) {
+////                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+////            }
+//            // autoAck=false, we will ack after task is done
+//            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
+//                @Override
+//                public void handleDelivery(String consumerTag,
+//                                           Envelope envelope,
+//                                           AMQP.BasicProperties properties,
+//                                           byte[] body) {
+//                    Message message = new Message();
+//
+//                    try {
+//                        ThriftUtils.createThriftFromBytes(body, message);
+//                        TBase event = null;
+//                        String gatewayId = null;
+//                        long deliveryTag = envelope.getDeliveryTag();
+//	                    if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+//		                    ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+//		                    ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+//		                    log.debug(" Message Received with message id '" + message.getMessageId()
+//				                    + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
+//				                    " " +
+//				                    processSubmitEvent.getProcessId());
+//		                    event = processSubmitEvent;
+//		                    gatewayId = processSubmitEvent.getGatewayId();
+//		                    MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+//				                    message.getMessageId(), gatewayId, deliveryTag);
+//		                    messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+//		                    messageContext.setIsRedeliver(envelope.isRedeliver());
+//		                    handler.onMessage(messageContext);
+//	                    } else {
+//		                    log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+//				                    "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+//		                    sendAck(deliveryTag);
+//	                    }
+//                    } catch (TException e) {
+//                        String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+//                        log.warn(msg, e);
+//                    }
+//                }
+//
+//                @Override
+//                public void handleCancel(String consumerTag) throws IOException {
+//                    super.handleCancel(consumerTag);
+//                    log.info("Subscriber cancelled : " + consumerTag);
+//                }
+//            });
+//
+//            // save the name for deleting the queue
+//            queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+//            return id;
+//        } catch (Exception e) {
+//            String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void stopListen(final String id) throws AiravataException {
+//        QueueDetails details = queueDetailsMap.get(id);
+//        if (details != null) {
+//            try {
+//                for (String key : details.getRoutingKeys()) {
+//                    channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+//                }
+//            } catch (IOException e) {
+//                String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+//                log.debug(msg);
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Private class for holding some information about the consumers registered
+//     */
+//    private class QueueDetails {
+//        String queueName;
+//
+//        List<String> routingKeys;
+//
+//        private QueueDetails(String queueName, List<String> routingKeys) {
+//            this.queueName = queueName;
+//            this.routingKeys = routingKeys;
+//        }
+//
+//        public String getQueueName() {
+//            return queueName;
+//        }
+//
+//        public List<String> getRoutingKeys() {
+//            return routingKeys;
+//        }
+//    }
+//
+//    private String getId(List<String> routingKeys, String queueName) {
+//        String id = "";
+//        for (String key : routingKeys) {
+//            id = id + "_" + key;
+//        }
+//        return id + "_" + queueName;
+//    }
+//
+//    public void close() {
+//        if (connection != null) {
+//            try {
+//                connection.close();
+//            } catch (IOException ignore) {
+//            }
+//        }
+//    }
+//    public boolean isOpen(){
+//        if(connection!=null){
+//            return connection.isOpen();
+//        }
+//        return false;
+//    }
+//
+//    public void sendAck(long deliveryTag){
+//        try {
+//            if (channel.isOpen()){
+//                channel.basicAck(deliveryTag,false);
+//            }else {
+//                channel = connection.createChannel();
+//                channel.basicQos(prefetchCount);
+//                channel.basicAck(deliveryTag, false);
+//            }
+//        } catch (IOException e) {
+//            logger.error(e.getMessage(), e);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
index e488f26..5cf960e 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -42,7 +42,7 @@ public class RabbitMQProcessLaunchPublisher implements Publisher{
         String brokerUrl;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            launchTask = ServerSettings.getLaunchQueueName();
+            launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
             log.error(message, e);


[4/4] airavata git commit: Merge branch 'messaging-refactor' into develop

Posted by sh...@apache.org.
Merge branch 'messaging-refactor' into develop


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20b3d251
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20b3d251
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20b3d251

Branch: refs/heads/develop
Commit: 20b3d251a2ab6d0c97e4460b5743de8a9655db04
Parents: 4157065 e247b00
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 9 18:19:22 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 9 18:19:22 2016 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ServerSettings.java   |  42 +-
 .../main/resources/airavata-server.properties   |  12 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |  26 +-
 .../apache/airavata/gfac/impl/GFacWorker.java   |   3 +-
 .../airavata/gfac/server/GfacServerHandler.java |  82 +--
 .../messaging/client/RabbitMQListener.java      | 258 +++++----
 .../airavata/messaging/core/Consumer.java       |  40 --
 .../airavata/messaging/core/MessageHandler.java |   4 +-
 .../messaging/core/MessagingConstants.java      |   3 +-
 .../messaging/core/MessagingFactory.java        |  88 +++
 .../airavata/messaging/core/Metadata.java       |  25 -
 .../airavata/messaging/core/Publisher.java      |   1 +
 .../airavata/messaging/core/Subscriber.java     |  58 ++
 .../messaging/core/SubscriberProperties.java    | 125 ++++
 .../airavata/messaging/core/TestClient.java     |  56 +-
 .../messaging/core/impl/ExperimentConsumer.java |  42 ++
 .../messaging/core/impl/ProcessConsumer.java    | 114 ++++
 .../impl/RabbitMQProcessLaunchConsumer.java     | 574 +++++++++----------
 .../impl/RabbitMQProcessLaunchPublisher.java    |   2 +-
 .../core/impl/RabbitMQStatusConsumer.java       | 286 ---------
 .../core/impl/RabbitMQStatusSubscriber.java     | 287 ++++++++++
 .../messaging/core/impl/RabbitMQSubscriber.java | 189 ++++++
 .../messaging/core/impl/StatusConsumer.java     | 143 +++++
 .../server/OrchestratorServerHandler.java       |  47 +-
 .../ExperimentExecution.java                    | 391 +++++++------
 .../workflow/core/WorkflowEnactmentService.java |  60 +-
 .../workflow/core/WorkflowInterpreter.java      |  21 +-
 27 files changed, 1831 insertions(+), 1148 deletions(-)
----------------------------------------------------------------------