You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/21 22:32:00 UTC

airavata git commit: adding prefetched count

Repository: airavata
Updated Branches:
  refs/heads/master 111d9b599 -> a74790a20


adding prefetched count


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a74790a2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a74790a2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a74790a2

Branch: refs/heads/master
Commit: a74790a20269f888a5a48e50b5ed64a55cb5ecdd
Parents: 111d9b5
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu May 21 16:31:55 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu May 21 16:31:55 2015 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/MessagingConstants.java      |  1 +
 .../messaging/core/impl/RabbitMQProcessConsumer.java     |  3 +++
 .../airavata/messaging/core/impl/RabbitMQProducer.java   |  1 +
 .../messaging/core/impl/RabbitMQStatusConsumer.java      |  2 ++
 .../messaging/core/impl/RabbitMQTaskLaunchConsumer.java  | 11 +++++++++++
 5 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index d2e086d..e83092c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -30,4 +30,5 @@ public abstract class MessagingConstants {
     public static final String RABBIT_QUEUE= "queue";
     public static final String RABBIT_CONSUMER_TAG = "consumerTag";
     public static final String DURABLE_QUEUE="durable.queue";
+    public static final int PREFETCH_COUNT=64;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
index 3352893..c0b34e3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -70,6 +70,7 @@ public class RabbitMQProcessConsumer {
         try {
             ConnectionFactory connectionFactory = new ConnectionFactory();
             connectionFactory.setUri(url);
+            connectionFactory.setAutomaticRecoveryEnabled(true);
             connection = connectionFactory.newConnection();
             connection.addShutdownListener(new ShutdownListener() {
                 public void shutdownCompleted(ShutdownSignalException cause) {
@@ -78,6 +79,7 @@ public class RabbitMQProcessConsumer {
             log.info("connected to rabbitmq: " + connection + " for default");
 
             channel = connection.createChannel();
+            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
 //            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
 
         } catch (Exception e) {
@@ -100,6 +102,7 @@ public class RabbitMQProcessConsumer {
             if (queueName == null) {
                 if (!channel.isOpen()) {
                     channel = connection.createChannel();
+                    channel.basicQos(MessagingConstants.PREFETCH_COUNT);
 //                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
                 }
                 queueName = channel.queueDeclare().getQueue();

http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index fffeece..8b391e3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -150,6 +150,7 @@ public class RabbitMQProducer {
         try {
             ConnectionFactory connectionFactory = new ConnectionFactory();
             connectionFactory.setUri(url);
+            connectionFactory.setAutomaticRecoveryEnabled(true);
             Connection connection = connectionFactory.newConnection();
             connection.addShutdownListener(new ShutdownListener() {
                 public void shutdownCompleted(ShutdownSignalException cause) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 6efa77a..4d1ea4c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -77,6 +77,7 @@ public class RabbitMQStatusConsumer implements Consumer {
         try {
             ConnectionFactory connectionFactory = new ConnectionFactory();
             connectionFactory.setUri(url);
+            connectionFactory.setAutomaticRecoveryEnabled(true);
             connection = connectionFactory.newConnection();
             connection.addShutdownListener(new ShutdownListener() {
                 public void shutdownCompleted(ShutdownSignalException cause) {
@@ -85,6 +86,7 @@ public class RabbitMQStatusConsumer implements Consumer {
             log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
 
             channel = connection.createChannel();
+            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
             channel.exchangeDeclare(exchangeName, "topic", false);
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 8007ece..a2547e8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -79,6 +79,7 @@ public class RabbitMQTaskLaunchConsumer {
         try {
             ConnectionFactory connectionFactory = new ConnectionFactory();
             connectionFactory.setUri(url);
+            connectionFactory.setAutomaticRecoveryEnabled(true);
             connection = connectionFactory.newConnection();
             connection.addShutdownListener(new ShutdownListener() {
                 public void shutdownCompleted(ShutdownSignalException cause) {
@@ -87,6 +88,8 @@ public class RabbitMQTaskLaunchConsumer {
             log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
 
             channel = connection.createChannel();
+            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+
 //            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
 
         } catch (Exception e) {
@@ -130,6 +133,7 @@ public class RabbitMQTaskLaunchConsumer {
             if (queueName == null) {
                 if (!channel.isOpen()) {
                     channel = connection.createChannel();
+                    channel.basicQos(MessagingConstants.PREFETCH_COUNT);
 //                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
                 }
                 queueName = channel.queueDeclare().getQueue();
@@ -197,7 +201,14 @@ public class RabbitMQTaskLaunchConsumer {
                         log.warn(msg, e);
                     }
                 }
+
+                @Override
+                public void handleCancel(String consumerTag) throws IOException {
+                    super.handleCancel(consumerTag);
+                    log.info("Consumer cancelled : " + consumerTag);
+                }
             });
+
             // save the name for deleting the queue
             queueDetailsMap.put(id, new QueueDetails(queueName, keys));
             return id;