You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/10 19:19:40 UTC

[1/2] airavata git commit: Refactored messaging module to remove duplicate code and support multiple publishers

Repository: airavata
Updated Branches:
  refs/heads/develop 3a70c5905 -> 12d9efb90


Refactored messaging module to remove duplicate code and support multiple publishers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e4cc54d9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e4cc54d9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e4cc54d9

Branch: refs/heads/develop
Commit: e4cc54d923d406542ed5d007aa1fb48a8094d29f
Parents: 2df4bb2
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 15:18:53 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 15:18:53 2016 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  |   9 +-
 .../airavata/gfac/server/GfacServerHandler.java |   6 +-
 .../messaging/client/RabbitMQListener.java      |   3 +-
 .../messaging/core/MessagingFactory.java        |  86 +++-
 .../airavata/messaging/core/Publisher.java      |   4 +-
 .../messaging/core/RabbitMQProperties.java      | 125 ++++++
 .../airavata/messaging/core/Subscriber.java     |   5 -
 .../messaging/core/SubscriberProperties.java    | 125 ------
 .../airavata/messaging/core/TestClient.java     |   2 +-
 .../apache/airavata/messaging/core/Type.java    |  27 ++
 .../impl/RabbitMQProcessLaunchPublisher.java    | 156 +++----
 .../messaging/core/impl/RabbitMQProducer.java   | 440 +++++++++----------
 .../messaging/core/impl/RabbitMQPublisher.java  | 118 +++++
 .../core/impl/RabbitMQStatusPublisher.java      | 212 ++++-----
 .../messaging/core/impl/RabbitMQSubscriber.java |   7 +-
 .../server/OrchestratorServerHandler.java       |   3 +-
 .../ExperimentExecution.java                    |   3 +-
 .../workflow/core/WorkflowEnactmentService.java |   7 +-
 .../workflow/core/WorkflowInterpreter.java      |   8 +-
 19 files changed, 777 insertions(+), 569 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index d105c18..673f37b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -66,7 +66,8 @@ import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
+import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
@@ -145,9 +146,9 @@ public abstract class Factory {
 
 	public static Publisher getStatusPublisher() throws AiravataException {
 		if (statusPublisher == null) {
-			synchronized (RabbitMQStatusPublisher.class) {
+			synchronized (RabbitMQPublisher.class) {
 				if (statusPublisher == null) {
-					statusPublisher = new RabbitMQStatusPublisher();
+					statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
 				}
 			}
 		}
@@ -169,7 +170,7 @@ public abstract class Factory {
 
 	public static synchronized  Subscriber getProcessLaunchSubscriber() throws AiravataException {
 		if (processLaunchSubscriber == null) {
-			processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH);
+			processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Type.PROCESS_LAUNCH);
 		}
 		return processLaunchSubscriber;
 	}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index a490d91..44073dc 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -39,7 +39,7 @@ import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -98,9 +98,9 @@ public class GfacServerHandler implements GfacService.Iface {
 	    // init process consumer
         List<String> routingKeys = new ArrayList<>();
         routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
-        processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH);
+        processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH);
         // init status publisher
-	    statusPublisher = new RabbitMQStatusPublisher();
+	    statusPublisher = Factory.getStatusPublisher();
     }
 
     private void startCuratorClient() throws ApplicationSettingsException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
index 984aa59..9003897 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -27,6 +27,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
@@ -69,7 +70,7 @@ public class RabbitMQListener {
             System.out.println("broker url " + brokerUrl);
             final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
             List<String> routingKeys = getRoutingKeys(level);
-            Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS);
+            Subscriber subscriber = MessagingFactory.getSubscriber(message -> {}, routingKeys, Type.STATUS);
         } catch (ApplicationSettingsException e) {
             logger.error("Error reading airavata server properties", e);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index ee68d0c..99c11b8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -23,29 +23,37 @@ package org.apache.airavata.messaging.core;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.impl.ProcessConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQPublisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber;
 import org.apache.airavata.messaging.core.impl.StatusConsumer;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 
-import java.util.ArrayList;
 import java.util.List;
 
 public class MessagingFactory {
 
-    public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Subscriber.Type type) throws AiravataException {
+    public static Subscriber getSubscriber(final MessageHandler messageHandler,List<String> routingKeys, Type type) throws AiravataException {
         Subscriber subscriber = null;
-        SubscriberProperties sp = getSubscriberProperties();
+        RabbitMQProperties rProperties = getProperties();
 
         switch (type) {
             case EXPERIMENT_LAUNCH:
                 break;
             case PROCESS_LAUNCH:
-                subscriber = getProcessSubscriber(sp);
+                subscriber = getProcessSubscriber(rProperties);
                 subscriber.listen((connection ,channel) -> new ProcessConsumer(messageHandler, connection, channel),
                         null,
                         routingKeys);
                 break;
             case STATUS:
-                subscriber = getStatusSubscriber(sp);
+                subscriber = getStatusSubscriber(rProperties);
                 subscriber.listen((connection, channel) -> new StatusConsumer(messageHandler, connection, channel),
                         null,
                         routingKeys);
@@ -57,24 +65,53 @@ public class MessagingFactory {
         return subscriber;
     }
 
-    private static SubscriberProperties getSubscriberProperties() {
-        return new SubscriberProperties()
+    public static Publisher getPublisher(Type type) throws AiravataException {
+        RabbitMQProperties rProperties = getProperties();
+        Publisher publiser = null;
+        switch (type) {
+            case EXPERIMENT_LAUNCH:
+                break;
+            case PROCESS_LAUNCH:
+                publiser = gerProcessPublisher(rProperties);
+                break;
+            case STATUS:
+                publiser = getStatusPublisher(rProperties);
+                break;
+            default:
+                throw new IllegalArgumentException("Publisher " + type + " is not handled");
+        }
+
+        return publiser;
+    }
+
+    private static Publisher getStatusPublisher(RabbitMQProperties rProperties) throws AiravataException {
+        rProperties.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName());
+        return new RabbitMQPublisher(rProperties, MessagingFactory::statusRoutingkey);
+    }
+
+    private static Publisher gerProcessPublisher(RabbitMQProperties rProperties) throws AiravataException {
+        rProperties.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName());
+        return new RabbitMQPublisher(rProperties, messageContext -> rProperties.getExchangeName());
+    }
+
+    private static RabbitMQProperties getProperties() {
+        return new RabbitMQProperties()
                 .setBrokerUrl(ServerSettings.RABBITMQ_BROKER_URL)
                 .setDurable(ServerSettings.getRabbitmqDurableQueue())
                 .setPrefetchCount(ServerSettings.getRabbitmqPrefetchCount())
                 .setAutoRecoveryEnable(true)
                 .setConsumerTag("default")
-                .setExchangeType(SubscriberProperties.EXCHANGE_TYPE.TOPIC);
+                .setExchangeType(RabbitMQProperties.EXCHANGE_TYPE.TOPIC);
     }
 
-    private static RabbitMQSubscriber getStatusSubscriber(SubscriberProperties sp) throws AiravataException {
+    private static RabbitMQSubscriber getStatusSubscriber(RabbitMQProperties sp) throws AiravataException {
         sp.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName())
                 .setAutoAck(true);
         return new RabbitMQSubscriber(sp);
     }
 
 
-    private static RabbitMQSubscriber getProcessSubscriber(SubscriberProperties sp) throws AiravataException {
+    private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException {
         sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
                 .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
                 .setAutoAck(false);
@@ -82,7 +119,34 @@ public class MessagingFactory {
     }
 
 
-
+    private static String statusRoutingkey(MessageContext msgCtx) {
+        String gatewayId = msgCtx.getGatewayId();
+        String routingKey = null;
+        if (msgCtx.getType() == MessageType.EXPERIMENT) {
+            ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+            routingKey = gatewayId + "." + event.getExperimentId();
+        } else if (msgCtx.getType() == MessageType.TASK) {
+            TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+            routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+                    event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+        } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
+            TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+            routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+                    event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+        } else if (msgCtx.getType() == MessageType.PROCESS) {
+            ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
+            ProcessIdentifier processIdentifier = event.getProcessIdentity();
+            routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
+        } else if (msgCtx.getType() == MessageType.JOB) {
+            JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
+            JobIdentifier identity = event.getJobIdentity();
+            routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+                    identity.getProcessId() + "." +
+                    identity.getTaskId() + "." +
+                    identity.getJobId();
+        }
+        return routingKey;
+    }
 
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index b8b586c..28a9a06 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -31,8 +31,8 @@ public interface Publisher {
 
     /**
      *
-     * @param message object of message context which will include actual event and other information
+     * @param messageContext object of message context which will include actual event and other information
      * @throws AiravataException
      */
-    public void publish(MessageContext message) throws AiravataException;
+    public void publish(MessageContext messageContext) throws AiravataException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
new file mode 100644
index 0000000..5bb1929
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/RabbitMQProperties.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+public class RabbitMQProperties {
+    private String brokerUrl;
+    private EXCHANGE_TYPE exchangeType;
+    private String exchangeName;
+    private int prefetchCount;
+    private boolean durable;
+    private String queueName;
+    private String consumerTag = "default";
+    private boolean autoRecoveryEnable;
+    private boolean autoAck;
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public RabbitMQProperties setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+        return this;
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public RabbitMQProperties setDurable(boolean durable) {
+        this.durable = durable;
+        return this;
+    }
+
+    public String getExchangeName() {
+        return exchangeName;
+    }
+
+    public RabbitMQProperties setExchangeName(String exchangeName) {
+        this.exchangeName = exchangeName;
+        return this;
+    }
+
+    public int getPrefetchCount() {
+        return prefetchCount;
+    }
+
+    public RabbitMQProperties setPrefetchCount(int prefetchCount) {
+        this.prefetchCount = prefetchCount;
+        return this;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public RabbitMQProperties setQueueName(String queueName) {
+        this.queueName = queueName;
+        return this;
+    }
+
+    public String getConsumerTag() {
+        return consumerTag;
+    }
+
+    public RabbitMQProperties setConsumerTag(String consumerTag) {
+        this.consumerTag = consumerTag;
+        return this;
+    }
+
+    public boolean isAutoRecoveryEnable() {
+        return autoRecoveryEnable;
+    }
+
+    public RabbitMQProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
+        this.autoRecoveryEnable = autoRecoveryEnable;
+        return this;
+    }
+
+    public String getExchangeType() {
+        return exchangeType.type;
+    }
+
+    public RabbitMQProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
+        this.exchangeType = exchangeType;
+        return this;
+    }
+
+    public boolean isAutoAck() {
+        return autoAck;
+    }
+
+    public RabbitMQProperties setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+        return this;
+    }
+
+    public enum EXCHANGE_TYPE{
+        TOPIC("topic"),
+        FANOUT("fanout");
+
+        private String type;
+
+        EXCHANGE_TYPE(String type) {
+            this.type = type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
index 7952cb3..cc357a0 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Subscriber.java
@@ -50,9 +50,4 @@ public interface Subscriber {
 
     void sendAck(long deliveryTag);
 
-    enum Type {
-        EXPERIMENT_LAUNCH,
-        PROCESS_LAUNCH,
-        STATUS
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
deleted file mode 100644
index 025e93b..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/SubscriberProperties.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.messaging.core;
-
-public class SubscriberProperties {
-    private String brokerUrl;
-    private EXCHANGE_TYPE exchangeType;
-    private String exchangeName;
-    private int prefetchCount;
-    private boolean durable;
-    private String queueName;
-    private String consumerTag = "default";
-    private boolean autoRecoveryEnable;
-    private boolean autoAck;
-
-    public String getBrokerUrl() {
-        return brokerUrl;
-    }
-
-    public SubscriberProperties setBrokerUrl(String brokerUrl) {
-        this.brokerUrl = brokerUrl;
-        return this;
-    }
-
-    public boolean isDurable() {
-        return durable;
-    }
-
-    public SubscriberProperties setDurable(boolean durable) {
-        this.durable = durable;
-        return this;
-    }
-
-    public String getExchangeName() {
-        return exchangeName;
-    }
-
-    public SubscriberProperties setExchangeName(String exchangeName) {
-        this.exchangeName = exchangeName;
-        return this;
-    }
-
-    public int getPrefetchCount() {
-        return prefetchCount;
-    }
-
-    public SubscriberProperties setPrefetchCount(int prefetchCount) {
-        this.prefetchCount = prefetchCount;
-        return this;
-    }
-
-    public String getQueueName() {
-        return queueName;
-    }
-
-    public SubscriberProperties setQueueName(String queueName) {
-        this.queueName = queueName;
-        return this;
-    }
-
-    public String getConsumerTag() {
-        return consumerTag;
-    }
-
-    public SubscriberProperties setConsumerTag(String consumerTag) {
-        this.consumerTag = consumerTag;
-        return this;
-    }
-
-    public boolean isAutoRecoveryEnable() {
-        return autoRecoveryEnable;
-    }
-
-    public SubscriberProperties setAutoRecoveryEnable(boolean autoRecoveryEnable) {
-        this.autoRecoveryEnable = autoRecoveryEnable;
-        return this;
-    }
-
-    public String getExchangeType() {
-        return exchangeType.type;
-    }
-
-    public SubscriberProperties setExchangeType(EXCHANGE_TYPE exchangeType) {
-        this.exchangeType = exchangeType;
-        return this;
-    }
-
-    public boolean isAutoAck() {
-        return autoAck;
-    }
-
-    public SubscriberProperties setAutoAck(boolean autoAck) {
-        this.autoAck = autoAck;
-        return this;
-    }
-
-    public enum EXCHANGE_TYPE{
-        TOPIC("topic"),
-        FANOUT("fanout");
-
-        private String type;
-
-        EXCHANGE_TYPE(String type) {
-            this.type = type;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index daa9886..65781fe 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -46,7 +46,7 @@ public class TestClient {
             List<String> routingKeys = new ArrayList<>();
             routingKeys.add(experimentId);
             routingKeys.add(experimentId + ".*");
-            MessagingFactory.getSubscriber(getMessageHandler(),routingKeys,  Subscriber.Type.STATUS);
+            MessagingFactory.getSubscriber(getMessageHandler(),routingKeys,  Type.STATUS);
         } catch (ApplicationSettingsException e) {
             logger.error("Error reading airavata server properties", e);
         }catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
new file mode 100644
index 0000000..6980e1c
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Type.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core;
+
+public enum Type {
+    EXPERIMENT_LAUNCH,
+    PROCESS_LAUNCH,
+    STATUS
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
index 5cf960e..ba4b6f2 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -1,78 +1,78 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQProcessLaunchPublisher implements Publisher{
-    private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
-    private  String launchTask;
-    
-    private RabbitMQProducer rabbitMQProducer;
-
-    public RabbitMQProcessLaunchPublisher() throws Exception {
-        String brokerUrl;
-        try {
-            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
-        rabbitMQProducer.open();
-    }
-
-    public void publish(MessageContext msgCtx) throws AiravataException {
-        try {
-            log.info("Publishing to launch queue ...");
-            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
-            Message message = new Message();
-            message.setEvent(body);
-            message.setMessageId(msgCtx.getMessageId());
-            message.setMessageType(msgCtx.getType());
-            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
-            String routingKey = launchTask;
-            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
-            rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
-            log.info("Successfully published to launch queue ...");
-        } catch (TException e) {
-            String msg = "Error while deserializing the object";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        } catch (Exception e) {
-            String msg = "Error while sending to rabbitmq";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        }
-    }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.messaging.core.impl;
+//
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.messaging.core.Publisher;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class RabbitMQProcessLaunchPublisher implements Publisher{
+//    private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
+//    private  String launchTask;
+//
+//    private RabbitMQProducer rabbitMQProducer;
+//
+//    public RabbitMQProcessLaunchPublisher() throws Exception {
+//        String brokerUrl;
+//        try {
+//            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
+//        rabbitMQProducer.open();
+//    }
+//
+//    public void publish(MessageContext msgCtx) throws AiravataException {
+//        try {
+//            log.info("Publishing to launch queue ...");
+//            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+//            Message message = new Message();
+//            message.setEvent(body);
+//            message.setMessageId(msgCtx.getMessageId());
+//            message.setMessageType(msgCtx.getType());
+//            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+//            String routingKey = launchTask;
+//            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+//            rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
+//            log.info("Successfully published to launch queue ...");
+//        } catch (TException e) {
+//            String msg = "Error while deserializing the object";
+//            log.error(msg, e);
+//            throw new AiravataException(msg, e);
+//        } catch (Exception e) {
+//            String msg = "Error while sending to rabbitmq";
+//            log.error(msg, e);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index fc494d4..73138c7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -1,220 +1,220 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class RabbitMQProducer {
-    public static final int DEFAULT_PRE_FETCH = 64;
-
-    private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
-
-    private Connection connection;
-
-    private Channel channel;
-
-    private QueueingConsumer consumer;
-
-    private String consumerTag;
-
-    private String exchangeName;
-
-    private int prefetchCount = DEFAULT_PRE_FETCH;
-
-    private boolean isReQueueOnFail = false;
-
-    private String url;
-
-    private String getExchangeType = "topic";
-
-
-    public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
-        this.exchangeName = exchangeName;
-        this.url = url;
-        this.getExchangeType = getExchangeType;
-    }
-
-    public RabbitMQProducer(String url, String exchangeName) {
-        this.exchangeName = exchangeName;
-        this.url = url;
-    }
-
-    public void setPrefetchCount(int prefetchCount) {
-        this.prefetchCount = prefetchCount;
-    }
-
-    public void setReQueueOnFail(boolean isReQueueOnFail) {
-        this.isReQueueOnFail = isReQueueOnFail;
-    }
-
-    private void reset() {
-        consumerTag = null;
-    }
-
-    private void reInitIfNecessary() throws Exception {
-        if (consumerTag == null || consumer == null) {
-            close();
-            open();
-        }
-    }
-
-    public void close() {
-        log.info("Closing channel to exchange {}", exchangeName);
-        try {
-            if (channel != null && channel.isOpen()) {
-                if (consumerTag != null) {
-                    channel.basicCancel(consumerTag);
-                }
-                channel.close();
-            }
-        } catch (Exception e) {
-            log.debug("error closing channel and/or cancelling consumer", e);
-        }
-        try {
-            log.info("closing connection to rabbitmq: " + connection);
-            connection.close();
-        } catch (Exception e) {
-            log.debug("error closing connection", e);
-        }
-        consumer = null;
-        consumerTag = null;
-        channel = null;
-        connection = null;
-    }
-
-    public void open() throws AiravataException {
-        try {
-            connection = createConnection();
-            channel = connection.createChannel();
-            if (prefetchCount > 0) {
-                log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
-                channel.basicQos(prefetchCount);
-            }
-            if(exchangeName!=null) {
-                channel.exchangeDeclare(exchangeName, getExchangeType, false);
-            }
-            } catch (Exception e) {
-            reset();
-            String msg = "could not open channel for exchange " + exchangeName;
-            log.error(msg);
-            throw new AiravataException(msg, e);
-        }
-    }
-
-    public void send(byte []message, String routingKey) throws Exception {
-        try {
-            channel.basicPublish(exchangeName, routingKey, null, message);
-        } catch (IOException e) {
-            String msg = "Failed to publish message to exchange: " + exchangeName;
-            log.error(msg, e);
-            throw new Exception(msg, e);
-        }
-    }
-
-    public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
-        try {
-            channel.basicPublish( "", routingKey,
-                    MessageProperties.PERSISTENT_TEXT_PLAIN,
-                    message);
-        } catch (IOException e) {
-            String msg = "Failed to publish message to exchange: " + exchangeName;
-            log.error(msg, e);
-            throw new Exception(msg, e);
-        }
-    }
-
-    private Connection createConnection() throws IOException {
-        try {
-            ConnectionFactory connectionFactory = new ConnectionFactory();
-            connectionFactory.setUri(url);
-            connectionFactory.setAutomaticRecoveryEnabled(true);
-            Connection connection = connectionFactory.newConnection();
-            connection.addShutdownListener(new ShutdownListener() {
-                public void shutdownCompleted(ShutdownSignalException cause) {
-                }
-            });
-            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-            return connection;
-        } catch (Exception e) {
-            log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
-            return null;
-        }
-    }
-
-    public void ackMessage(Long msgId) throws Exception {
-        try {
-            channel.basicAck(msgId, false);
-        } catch (ShutdownSignalException sse) {
-            reset();
-            String msg = "shutdown signal received while attempting to ack message";
-            log.error(msg, sse);
-            throw new Exception(msg, sse);
-        } catch (Exception e) {
-            String s = "could not ack for msgId: " + msgId;
-            log.error(s, e);
-            throw new Exception(s, e);
-        }
-    }
-
-    public void failMessage(Long msgId) throws Exception {
-        if (isReQueueOnFail) {
-            failWithRedelivery(msgId);
-        } else {
-            deadLetter(msgId);
-        }
-    }
-
-    public void failWithRedelivery(Long msgId) throws Exception {
-        try {
-            channel.basicReject(msgId, true);
-        } catch (ShutdownSignalException sse) {
-            reset();
-            String msg = "shutdown signal received while attempting to fail with redelivery";
-            log.error(msg, sse);
-            throw new Exception(msg, sse);
-        } catch (Exception e) {
-            String msg = "could not fail with redelivery for msgId: " + msgId;
-            log.error(msg, e);
-            throw new Exception(msg, e);
-        }
-    }
-
-    public void deadLetter(Long msgId) throws Exception {
-        try {
-            channel.basicReject(msgId, false);
-        } catch (ShutdownSignalException sse) {
-            reset();
-            String msg = "shutdown signal received while attempting to fail with no redelivery";
-            log.error(msg, sse);
-            throw new Exception(msg, sse);
-        } catch (Exception e) {
-            String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId;
-            log.error(msg, e);
-            throw new Exception(msg, e);
-        }
-    }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//
+//public class RabbitMQProducer {
+//    public static final int DEFAULT_PRE_FETCH = 64;
+//
+//    private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
+//
+//    private Connection connection;
+//
+//    private Channel channel;
+//
+//    private QueueingConsumer consumer;
+//
+//    private String consumerTag;
+//
+//    private String exchangeName;
+//
+//    private int prefetchCount = DEFAULT_PRE_FETCH;
+//
+//    private boolean isReQueueOnFail = false;
+//
+//    private String url;
+//
+//    private String getExchangeType = "topic";
+//
+//
+//    public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
+//        this.exchangeName = exchangeName;
+//        this.url = url;
+//        this.getExchangeType = getExchangeType;
+//    }
+//
+//    public RabbitMQProducer(String url, String exchangeName) {
+//        this.exchangeName = exchangeName;
+//        this.url = url;
+//    }
+//
+//    public void setPrefetchCount(int prefetchCount) {
+//        this.prefetchCount = prefetchCount;
+//    }
+//
+//    public void setReQueueOnFail(boolean isReQueueOnFail) {
+//        this.isReQueueOnFail = isReQueueOnFail;
+//    }
+//
+//    private void reset() {
+//        consumerTag = null;
+//    }
+//
+//    private void reInitIfNecessary() throws Exception {
+//        if (consumerTag == null || consumer == null) {
+//            close();
+//            open();
+//        }
+//    }
+//
+//    public void close() {
+//        log.info("Closing channel to exchange {}", exchangeName);
+//        try {
+//            if (channel != null && channel.isOpen()) {
+//                if (consumerTag != null) {
+//                    channel.basicCancel(consumerTag);
+//                }
+//                channel.close();
+//            }
+//        } catch (Exception e) {
+//            log.debug("error closing channel and/or cancelling consumer", e);
+//        }
+//        try {
+//            log.info("closing connection to rabbitmq: " + connection);
+//            connection.close();
+//        } catch (Exception e) {
+//            log.debug("error closing connection", e);
+//        }
+//        consumer = null;
+//        consumerTag = null;
+//        channel = null;
+//        connection = null;
+//    }
+//
+//    public void open() throws AiravataException {
+//        try {
+//            connection = createConnection();
+//            channel = connection.createChannel();
+//            if (prefetchCount > 0) {
+//                log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
+//                channel.basicQos(prefetchCount);
+//            }
+//            if(exchangeName!=null) {
+//                channel.exchangeDeclare(exchangeName, getExchangeType, false);
+//            }
+//            } catch (Exception e) {
+//            reset();
+//            String msg = "could not open channel for exchange " + exchangeName;
+//            log.error(msg);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//
+//    public void send(byte []message, String routingKey) throws Exception {
+//        try {
+//            channel.basicPublish(exchangeName, routingKey, null, message);
+//        } catch (IOException e) {
+//            String msg = "Failed to publish message to exchange: " + exchangeName;
+//            log.error(msg, e);
+//            throw new Exception(msg, e);
+//        }
+//    }
+//
+//    public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
+//        try {
+//            channel.basicPublish( "", routingKey,
+//                    MessageProperties.PERSISTENT_TEXT_PLAIN,
+//                    message);
+//        } catch (IOException e) {
+//            String msg = "Failed to publish message to exchange: " + exchangeName;
+//            log.error(msg, e);
+//            throw new Exception(msg, e);
+//        }
+//    }
+//
+//    private Connection createConnection() throws IOException {
+//        try {
+//            ConnectionFactory connectionFactory = new ConnectionFactory();
+//            connectionFactory.setUri(url);
+//            connectionFactory.setAutomaticRecoveryEnabled(true);
+//            Connection connection = connectionFactory.newConnection();
+//            connection.addShutdownListener(new ShutdownListener() {
+//                public void shutdownCompleted(ShutdownSignalException cause) {
+//                }
+//            });
+//            log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+//            return connection;
+//        } catch (Exception e) {
+//            log.info("connection failed to rabbitmq: " + connection + " for " + exchangeName);
+//            return null;
+//        }
+//    }
+//
+//    public void ackMessage(Long msgId) throws Exception {
+//        try {
+//            channel.basicAck(msgId, false);
+//        } catch (ShutdownSignalException sse) {
+//            reset();
+//            String msg = "shutdown signal received while attempting to ack message";
+//            log.error(msg, sse);
+//            throw new Exception(msg, sse);
+//        } catch (Exception e) {
+//            String s = "could not ack for msgId: " + msgId;
+//            log.error(s, e);
+//            throw new Exception(s, e);
+//        }
+//    }
+//
+//    public void failMessage(Long msgId) throws Exception {
+//        if (isReQueueOnFail) {
+//            failWithRedelivery(msgId);
+//        } else {
+//            deadLetter(msgId);
+//        }
+//    }
+//
+//    public void failWithRedelivery(Long msgId) throws Exception {
+//        try {
+//            channel.basicReject(msgId, true);
+//        } catch (ShutdownSignalException sse) {
+//            reset();
+//            String msg = "shutdown signal received while attempting to fail with redelivery";
+//            log.error(msg, sse);
+//            throw new Exception(msg, sse);
+//        } catch (Exception e) {
+//            String msg = "could not fail with redelivery for msgId: " + msgId;
+//            log.error(msg, e);
+//            throw new Exception(msg, e);
+//        }
+//    }
+//
+//    public void deadLetter(Long msgId) throws Exception {
+//        try {
+//            channel.basicReject(msgId, false);
+//        } catch (ShutdownSignalException sse) {
+//            reset();
+//            String msg = "shutdown signal received while attempting to fail with no redelivery";
+//            log.error(msg, sse);
+//            throw new Exception(msg, sse);
+//        } catch (Exception e) {
+//            String msg = "could not fail with dead-lettering (when configured) for msgId: " + msgId;
+//            log.error(msg, e);
+//            throw new Exception(msg, e);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
new file mode 100644
index 0000000..3fdb3a1
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.RabbitMQProperties;
+import org.apache.airavata.model.messaging.event.Message;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+public class RabbitMQPublisher implements Publisher {
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
+    private final RabbitMQProperties properties;
+    private final Function<MessageContext, String> routingKeySupplier;
+    private Connection connection;
+    private Channel channel;
+
+    public RabbitMQPublisher(RabbitMQProperties properties, Function<MessageContext, String> routingKeySupplier) throws AiravataException {
+        this.properties = properties;
+        this.routingKeySupplier = routingKeySupplier;
+        connect();
+    }
+
+    private void connect() throws AiravataException {
+        try {
+            ConnectionFactory connectionFactory = new ConnectionFactory();
+            connectionFactory.setUri(properties.getBrokerUrl());
+            connectionFactory.setAutomaticRecoveryEnabled(properties.isAutoRecoveryEnable());
+            connection = connectionFactory.newConnection();
+            connection.addShutdownListener(new ShutdownListener() {
+                public void shutdownCompleted(ShutdownSignalException cause) {
+                }
+            });
+            log.info("connected to rabbitmq: " + connection + " for " + properties.getExchangeName());
+            channel = connection.createChannel();
+            if (properties.getPrefetchCount() > 0) {
+                channel.basicQos(properties.getPrefetchCount());
+            }
+
+            if (properties.getExchangeName() != null) {
+                channel.exchangeDeclare(properties.getExchangeName(),
+                                        properties.getExchangeType(),
+                                        false);
+            }
+        } catch (Exception e) {
+            String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName();
+            log.error(msg);
+            throw new AiravataException(msg, e);
+        }
+
+
+    }
+
+
+    @Override
+    public void publish(MessageContext messageContext) throws AiravataException {
+        try {
+            byte[] body = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            message.setMessageId(messageContext.getMessageId());
+            message.setMessageType(messageContext.getType());
+            message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
+            String routingKey = routingKeySupplier.apply(messageContext);
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            send(messageBody, routingKey);
+        } catch (TException e) {
+            String msg = "Error while deserializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    public void send(byte []message, String routingKey) throws Exception {
+        try {
+            channel.basicPublish(properties.getExchangeName(), routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message);
+        } catch (IOException e) {
+            String msg = "Failed to publish message to exchange: " + properties.getExchangeName();
+            log.error(msg, e);
+            throw new Exception(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index 75077e9..6f728fe 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -1,106 +1,106 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQStatusPublisher implements Publisher {
-
-    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
-
-    private RabbitMQProducer rabbitMQProducer;
-
-//    StatCounter statCounter = StatCounter.getInstance();
-
-    public RabbitMQStatusPublisher() throws AiravataException {
-        String brokerUrl;
-        String exchangeName;
-        try {
-            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-        } catch (ApplicationSettingsException e) {
-            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
-            log.error(message, e);
-            throw new AiravataException(message, e);
-        }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
-        rabbitMQProducer.open();
-    }
-
-    public void publish(MessageContext msgCtx) throws AiravataException {
-        try {
-            log.info("Publishing status to rabbitmq...");
-            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
-            Message message = new Message();
-            message.setEvent(body);
-            message.setMessageId(msgCtx.getMessageId());
-            message.setMessageType(msgCtx.getType());
-            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
-            String gatewayId = msgCtx.getGatewayId();
-            String routingKey = null;
-            if (msgCtx.getType() == MessageType.EXPERIMENT) {
-                ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
-                routingKey = gatewayId + "." + event.getExperimentId();
-            } else if (msgCtx.getType() == MessageType.TASK) {
-                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
-                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
-                        event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
-            } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
-                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
-                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
-                        event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
-            } else if (msgCtx.getType() == MessageType.PROCESS) {
-                ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
-                ProcessIdentifier processIdentifier = event.getProcessIdentity();
-                routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
-            } else if (msgCtx.getType() == MessageType.JOB) {
-                JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
-                JobIdentifier identity = event.getJobIdentity();
-                routingKey = gatewayId + "." + identity.getExperimentId() + "." +
-                        identity.getProcessId() + "." +
-                        identity.getTaskId() + "." +
-                        identity.getJobId();
-            }
-            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
-            rabbitMQProducer.send(messageBody, routingKey);
-//            statCounter.add(message);
-        } catch (TException e) {
-            String msg = "Error while deserializing the object";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        } catch (Exception e) {
-            String msg = "Error while sending to rabbitmq";
-            log.error(msg, e);
-            throw new AiravataException(msg, e);
-        }
-    }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *   http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.messaging.core.impl;
+//
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.messaging.core.Publisher;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class RabbitMQStatusPublisher implements Publisher {
+//
+//    private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class);
+//
+//    private RabbitMQProducer rabbitMQProducer;
+//
+////    StatCounter statCounter = StatCounter.getInstance();
+//
+//    public RabbitMQStatusPublisher() throws AiravataException {
+//        String brokerUrl;
+//        String exchangeName;
+//        try {
+//            brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+//            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+//        } catch (ApplicationSettingsException e) {
+//            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+//            log.error(message, e);
+//            throw new AiravataException(message, e);
+//        }
+//        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+//        rabbitMQProducer.open();
+//    }
+//
+//    public void publish(MessageContext msgCtx) throws AiravataException {
+//        try {
+//            log.info("Publishing status to rabbitmq...");
+//            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+//            Message message = new Message();
+//            message.setEvent(body);
+//            message.setMessageId(msgCtx.getMessageId());
+//            message.setMessageType(msgCtx.getType());
+//            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+//            String gatewayId = msgCtx.getGatewayId();
+//            String routingKey = null;
+//            if (msgCtx.getType() == MessageType.EXPERIMENT) {
+//                ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+//                routingKey = gatewayId + "." + event.getExperimentId();
+//            } else if (msgCtx.getType() == MessageType.TASK) {
+//                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+//                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+//                        event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+//            } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
+//                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+//                routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
+//                        event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
+//            } else if (msgCtx.getType() == MessageType.PROCESS) {
+//                ProcessStatusChangeEvent event = (ProcessStatusChangeEvent) msgCtx.getEvent();
+//                ProcessIdentifier processIdentifier = event.getProcessIdentity();
+//                routingKey = gatewayId + "." + processIdentifier.getExperimentId() + "." + processIdentifier.getProcessId();
+//            } else if (msgCtx.getType() == MessageType.JOB) {
+//                JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
+//                JobIdentifier identity = event.getJobIdentity();
+//                routingKey = gatewayId + "." + identity.getExperimentId() + "." +
+//                        identity.getProcessId() + "." +
+//                        identity.getTaskId() + "." +
+//                        identity.getJobId();
+//            }
+//            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+//            rabbitMQProducer.send(messageBody, routingKey);
+////            statCounter.add(message);
+//        } catch (TException e) {
+//            String msg = "Error while deserializing the object";
+//            log.error(msg, e);
+//            throw new AiravataException(msg, e);
+//        } catch (Exception e) {
+//            String msg = "Error while sending to rabbitmq";
+//            log.error(msg, e);
+//            throw new AiravataException(msg, e);
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
index 188847f..441281d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -27,9 +27,8 @@ import com.rabbitmq.client.Consumer;
 import com.rabbitmq.client.ShutdownListener;
 import com.rabbitmq.client.ShutdownSignalException;
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.SubscriberProperties;
+import org.apache.airavata.messaging.core.RabbitMQProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,9 +45,9 @@ public class RabbitMQSubscriber implements Subscriber {
     private Connection connection;
     private Channel channel;
     private Map<String, QueueDetail> queueDetailMap = new HashMap<>();
-    private SubscriberProperties properties;
+    private RabbitMQProperties properties;
 
-    public RabbitMQSubscriber(SubscriberProperties properties) throws AiravataException {
+    public RabbitMQSubscriber(RabbitMQProperties properties) throws AiravataException {
         this.properties = properties;
         createConnection();
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index f5c4d2a..5d02100 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -35,6 +35,7 @@ import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
 import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -130,7 +131,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //			routingKeys.add("*"); // listen for gateway level messages
 //			routingKeys.add("*.*"); // listen for gateway/experiment level messages
 			routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages
-			statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS);
+			statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Type.STATUS);
 			startCurator();
 		} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
 			log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
index fa4c3de..1e6edd8 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.commons.ErrorModel;
@@ -187,7 +188,7 @@ public class ExperimentExecution {
         String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT);
         System.out.println("broker url " + brokerUrl);
         final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT);
-        Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS);
+        Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Type.STATUS);
     }
 
     private List<String> getRoutingKeys() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index a492ef2..33f11c8 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -25,8 +25,9 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.Subscriber;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
+import org.apache.airavata.messaging.core.Type;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -55,7 +56,7 @@ public class WorkflowEnactmentService {
         workflowMap = new ConcurrentHashMap<>();
         statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))),
                                                            getRoutingKeys(),
-                                                           Subscriber.Type.STATUS);
+                                                           Type.STATUS);
         // register the shutdown hook to un-bind status consumer.
         Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
     }
@@ -74,7 +75,7 @@ public class WorkflowEnactmentService {
     public void submitWorkflow(String experimentId,
                                   String credentialToken,
                                   String gatewayName,
-                                  RabbitMQProcessLaunchPublisher publisher) throws Exception {
+                                  Publisher publisher) throws Exception {
 
         WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter(
                 experimentId, credentialToken,gatewayName, publisher);

http://git-wip-us.apache.org/repos/asf/airavata/blob/e4cc54d9/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
index ecfdeea..e637c29 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -22,7 +22,7 @@
 package org.apache.airavata.workflow.core;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.ComponentState;
 import org.apache.airavata.model.ComponentStatus;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -75,18 +75,18 @@ class WorkflowInterpreter {
     private Map<String, WorkflowNode> completeList = new HashMap<>();
     private Registry registry;
     private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
-    private RabbitMQProcessLaunchPublisher publisher;
+    private Publisher publisher;
     private String consumerId;
     private boolean continueWorkflow = true;
 
-    public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) throws RegistryException {
+    public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, Publisher publisher) throws RegistryException {
         this.gatewayName = gatewayName;
         setExperiment(experimentId);
         this.credentialToken = credentialToken;
         this.publisher = publisher;
     }
 
-    public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessLaunchPublisher publisher) {
+    public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName, Publisher publisher) {
         this.gatewayName = gatewayName;
         this.experiment = experiment;
         this.credentialToken = credentialStoreToken;


[2/2] airavata git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop

Posted by sh...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/12d9efb9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/12d9efb9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/12d9efb9

Branch: refs/heads/develop
Commit: 12d9efb90763b7b3cb20c6733804ea0a712186dd
Parents: e4cc54d 3a70c59
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Aug 10 15:19:30 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Aug 10 15:19:30 2016 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |   8 +-
 .../lib/airavata/workspace_model_types.cpp      |  22 ++++
 .../lib/airavata/workspace_model_types.h        |  12 +-
 .../lib/Airavata/Model/Workspace/Types.php      |  23 ++++
 .../apache/airavata/model/workspace/ttypes.py   |  15 ++-
 .../airavata/model/workspace/Gateway.java       | 111 ++++++++++++++++++-
 .../catalog/impl/GatewayRegistry.java           |   3 +
 .../core/experiment/catalog/model/Gateway.java  |  11 ++
 .../catalog/resources/GatewayResource.java      |  12 ++
 .../experiment/catalog/resources/Utils.java     |   3 +
 .../utils/ThriftDataModelConversion.java        |   1 +
 .../service/handler/RegistryServerHandler.java  |  12 ++
 .../workspace_model.thrift                      |   3 +-
 13 files changed, 227 insertions(+), 9 deletions(-)
----------------------------------------------------------------------