You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/12/31 06:53:55 UTC
[airavata] branch AIRAVATA-3549 updated: Fixing the bug in deserializing intermediate output event
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch AIRAVATA-3549
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/AIRAVATA-3549 by this push:
new 88cb8a9 Fixing the bug in deserializing intermediate output event
88cb8a9 is described below
commit 88cb8a965bd8a311d01c9d3ca2364a13f85f7450
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Dec 31 01:53:42 2021 -0500
Fixing the bug in deserializing intermediate output event
---
.../messaging/core/impl/ExperimentConsumer.java | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index b761380..16b7527 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -29,10 +29,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -66,9 +63,12 @@ public class ExperimentConsumer extends QueueingConsumer {
try {
ThriftUtils.createThriftFromBytes(body, message);
long deliveryTag = envelope.getDeliveryTag();
- if (message.getMessageType() == MessageType.EXPERIMENT || message.getMessageType() == MessageType.EXPERIMENT_CANCEL) {
- TBase event = null;
- String gatewayId = null;
+
+ TBase event = null;
+ String gatewayId = null;
+ if (message.getMessageType() == MessageType.EXPERIMENT ||
+ message.getMessageType() == MessageType.EXPERIMENT_CANCEL) {
+
ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
log.info(" Message Received with message id '" + message.getMessageId()
@@ -82,7 +82,24 @@ public class ExperimentConsumer extends QueueingConsumer {
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
messageContext.setIsRedeliver(envelope.isRedeliver());
handler.onMessage(messageContext);
- } else {
+
+ } else if (message.getMessageType() == MessageType.INTERMEDIATE_OUTPUTS) {
+
+ ExperimentIntermediateOutputsEvent intermediateOutEvt = new ExperimentIntermediateOutputsEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), intermediateOutEvt);
+ log.info(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
+ " " +
+ intermediateOutEvt.getExperimentId());
+ event = intermediateOutEvt;
+ gatewayId = intermediateOutEvt.getGatewayId();
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+ message.getMessageId(), gatewayId, deliveryTag);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ messageContext.setIsRedeliver(envelope.isRedeliver());
+ handler.onMessage(messageContext);
+
+ }else {
log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
"delivery tag {} ", message.getMessageType().name(), deliveryTag);
sendAck(deliveryTag);