You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/06/04 17:04:52 UTC
svn commit: r951434 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Author: cwiklik
Date: Fri Jun 4 15:04:51 2010
New Revision: 951434
URL: http://svn.apache.org/viewvc?rev=951434&view=rev
Log:
UIMA-1799 Modified to support prefetch override on a reply queue. The prefetch value equals the number of consumers defined on the reply queue in the DD.
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=951434&r1=951433&r2=951434&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jun 4 15:04:51 2010
@@ -565,6 +565,14 @@ public class UimaDefaultMessageListenerC
*/
public void afterPropertiesSet() {
if (endpoint != null) {
+
+ // Override the prefetch size. dd2spring always sets this to 1, which
+ // may affect the throughput of a service. Change the prefetch size to
+ // the number of consumer threads defined in the DD.
+ if ( cc > 1 && endpoint.isTempReplyDestination() && connectionFactory instanceof ActiveMQConnectionFactory ) {
+ ((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().setQueuePrefetch(cc);
+ }
+
// Endpoint has been plugged in from spring xml. This means this is a listener
// for a reply queue. We need to rewire things a bit. First make Spring use
// one thread to make sure we receive messages in order. To fix a race condition
@@ -579,7 +587,7 @@ public class UimaDefaultMessageListenerC
super.setConcurrentConsumers(1);
if (cc > 1) {
try {
- concurrentListener = new ConcurrentMessageListener(cc, ml);
+ concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName());
super.setMessageListener(concurrentListener);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -770,7 +778,17 @@ public class UimaDefaultMessageListenerC
super.setDestination(aDestination);
if (endpoint != null) {
endpoint.setDestination(aDestination);
- if (aDestination instanceof TemporaryQueue) {
+ // Get the prefetch size. If > 1, it has been previously overridden. The override is done in
+ // the code since dd2spring always sets the prefetch on a reply queue to 1. This may slow down
+ // the throughput of a service.
+ int prefetchSize = ((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().getQueuePrefetch();
+ if (aDestination instanceof TemporaryQueue && prefetchSize > 1) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "setDestination", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_replyq_prefetch_override__INFO", new Object[] {aDestination,prefetchSize
+ });
+ }
endpoint.setTempReplyDestination(true);
Object pojoListener = getPojoListener();
if (pojoListener != null && pojoListener instanceof InputChannel) {