You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/09/14 20:22:45 UTC
airavata git commit: Fixed issues with recovery messages
Repository: airavata
Updated Branches:
refs/heads/master d2b176ca1 -> 828e503d3
Fixed issues with recovery messages
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/828e503d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/828e503d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/828e503d
Branch: refs/heads/master
Commit: 828e503d35994122f48447c5a1e1d0105f91bfe0
Parents: d2b176c
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Mon Sep 14 14:22:37 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Mon Sep 14 14:22:37 2015 -0400
----------------------------------------------------------------------
.../impl/RabbitMQProcessLaunchConsumer.java | 39 ++++++++++----------
1 file changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/828e503d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index ce697da..8f0ce29 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -171,26 +171,25 @@ public class RabbitMQProcessLaunchConsumer {
TBase event = null;
String gatewayId = null;
long deliveryTag = envelope.getDeliveryTag();
- if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
- ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
- processSubmitEvent.getProcessId());
- event = processSubmitEvent;
- gatewayId = processSubmitEvent.getGatewayId();
- }else if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
- ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for processId: " +
- processTerminateEvent.getProcessId());
- event = processTerminateEvent;
- gatewayId = processTerminateEvent.getGatewayId();
- }
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- handler.onMessage(messageContext);
+ if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
+ " " +
+ processSubmitEvent.getProcessId());
+ event = processSubmitEvent;
+ gatewayId = processSubmitEvent.getGatewayId();
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+ message.getMessageId(), gatewayId, deliveryTag);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ messageContext.setIsRedeliver(envelope.isRedeliver());
+ handler.onMessage(messageContext);
+ } else {
+ log.error("{} message type is not handle in ProcessLaunch Consumer. Sending ack for " +
+ "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+ sendAck(deliveryTag);
+ }
} catch (TException e) {
String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
log.warn(msg, e);