You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/25 21:24:38 UTC

git commit: updating publish method in listeners

Repository: airavata
Updated Branches:
  refs/heads/messaging_framework 61214dbfa -> cc71bac08


updating publish method in listeners


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cc71bac0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cc71bac0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cc71bac0

Branch: refs/heads/messaging_framework
Commit: cc71bac08a8539da4190eb4ef81b850fc1d75743
Parents: 61214db
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Sep 25 15:24:33 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Sep 25 15:24:33 2014 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        | 28 +++++++++---------
 modules/commons/utils/pom.xml                   |  5 ++++
 .../airavata/common/utils/ServerSettings.java   |  7 +++++
 .../main/resources/airavata-server.properties   |  6 ++--
 .../core/monitor/AiravataJobStatusUpdator.java  | 28 ++++++++----------
 .../core/monitor/AiravataTaskStatusUpdator.java | 28 +++++++++---------
 .../AiravataWorkflowNodeStatusUpdator.java      | 30 ++++++++++----------
 .../airavata/messaging/core/MessageContext.java | 21 ++++++++++++--
 .../messaging/core/impl/RabbitMQPublisher.java  | 11 +++++--
 9 files changed, 96 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index c692a8c..e448fec 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -20,16 +20,17 @@
 */
 package org.apache.airavata.api.server.listener;
 
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
 import org.apache.airavata.model.util.ExecutionType;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
@@ -38,7 +39,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
 
 public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
@@ -90,15 +91,12 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 			logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
 			monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
             ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId());
-            Message message = new Message();
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(experimentStatusChangeEvent);
-            message.setEvent(baos.toByteArray());
-            message.setMessageType(MessageType.EXPERIMENT);
-            message.setMessageLevel(MessageLevel.INFO);
-            message.setMessageId(AiravataUtils.getId("EXP"));
-            publisher.publish(message);
+            String messageId = AiravataUtils.getId("EXPERIMENT");
+            MessageContext msgCntxt = new MessageContext(experimentStatusChangeEvent, MessageType.EXPERIMENT, messageId);
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            if ( ServerSettings.isRabbitMqPublishEnabled()){
+                publisher.publish(msgCntxt);
+            }
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             throw new Exception("Error persisting experiment status..", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index bb198ed..5b93732 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -132,6 +132,11 @@
             <artifactId>zookeeper</artifactId>
             <version>3.4.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${thrift.version}</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 6594ecc..94a6b07 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -68,6 +68,7 @@ public class ServerSettings extends ApplicationSettings{
     private static final String MY_PROXY_LIFETIME = "myproxy.life";
     private static final String ACTIVITY_PUBLISHER = "activity.publisher";
     private static final String ACTIVITY_LISTENERS = "activity.listeners";
+    public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
 
     private static boolean stopAllThreads = false;
 
@@ -224,6 +225,12 @@ public class ServerSettings extends ApplicationSettings{
     public static String getActivityPublisher() throws ApplicationSettingsException{
         return getSetting(ACTIVITY_PUBLISHER);
     }
+
+    public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{
+        String setting = getSetting(PUBLISH_RABBITMQ);
+        return Boolean.parseBoolean(setting);
+    }
+
     public static boolean isEmbeddedZK() {
         return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 51d71e1..9e54f9f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -186,10 +186,12 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
 #publisher
-activity.publisher=org.apache.airavata.messaging.core.impl.AiravataRabbitMQPublisher
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
+publish.rabbitmq=false
+activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
 rabbitmq.broker.url=http://localhost
+rabbitmq.exchange.name=airavata_rabbitmq_exchange
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 4f1f861..c31aee3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -20,15 +20,15 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
@@ -37,11 +37,10 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
 
 public class AiravataJobStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
-
     private Registry airavataRegistry;
 
     private MonitorPublisher monitorPublisher;
@@ -71,15 +70,12 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
     			logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
             	monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
                 JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
-                Message message = new Message();
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                ObjectOutputStream oos = new ObjectOutputStream(baos);
-                oos.writeObject(changeEvent);
-                message.setEvent(baos.toByteArray());
-                message.setMessageType(MessageType.JOB);
-                message.setMessageLevel(MessageLevel.INFO);
-                message.setMessageId(AiravataUtils.getId("JOB"));
-                publisher.publish(message);
+                String messageId = AiravataUtils.getId("JOB");
+                MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.JOB, messageId);
+                msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                if ( ServerSettings.isRabbitMqPublishEnabled()){
+                    publisher.publish(msgCntxt);
+                }
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
                 throw new Exception("Error persisting job status..", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 4e9bf29..c9a9b03 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -20,16 +20,17 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.TaskIdentity;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.registry.cpi.Registry;
@@ -37,7 +38,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
 
 public class AiravataTaskStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
@@ -97,15 +98,12 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
                                                          jobStatus.getJobIdentity().getExperimentId());
             monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
             TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity);
-            Message message = new Message();
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(changeEvent);
-            message.setEvent(baos.toByteArray());
-            message.setMessageType(MessageType.TASK);
-            message.setMessageLevel(MessageLevel.INFO);
-            message.setMessageId(AiravataUtils.getId("TASK"));
-            publisher.publish(message);
+            String messageId = AiravataUtils.getId("TASK");
+            MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.TASK, messageId);
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            if ( ServerSettings.isRabbitMqPublishEnabled()){
+                publisher.publish(msgCntxt);
+            }
 
         } catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index f2ef855..5632ef9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -20,15 +20,17 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.WorkflowIdentity;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -37,7 +39,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
 
 public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
@@ -82,15 +84,13 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
             WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
             monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
             WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
-            Message message = new Message();
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(changeEvent);
-            message.setEvent(baos.toByteArray());
-            message.setMessageType(MessageType.WORKFLOWNODE);
-            message.setMessageLevel(MessageLevel.INFO);
-            message.setMessageId(AiravataUtils.getId("NODE"));
-            publisher.publish(message);
+            String messageId = AiravataUtils.getId("WFNODE");
+            MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.WORKFLOWNODE, messageId);
+            msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+            if ( ServerSettings.isRabbitMqPublishEnabled()){
+                publisher.publish(msgCntxt);
+            }
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             throw new Exception("Error persisting workflow node status..", e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 48fff59..00a22bb 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -24,14 +24,19 @@ package org.apache.airavata.messaging.core;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.thrift.TBase;
 
+import java.sql.Timestamp;
+
 public class MessageContext {
     private final TBase event;
-
     private final MessageType type;
+    private final String messageId;
+    private Timestamp updatedTime;
+
 
-    public MessageContext(TBase message, MessageType type) {
+    public MessageContext(TBase message, MessageType type, String messageId) {
         this.event = message;
         this.type = type;
+        this.messageId = messageId;
     }
 
     public TBase getEvent() {
@@ -41,4 +46,16 @@ public class MessageContext {
     public MessageType getType() {
         return type;
     }
+
+    public Timestamp getUpdatedTime() {
+        return updatedTime;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    public void setUpdatedTime(Timestamp updatedTime) {
+        this.updatedTime = updatedTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index d9ad7e4..490d33c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -39,6 +39,7 @@ public class RabbitMQPublisher implements Publisher {
 
     private RabbitMQProducer rabbitMQProducer;
 
+
     public RabbitMQPublisher() throws Exception {
         String brokerUrl;
         String exchangeName;
@@ -51,14 +52,17 @@ public class RabbitMQPublisher implements Publisher {
             throw new AiravataException(message, e);
         }
         rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
-        rabbitMQProducer.open();
     }
 
     public void publish(MessageContext msgCtx) throws AiravataException {
         try {
-            byte []body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            rabbitMQProducer.open();
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
             Message message = new Message();
             message.setEvent(body);
+            message.setMessageId(msgCtx.getMessageId());
+            message.setMessageType(msgCtx.getType());
+            message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
             String routingKey = null;
             if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
                 ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
@@ -79,7 +83,8 @@ public class RabbitMQPublisher implements Publisher {
                         identity.getTaskId() + "." +
                         identity.getJobId();
             }
-            rabbitMQProducer.send(body, routingKey);
+            byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+            rabbitMQProducer.send(messageBody, routingKey);
         } catch (TException e) {
             String msg = "Error while deserializing the object";
             log.error(msg, e);