You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/06/04 17:04:52 UTC

svn commit: r951434 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Fri Jun  4 15:04:51 2010
New Revision: 951434

URL: http://svn.apache.org/viewvc?rev=951434&view=rev
Log:
UIMA-1799 Modified to support prefetch override on a reply queue. The prefetch value equals the number of consumers defined on the reply queue in the DD.

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=951434&r1=951433&r2=951434&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jun  4 15:04:51 2010
@@ -565,6 +565,14 @@ public class UimaDefaultMessageListenerC
    */
   public void afterPropertiesSet() {
     if (endpoint != null) {
+
+			// Override the prefetch size. dd2spring always sets this to 1, which
+			// may affect the throughput of a service. Change the prefetch size to
+			// the number of consumer threads defined in the DD.
+      if ( cc > 1 && endpoint.isTempReplyDestination() && connectionFactory instanceof ActiveMQConnectionFactory ) {
+        ((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().setQueuePrefetch(cc);
+      }
+      
       // Endpoint has been plugged in from spring xml. This means this is a listener
       // for a reply queue. We need to rewire things a bit. First make Spring use
       // one thread to make sure we receive messages in order. To fix a race condition
@@ -579,7 +587,7 @@ public class UimaDefaultMessageListenerC
       super.setConcurrentConsumers(1);
       if (cc > 1) {
         try {
-          concurrentListener = new ConcurrentMessageListener(cc, ml);
+          concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName());
           super.setMessageListener(concurrentListener);
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -770,7 +778,17 @@ public class UimaDefaultMessageListenerC
     super.setDestination(aDestination);
     if (endpoint != null) {
       endpoint.setDestination(aDestination);
-      if (aDestination instanceof TemporaryQueue) {
+      // Get the prefetch size. If > 1, it has been previously overridden. The override is done in
+      // the code since dd2spring always sets the prefetch on a reply queue to 1. This may slow down
+      // the throughput of a service.
+      int prefetchSize = ((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().getQueuePrefetch();
+      if (aDestination instanceof TemporaryQueue && prefetchSize > 1) {
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                    "setDestination", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                    "UIMAJMS_replyq_prefetch_override__INFO", new Object[] {aDestination,prefetchSize
+           });
+        }
         endpoint.setTempReplyDestination(true);
         Object pojoListener = getPojoListener();
         if (pojoListener != null && pojoListener instanceof InputChannel) {