You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/21 22:32:00 UTC
airavata git commit: adding prefetched count
Repository: airavata
Updated Branches:
refs/heads/master 111d9b599 -> a74790a20
adding prefetched count
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a74790a2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a74790a2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a74790a2
Branch: refs/heads/master
Commit: a74790a20269f888a5a48e50b5ed64a55cb5ecdd
Parents: 111d9b5
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Thu May 21 16:31:55 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Thu May 21 16:31:55 2015 -0400
----------------------------------------------------------------------
.../airavata/messaging/core/MessagingConstants.java | 1 +
.../messaging/core/impl/RabbitMQProcessConsumer.java | 3 +++
.../airavata/messaging/core/impl/RabbitMQProducer.java | 1 +
.../messaging/core/impl/RabbitMQStatusConsumer.java | 2 ++
.../messaging/core/impl/RabbitMQTaskLaunchConsumer.java | 11 +++++++++++
5 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index d2e086d..e83092c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -30,4 +30,5 @@ public abstract class MessagingConstants {
public static final String RABBIT_QUEUE= "queue";
public static final String RABBIT_CONSUMER_TAG = "consumerTag";
public static final String DURABLE_QUEUE="durable.queue";
+ public static final int PREFETCH_COUNT=64;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
index 3352893..c0b34e3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -70,6 +70,7 @@ public class RabbitMQProcessConsumer {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(url);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -78,6 +79,7 @@ public class RabbitMQProcessConsumer {
log.info("connected to rabbitmq: " + connection + " for default");
channel = connection.createChannel();
+ channel.basicQos(MessagingConstants.PREFETCH_COUNT);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
} catch (Exception e) {
@@ -100,6 +102,7 @@ public class RabbitMQProcessConsumer {
if (queueName == null) {
if (!channel.isOpen()) {
channel = connection.createChannel();
+ channel.basicQos(MessagingConstants.PREFETCH_COUNT);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
}
queueName = channel.queueDeclare().getQueue();
http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index fffeece..8b391e3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -150,6 +150,7 @@ public class RabbitMQProducer {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(url);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
Connection connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 6efa77a..4d1ea4c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -77,6 +77,7 @@ public class RabbitMQStatusConsumer implements Consumer {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(url);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -85,6 +86,7 @@ public class RabbitMQStatusConsumer implements Consumer {
log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
channel = connection.createChannel();
+ channel.basicQos(MessagingConstants.PREFETCH_COUNT);
channel.exchangeDeclare(exchangeName, "topic", false);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a74790a2/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 8007ece..a2547e8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -79,6 +79,7 @@ public class RabbitMQTaskLaunchConsumer {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(url);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
@@ -87,6 +88,8 @@ public class RabbitMQTaskLaunchConsumer {
log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
channel = connection.createChannel();
+ channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
} catch (Exception e) {
@@ -130,6 +133,7 @@ public class RabbitMQTaskLaunchConsumer {
if (queueName == null) {
if (!channel.isOpen()) {
channel = connection.createChannel();
+ channel.basicQos(MessagingConstants.PREFETCH_COUNT);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
}
queueName = channel.queueDeclare().getQueue();
@@ -197,7 +201,14 @@ public class RabbitMQTaskLaunchConsumer {
log.warn(msg, e);
}
}
+
+ @Override
+ public void handleCancel(String consumerTag) throws IOException {
+ super.handleCancel(consumerTag);
+ log.info("Consumer cancelled : " + consumerTag);
+ }
});
+
// save the name for deleting the queue
queueDetailsMap.put(id, new QueueDetails(queueName, keys));
return id;