You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/12/31 06:53:55 UTC

[airavata] branch AIRAVATA-3549 updated: Fixing the bug in deserializing intermediate output event

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch AIRAVATA-3549
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/AIRAVATA-3549 by this push:
     new 88cb8a9  Fixing the bug in deserializing intermediate output event
88cb8a9 is described below

commit 88cb8a965bd8a311d01c9d3ca2364a13f85f7450
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Dec 31 01:53:42 2021 -0500

    Fixing the bug in deserializing intermediate output event
---
 .../messaging/core/impl/ExperimentConsumer.java    | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index b761380..16b7527 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -29,10 +29,7 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.*;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -66,9 +63,12 @@ public class ExperimentConsumer extends QueueingConsumer {
         try {
             ThriftUtils.createThriftFromBytes(body, message);
             long deliveryTag = envelope.getDeliveryTag();
-            if (message.getMessageType() == MessageType.EXPERIMENT || message.getMessageType() == MessageType.EXPERIMENT_CANCEL) {
-                TBase event = null;
-                String gatewayId = null;
+
+            TBase event = null;
+            String gatewayId = null;
+            if (message.getMessageType() == MessageType.EXPERIMENT ||
+                    message.getMessageType() == MessageType.EXPERIMENT_CANCEL) {
+
                 ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent();
                 ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
                 log.info(" Message Received with message id '" + message.getMessageId()
@@ -82,7 +82,24 @@ public class ExperimentConsumer extends QueueingConsumer {
                 messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                 messageContext.setIsRedeliver(envelope.isRedeliver());
                 handler.onMessage(messageContext);
-            } else {
+
+            } else if (message.getMessageType() == MessageType.INTERMEDIATE_OUTPUTS) {
+
+                ExperimentIntermediateOutputsEvent intermediateOutEvt = new ExperimentIntermediateOutputsEvent();
+                ThriftUtils.createThriftFromBytes(message.getEvent(), intermediateOutEvt);
+                log.info(" Message Received with message id '" + message.getMessageId()
+                        + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
+                        " " +
+                        intermediateOutEvt.getExperimentId());
+                event = intermediateOutEvt;
+                gatewayId = intermediateOutEvt.getGatewayId();
+                MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+                        message.getMessageId(), gatewayId, deliveryTag);
+                messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                messageContext.setIsRedeliver(envelope.isRedeliver());
+                handler.onMessage(messageContext);
+
+            }else {
                 log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
                         "delivery tag {} ", message.getMessageType().name(), deliveryTag);
                 sendAck(deliveryTag);