You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/14 20:22:45 UTC

airavata git commit: Fixed issues with recovery messages

Repository: airavata
Updated Branches:
  refs/heads/master d2b176ca1 -> 828e503d3


Fixed issues with recovery messages


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/828e503d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/828e503d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/828e503d

Branch: refs/heads/master
Commit: 828e503d35994122f48447c5a1e1d0105f91bfe0
Parents: d2b176c
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Mon Sep 14 14:22:37 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Mon Sep 14 14:22:37 2015 -0400

----------------------------------------------------------------------
 .../impl/RabbitMQProcessLaunchConsumer.java     | 39 ++++++++++----------
 1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/828e503d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index ce697da..8f0ce29 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -171,26 +171,25 @@ public class RabbitMQProcessLaunchConsumer {
                         TBase event = null;
                         String gatewayId = null;
                         long deliveryTag = envelope.getDeliveryTag();
-                        if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-                            ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for experimentId: " +
-                                    processSubmitEvent.getProcessId());
-                            event = processSubmitEvent;
-                            gatewayId = processSubmitEvent.getGatewayId();
-                        }else if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
-                            ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
-                            ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
-                            log.debug(" Message Received with message id '" + message.getMessageId()
-                                    + "' and with message type '" + message.getMessageType() + "'  for processId: " +
-                                    processTerminateEvent.getProcessId());
-                            event = processTerminateEvent;
-                            gatewayId = processTerminateEvent.getGatewayId();
-                        }
-                        MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
-                        messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
-                        handler.onMessage(messageContext);
+	                    if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+		                    ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+		                    ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+		                    log.debug(" Message Received with message id '" + message.getMessageId()
+				                    + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
+				                    " " +
+				                    processSubmitEvent.getProcessId());
+		                    event = processSubmitEvent;
+		                    gatewayId = processSubmitEvent.getGatewayId();
+		                    MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+				                    message.getMessageId(), gatewayId, deliveryTag);
+		                    messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+		                    messageContext.setIsRedeliver(envelope.isRedeliver());
+		                    handler.onMessage(messageContext);
+	                    } else {
+		                    log.error("{} message type is not handle in ProcessLaunch Consumer. Sending ack for " +
+				                    "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+		                    sendAck(deliveryTag);
+	                    }
                     } catch (TException e) {
                         String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
                         log.warn(msg, e);