You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/27 20:16:19 UTC
airavata git commit: getting prefetch count from server properties
Repository: airavata
Updated Branches:
refs/heads/master 94a20abca -> a26c1e783
getting prefetch count from server properties
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a26c1e78
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a26c1e78
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a26c1e78
Branch: refs/heads/master
Commit: a26c1e783dba00372c465d39ce075a6d85137087
Parents: 94a20ab
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Wed May 27 14:16:13 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Wed May 27 14:16:13 2015 -0400
----------------------------------------------------------------------
.../server/src/main/resources/airavata-server.properties | 1 +
.../org/apache/airavata/messaging/core/MessagingConstants.java | 2 +-
.../airavata/messaging/core/impl/RabbitMQProcessConsumer.java | 6 ++++--
.../airavata/messaging/core/impl/RabbitMQStatusConsumer.java | 5 +++--
.../messaging/core/impl/RabbitMQTaskLaunchConsumer.java | 6 ++++--
5 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index faf96a7..56a8bd1 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -243,6 +243,7 @@ task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunch
rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
durable.queue=false
+prefetch.count=200
launch.queue.name=launch.queue
cancel.queue.name=cancel.queue
activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index e83092c..0c28c36 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -30,5 +30,5 @@ public abstract class MessagingConstants {
public static final String RABBIT_QUEUE= "queue";
public static final String RABBIT_CONSUMER_TAG = "consumerTag";
public static final String DURABLE_QUEUE="durable.queue";
- public static final int PREFETCH_COUNT=64;
+ public static final String PREFETCH_COUNT="prefetch.count";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
index c0b34e3..39b3df9 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -54,10 +54,12 @@ public class RabbitMQProcessConsumer {
private String url;
private Connection connection;
private Channel channel;
+ private int prefetchCount;
public RabbitMQProcessConsumer() throws AiravataException {
try {
url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
createConnection();
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -79,7 +81,7 @@ public class RabbitMQProcessConsumer {
log.info("connected to rabbitmq: " + connection + " for default");
channel = connection.createChannel();
- channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+ channel.basicQos(prefetchCount);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
} catch (Exception e) {
@@ -102,7 +104,7 @@ public class RabbitMQProcessConsumer {
if (queueName == null) {
if (!channel.isOpen()) {
channel = connection.createChannel();
- channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+ channel.basicQos(prefetchCount);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
}
queueName = channel.queueDeclare().getQueue();
http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 4d1ea4c..c723f01 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -51,13 +51,14 @@ public class RabbitMQStatusConsumer implements Consumer {
private String url;
private Connection connection;
private Channel channel;
+ private int prefetchCount;
private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
public RabbitMQStatusConsumer() throws AiravataException {
try {
url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-
+ prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
createConnection();
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -86,7 +87,7 @@ public class RabbitMQStatusConsumer implements Consumer {
log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
channel = connection.createChannel();
- channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+ channel.basicQos(prefetchCount);
channel.exchangeDeclare(exchangeName, "topic", false);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index a2547e8..89a4a5d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -53,6 +53,7 @@ public class RabbitMQTaskLaunchConsumer {
private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
private boolean durableQueue;
private MessageHandler messageHandler;
+ private int prefetchCount;
public RabbitMQTaskLaunchConsumer() throws AiravataException {
@@ -60,6 +61,7 @@ public class RabbitMQTaskLaunchConsumer {
url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+ prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
createConnection();
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -88,7 +90,7 @@ public class RabbitMQTaskLaunchConsumer {
log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
channel = connection.createChannel();
- channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+ channel.basicQos(prefetchCount);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
@@ -133,7 +135,7 @@ public class RabbitMQTaskLaunchConsumer {
if (queueName == null) {
if (!channel.isOpen()) {
channel = connection.createChannel();
- channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+ channel.basicQos(prefetchCount);
// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
}
queueName = channel.queueDeclare().getQueue();