You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/25 21:24:38 UTC
git commit: updating publish method in listeners
Repository: airavata
Updated Branches:
refs/heads/messaging_framework 61214dbfa -> cc71bac08
updating publish method in listeners
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/cc71bac0
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/cc71bac0
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/cc71bac0
Branch: refs/heads/messaging_framework
Commit: cc71bac08a8539da4190eb4ef81b850fc1d75743
Parents: 61214db
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Sep 25 15:24:33 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Sep 25 15:24:33 2014 -0400
----------------------------------------------------------------------
.../AiravataExperimentStatusUpdator.java | 28 +++++++++---------
modules/commons/utils/pom.xml | 5 ++++
.../airavata/common/utils/ServerSettings.java | 7 +++++
.../main/resources/airavata-server.properties | 6 ++--
.../core/monitor/AiravataJobStatusUpdator.java | 28 ++++++++----------
.../core/monitor/AiravataTaskStatusUpdator.java | 28 +++++++++---------
.../AiravataWorkflowNodeStatusUpdator.java | 30 ++++++++++----------
.../airavata/messaging/core/MessageContext.java | 21 ++++++++++++--
.../messaging/core/impl/RabbitMQPublisher.java | 11 +++++--
9 files changed, 96 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index c692a8c..e448fec 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -20,16 +20,17 @@
*/
package org.apache.airavata.api.server.listener;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.api.server.util.DataModelUtils;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
import org.apache.airavata.model.util.ExecutionType;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
@@ -38,7 +39,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
@@ -90,15 +91,12 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId());
- Message message = new Message();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(experimentStatusChangeEvent);
- message.setEvent(baos.toByteArray());
- message.setMessageType(MessageType.EXPERIMENT);
- message.setMessageLevel(MessageLevel.INFO);
- message.setMessageId(AiravataUtils.getId("EXP"));
- publisher.publish(message);
+ String messageId = AiravataUtils.getId("EXPERIMENT");
+ MessageContext msgCntxt = new MessageContext(experimentStatusChangeEvent, MessageType.EXPERIMENT, messageId);
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ if ( ServerSettings.isRabbitMqPublishEnabled()){
+ publisher.publish(msgCntxt);
+ }
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
throw new Exception("Error persisting experiment status..", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index bb198ed..5b93732 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -132,6 +132,11 @@
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>${thrift.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 6594ecc..94a6b07 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -68,6 +68,7 @@ public class ServerSettings extends ApplicationSettings{
private static final String MY_PROXY_LIFETIME = "myproxy.life";
private static final String ACTIVITY_PUBLISHER = "activity.publisher";
private static final String ACTIVITY_LISTENERS = "activity.listeners";
+ public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
private static boolean stopAllThreads = false;
@@ -224,6 +225,12 @@ public class ServerSettings extends ApplicationSettings{
public static String getActivityPublisher() throws ApplicationSettingsException{
return getSetting(ACTIVITY_PUBLISHER);
}
+
+ public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{ // true only when the publish.rabbitmq setting parses as "true"; throws clause kept so existing callers compile unchanged
+ // Use the two-argument getSetting(key, default) overload, as isEmbeddedZK() does, so an absent publish.rabbitmq key means "publishing disabled" rather than an exception inside the status listeners.
+ return Boolean.parseBoolean(getSetting(PUBLISH_RABBITMQ, "false"));
+ }
+
public static boolean isEmbeddedZK() {
return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 51d71e1..9e54f9f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -186,10 +186,12 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
#publisher
-activity.publisher=org.apache.airavata.messaging.core.impl.AiravataRabbitMQPublisher
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
+publish.rabbitmq=false
+activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
rabbitmq.broker.url=http://localhost
+rabbitmq.exchange.name=airavata_rabbitmq_exchange
###---------------------------Orchestrator module Configurations---------------------------###
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 4f1f861..c31aee3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -20,15 +20,15 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
@@ -37,11 +37,10 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
public class AiravataJobStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
-
private Registry airavataRegistry;
private MonitorPublisher monitorPublisher;
@@ -71,15 +70,12 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity());
- Message message = new Message();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(changeEvent);
- message.setEvent(baos.toByteArray());
- message.setMessageType(MessageType.JOB);
- message.setMessageLevel(MessageLevel.INFO);
- message.setMessageId(AiravataUtils.getId("JOB"));
- publisher.publish(message);
+ String messageId = AiravataUtils.getId("JOB");
+ MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.JOB, messageId);
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ if ( ServerSettings.isRabbitMqPublishEnabled()){
+ publisher.publish(msgCntxt);
+ }
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
throw new Exception("Error persisting job status..", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 4e9bf29..c9a9b03 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -20,16 +20,17 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskIdentity;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.cpi.Registry;
@@ -37,7 +38,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
public class AiravataTaskStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
@@ -97,15 +98,12 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
jobStatus.getJobIdentity().getExperimentId());
monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity);
- Message message = new Message();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(changeEvent);
- message.setEvent(baos.toByteArray());
- message.setMessageType(MessageType.TASK);
- message.setMessageLevel(MessageLevel.INFO);
- message.setMessageId(AiravataUtils.getId("TASK"));
- publisher.publish(message);
+ String messageId = AiravataUtils.getId("TASK");
+ MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.TASK, messageId);
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ if ( ServerSettings.isRabbitMqPublishEnabled()){
+ publisher.publish(msgCntxt);
+ }
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index f2ef855..5632ef9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -20,15 +20,17 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.util.Calendar;
-
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.WorkflowIdentity;
+import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
@@ -37,7 +39,7 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
@@ -82,15 +84,13 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
- Message message = new Message();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(changeEvent);
- message.setEvent(baos.toByteArray());
- message.setMessageType(MessageType.WORKFLOWNODE);
- message.setMessageLevel(MessageLevel.INFO);
- message.setMessageId(AiravataUtils.getId("NODE"));
- publisher.publish(message);
+ String messageId = AiravataUtils.getId("WFNODE");
+ MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.WORKFLOWNODE, messageId);
+ msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+
+ if ( ServerSettings.isRabbitMqPublishEnabled()){
+ publisher.publish(msgCntxt);
+ }
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
throw new Exception("Error persisting workflow node status..", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 48fff59..00a22bb 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -24,14 +24,19 @@ package org.apache.airavata.messaging.core;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.thrift.TBase;
+import java.sql.Timestamp;
+
public class MessageContext {
private final TBase event;
-
private final MessageType type;
+ private final String messageId;
+ private Timestamp updatedTime;
+
- public MessageContext(TBase message, MessageType type) {
+ public MessageContext(TBase message, MessageType type, String messageId) {
this.event = message;
this.type = type;
+ this.messageId = messageId;
}
public TBase getEvent() {
@@ -41,4 +46,16 @@ public class MessageContext {
public MessageType getType() {
return type;
}
+
+ public Timestamp getUpdatedTime() { // accessor for the timestamp attached via setUpdatedTime(...)
+ return updatedTime; // the constructor does not assign this field, so it is null until setUpdatedTime(...) has been called
+ }
+
+ public String getMessageId() { // accessor for the message identifier
+ return messageId; // value passed to the constructor; the field is final, so it never changes afterwards
+ }
+
+ public void setUpdatedTime(Timestamp updatedTime) { // attaches the timestamp later returned by getUpdatedTime()
+ this.updatedTime = updatedTime; // stored as given, no null check; RabbitMQPublisher.publish calls getUpdatedTime().getTime() on it
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cc71bac0/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index d9ad7e4..490d33c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -39,6 +39,7 @@ public class RabbitMQPublisher implements Publisher {
private RabbitMQProducer rabbitMQProducer;
+
public RabbitMQPublisher() throws Exception {
String brokerUrl;
String exchangeName;
@@ -51,14 +52,17 @@ public class RabbitMQPublisher implements Publisher {
throw new AiravataException(message, e);
}
rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
- rabbitMQProducer.open();
}
public void publish(MessageContext msgCtx) throws AiravataException {
try {
- byte []body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+ rabbitMQProducer.open();
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
Message message = new Message();
message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
String routingKey = null;
if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
@@ -79,7 +83,8 @@ public class RabbitMQPublisher implements Publisher {
identity.getTaskId() + "." +
identity.getJobId();
}
- rabbitMQProducer.send(body, routingKey);
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+ rabbitMQProducer.send(messageBody, routingKey);
} catch (TException e) {
String msg = "Error while deserializing the object";
log.error(msg, e);