You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/27 20:16:19 UTC

airavata git commit: getting prefetch count from server properties

Repository: airavata
Updated Branches:
  refs/heads/master 94a20abca -> a26c1e783


getting prefetch count from server properties


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a26c1e78
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a26c1e78
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a26c1e78

Branch: refs/heads/master
Commit: a26c1e783dba00372c465d39ce075a6d85137087
Parents: 94a20ab
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Wed May 27 14:16:13 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Wed May 27 14:16:13 2015 -0400

----------------------------------------------------------------------
 .../server/src/main/resources/airavata-server.properties       | 1 +
 .../org/apache/airavata/messaging/core/MessagingConstants.java | 2 +-
 .../airavata/messaging/core/impl/RabbitMQProcessConsumer.java  | 6 ++++--
 .../airavata/messaging/core/impl/RabbitMQStatusConsumer.java   | 5 +++--
 .../messaging/core/impl/RabbitMQTaskLaunchConsumer.java        | 6 ++++--
 5 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index faf96a7..56a8bd1 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -243,6 +243,7 @@ task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunch
 rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
 rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
 durable.queue=false
+prefetch.count=200
 launch.queue.name=launch.queue
 cancel.queue.name=cancel.queue
 activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher

http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index e83092c..0c28c36 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -30,5 +30,5 @@ public abstract class MessagingConstants {
     public static final String RABBIT_QUEUE= "queue";
     public static final String RABBIT_CONSUMER_TAG = "consumerTag";
     public static final String DURABLE_QUEUE="durable.queue";
-    public static final int PREFETCH_COUNT=64;
+    public static final String PREFETCH_COUNT="prefetch.count"; // property key; must match "prefetch.count" in airavata-server.properties
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
index c0b34e3..39b3df9 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
@@ -54,10 +54,12 @@ public class RabbitMQProcessConsumer {
     private String url;
     private Connection connection;
     private Channel channel;
+    private int prefetchCount;
 
     public RabbitMQProcessConsumer() throws AiravataException {
         try {
             url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
             createConnection();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -79,7 +81,7 @@ public class RabbitMQProcessConsumer {
             log.info("connected to rabbitmq: " + connection + " for default");
 
             channel = connection.createChannel();
-            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+            channel.basicQos(prefetchCount);
 //            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
 
         } catch (Exception e) {
@@ -102,7 +104,7 @@ public class RabbitMQProcessConsumer {
             if (queueName == null) {
                 if (!channel.isOpen()) {
                     channel = connection.createChannel();
-                    channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+                    channel.basicQos(prefetchCount);
 //                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
                 }
                 queueName = channel.queueDeclare().getQueue();

http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 4d1ea4c..c723f01 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -51,13 +51,14 @@ public class RabbitMQStatusConsumer implements Consumer {
     private String url;
     private Connection connection;
     private Channel channel;
+    private int prefetchCount;
     private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
 
     public RabbitMQStatusConsumer() throws AiravataException {
         try {
             url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
             exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
-
+            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
             createConnection();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -86,7 +87,7 @@ public class RabbitMQStatusConsumer implements Consumer {
             log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
 
             channel = connection.createChannel();
-            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+            channel.basicQos(prefetchCount);
             channel.exchangeDeclare(exchangeName, "topic", false);
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a26c1e78/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index a2547e8..89a4a5d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -53,6 +53,7 @@ public class RabbitMQTaskLaunchConsumer {
     private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
     private boolean durableQueue;
     private MessageHandler messageHandler;
+    private int prefetchCount;
 
 
     public RabbitMQTaskLaunchConsumer() throws AiravataException {
@@ -60,6 +61,7 @@ public class RabbitMQTaskLaunchConsumer {
             url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
             durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
             taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+            prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
             createConnection();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
@@ -88,7 +90,7 @@ public class RabbitMQTaskLaunchConsumer {
             log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
 
             channel = connection.createChannel();
-            channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+            channel.basicQos(prefetchCount);
 
 //            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
 
@@ -133,7 +135,7 @@ public class RabbitMQTaskLaunchConsumer {
             if (queueName == null) {
                 if (!channel.isOpen()) {
                     channel = connection.createChannel();
-                    channel.basicQos(MessagingConstants.PREFETCH_COUNT);
+                    channel.basicQos(prefetchCount);
 //                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
                 }
                 queueName = channel.queueDeclare().getQueue();