You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/03 16:46:30 UTC

svn commit: r761700 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java

Author: ritchiem
Date: Fri Apr  3 14:46:29 2009
New Revision: 761700

URL: http://svn.apache.org/viewvc?rev=761700&view=rev
Log:
QPID-1764 : Resolved ConcurrentME. Perils of using the 'syntax sugar' for loop hides the message iterator that you need to call .remove(). Calling remove on the underlying Map will cause the resulting CME.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761700&r1=761699&r2=761700&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Fri Apr  3 14:46:29 2009
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Iterator;
 
 public class BaseTransactionLog implements TransactionLog
 {
@@ -80,15 +81,18 @@
             Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
 
             //For each Message ID that is in the map check
-            for (Long messageID : messageMap.keySet())
+            Iterator iterator = messageMap.keySet().iterator();
+
+            while (iterator.hasNext())
             {
+                Long messageID = (Long) iterator.next();
                 //If we don't have a gloabl reference for this message then there is only a single enqueue
                 if (_idToQueues.get(messageID) == null)
                 {
                     // Add the removal of the message to this transaction
                     _delegate.removeMessage(context,messageID);
                     // Remove this message ID as we have processed it so we don't reprocess after the main commmit
-                    messageMap.remove(messageID);
+                    iterator.remove();
                 }
             }
         }
@@ -179,6 +183,15 @@
                     }
                     else
                     {
+                        //When a message is on more than one queue it is possible that this code section is exectuted
+                        // by one thread per enqueue.
+                        // It is however, thread safe because there is only removes being performed and so the
+                        // last thread that does the remove will see the empty queue and remove the message
+                        // At this stage there is nothing that is going to cause this operation to abort. So we don't
+                        // need to worry about any potential adds.
+                        // The message will no longer be enqueued as that operation has been committed before now so
+                        // this is clean up of the data.                        
+
                         // Update the enqueued list
                         enqueuedList.remove(queue);
 
@@ -195,6 +208,8 @@
 
             //Commit the removes on the delegate.
             _delegate.commitTran(removeContext);
+            // Mark this context as committed.
+            removeContext.commitTransaction();
         }
         finally
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org