You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/08/26 15:43:18 UTC

[23/50] [abbrv] airavata git commit: Fixed messaging publishing issues

Fixed messaging publishing issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/9f979b50
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/9f979b50
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/9f979b50

Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 9f979b50b972db48b0d80bcdbbe3dd932c9a0bc4
Parents: 3fcde52
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Aug 15 15:45:12 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Aug 15 15:45:12 2016 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/server/GfacServerHandler.java    |  2 +-
 .../apache/airavata/messaging/core/MessagingFactory.java  |  3 ++-
 .../airavata/messaging/core/impl/ExperimentConsumer.java  |  2 +-
 .../airavata/messaging/core/impl/ProcessConsumer.java     | 10 ++++------
 .../airavata/messaging/core/impl/RabbitMQPublisher.java   |  3 ++-
 .../airavata/messaging/core/impl/RabbitMQSubscriber.java  |  7 ++++++-
 6 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 44073dc..a7b0714 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -97,7 +97,7 @@ public class GfacServerHandler implements GfacService.Iface {
     private void initAMQPClient() throws AiravataException {
 	    // init process consumer
         List<String> routingKeys = new ArrayList<>();
-        routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
+        routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName());
         processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Type.PROCESS_LAUNCH);
         // init status publisher
 	    statusPublisher = Factory.getStatusPublisher();

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
index b3e6d35..802ea5a 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java
@@ -123,7 +123,7 @@ public class MessagingFactory {
 
     private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException {
         sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName())
-                .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName())
+                .setQueueName("process_launch")
                 .setAutoAck(false);
         return new RabbitMQSubscriber(sp);
     }
@@ -131,6 +131,7 @@ public class MessagingFactory {
 
     private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException {
         sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName())
+                .setQueueName("experiment_launch")
                 .setAutoAck(false);
         return new RabbitMQSubscriber(sp);
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
index 6e4c46a..5010358 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java
@@ -72,7 +72,7 @@ public class ExperimentConsumer extends QueueingConsumer {
                 String gatewayId = null;
                 ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent();
                 ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent);
-                log.debug(" Message Received with message id '" + message.getMessageId()
+                log.info(" Message Received with message id '" + message.getMessageId()
                         + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
                         " " +
                         experimentEvent.getExperimentId());

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
index e95a7ca..69910bd 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java
@@ -55,8 +55,7 @@ public class ProcessConsumer extends QueueingConsumer{
     }
 
 
-    @Override
-    public void handleDelivery(String consumerTag,
+    @Override public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties basicProperties,
                                byte[] body) throws IOException {
@@ -71,10 +70,9 @@ public class ProcessConsumer extends QueueingConsumer{
             if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
                 ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
                 ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
-                log.debug(" Message Received with message id '" + message.getMessageId()
-                        + "' and with message type '" + message.getMessageType() + "'  for experimentId:" +
-                        " " +
-                        processSubmitEvent.getProcessId());
+                log.info(" Message Received with message id '" + message.getMessageId()
+                        + " and with message type:" + message.getMessageType() + ", for processId:" +
+                        processSubmitEvent.getProcessId() + ", expId:" + processSubmitEvent.getExperimentId());
                 event = processSubmitEvent;
                 gatewayId = processSubmitEvent.getGatewayId();
                 MessageContext messageContext = new MessageContext(event, message.getMessageType(),

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
index 3fdb3a1..6f1d1d8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -71,7 +71,7 @@ public class RabbitMQPublisher implements Publisher {
             if (properties.getExchangeName() != null) {
                 channel.exchangeDeclare(properties.getExchangeName(),
                                         properties.getExchangeType(),
-                                        false);
+                                        true); //durable
             }
         } catch (Exception e) {
             String msg = "RabbitMQ connection issue for exchange : " + properties.getExchangeName();
@@ -93,6 +93,7 @@ public class RabbitMQPublisher implements Publisher {
             message.setMessageType(messageContext.getType());
             message.setUpdatedTime(messageContext.getUpdatedTime().getTime());
             String routingKey = routingKeySupplier.apply(messageContext);
+            log.info("publish messageId:" + messageContext.getMessageId() + ", messageType:" + messageContext.getType() + ", to routingKey:" + routingKey);
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
             send(messageBody, routingKey);
         } catch (TException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/9f979b50/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
index 441281d..6b28723 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQSubscriber.java
@@ -85,7 +85,11 @@ public class RabbitMQSubscriber implements Subscriber {
             if (queueName == null) {
                 queueName = channel.queueDeclare().getQueue();
             } else {
-                channel.queueDeclare(queueName, true, false, false, null);
+                channel.queueDeclare(queueName,
+                                     true, // durable
+                                     false, // exclusive
+                                     false, // autoDelete
+                                     null);// arguments
             }
             final String id = getId(routingKeys, queueName);
             if (queueDetailMap.containsKey(id)) {
@@ -94,6 +98,7 @@ public class RabbitMQSubscriber implements Subscriber {
             }
             // bind all the routing keys
             for (String key : routingKeys) {
+                log.info("Binding key:" + key + " to queue:" + queueName);
                 channel.queueBind(queueName, properties.getExchangeName(), key);
             }