You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:17:41 UTC

svn commit: r900386 - /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Author: gtully
Date: Mon Jan 18 13:17:40 2010
New Revision: 900386

URL: http://svn.apache.org/viewvc?rev=900386&view=rev
Log:
merge -c 898281 https://svn.apache.org/repos/asf/activemq/trunk - resolve intermittent ZeroPrefetchConsumerTest failure - prefetch excension decrement needs to be conditional on prefecth=0, relates to changes to prefetchExtension for https://issues.apache.org/activemq/browse/AMQ-2560

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=900386&r1=900385&r2=900386&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jan 18 13:17:40 2010
@@ -249,6 +249,7 @@
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
+                            
                             if (context.isInTransaction()) {
                                 // extend prefetch window only if not a pulling
                                 // consumer
@@ -256,6 +257,11 @@
                                     prefetchExtension = Math.max(
                                             prefetchExtension, index );
                                 }
+                            } else {
+                                // contract prefetch if dispatch required a pull
+                                if (getPrefetchSize() == 0) {
+                                    prefetchExtension = Math.max(0, prefetchExtension - index);
+                                }
                             }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;