You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/17 18:59:43 UTC

svn commit: r696370 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/PrefetchSubscription.java transaction/Transaction.java

Author: chirino
Date: Wed Sep 17 09:59:42 2008
New Revision: 696370

URL: http://svn.apache.org/viewvc?rev=696370&view=rev
Log:
This fixes the recent errors the test cases have been seeing with transacted acks due to the new ack assertion bits added.
We now take the mesage out of the dispatch list when the ack is received, but we put it back on a rollback.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 17 09:59:42 2008
@@ -197,6 +197,7 @@
                     }
                     if (inAckRange) {
                         // Don't remove the nodes until we are committed.
+                        removeList.add(node);
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             if (!this.getConsumerInfo().isBrowser()) {
@@ -205,7 +206,6 @@
                             if (!isSlave()) {
                                 node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
                             }
-                            removeList.add(node);
                         } else {
                             // setup a Synchronization to remove nodes from the
                             // dispatched list.
@@ -215,9 +215,7 @@
                                         public void afterCommit()
                                                 throws Exception {
                                             synchronized(dispatchLock) {
-                                            
                                                 dequeueCounter++;
-                                                dispatched.remove(node);
                                                 node
                                                         .getRegionDestination()
                                                         .getDestinationStatistics()
@@ -234,9 +232,11 @@
                                             }
                                         }
 
-                                        public void afterRollback()
-                                                throws Exception {
-                                            super.afterRollback();
+                                        public void afterRollback() throws Exception {
+                                        	// Need to put it back in the front.
+                                            synchronized(dispatchLock) {
+                                        	    dispatched.add(0, node);
+                                            }
                                         }
                                     });
                         }
@@ -426,12 +426,16 @@
 		boolean checkFoundStart = false;
 		boolean checkFoundEnd = false;
 		for (MessageReference node : dispatched) {
-			if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId())) {
+			
+			if( firstAckedMsg == null ) {
+				checkFoundStart=true;
+			} else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
 				checkFoundStart = true;
 			}
 
-			if (checkFoundStart || firstAckedMsg == null)
+			if (checkFoundStart) {
 				checkCount++;
+			}
 
 			if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
 				checkFoundEnd = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Wed Sep 17 09:59:42 2008
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 
 import javax.transaction.xa.XAException;
@@ -88,6 +89,7 @@
     }
 
     public void fireAfterRollback() throws Exception {
+    	Collections.reverse(synchronizations);
         for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
             Synchronization s = iter.next();
             s.afterRollback();