You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/08 21:44:34 UTC

svn commit: r763361 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/store/ main/java/org/apache/qpid/server/transactionlog/ test/java/org/apache/qpid/server/transactionlog/

Author: ritchiem
Date: Wed Apr  8 19:44:34 2009
New Revision: 763361

URL: http://svn.apache.org/viewvc?rev=763361&view=rev
Log:
QPID-1794 : Clear the StoreContext after non-transactional processing. Updated BaseTransactionLog to synchronize on the enqueued messages from the _idToQueues Map, as this is modified by many ack-ing threads and closing/requeue threads. Updated BaseTransactionLogTest so that it uses a single StoreContext per test rather than a fresh context for each operation; using a fresh context each time was masking the problem.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=763361&r1=763360&r2=763361&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Wed Apr  8 19:44:34 2009
@@ -27,6 +27,8 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
+import java.util.Collections;
 
 /**
  * A context that the store can use to associate with a transactional context. For example, it could store
@@ -41,8 +43,7 @@
     private static final String DEFAULT_NAME = "StoreContext";
     private String _name;
     private Object _payload;
-    private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
-    private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
+    private Map<Long, List<AMQQueue>> _dequeueMap;
     private boolean _async;
     private boolean _inTransaction;
 
@@ -53,12 +54,11 @@
 
     public StoreContext(String name)
     {
-        this(name,false);
+        this(name, false);
     }
 
     /**
-     *
-     * @param name The name of this Transaction
+     * @param name         The name of this Transaction
      * @param asynchrouous Is this Transaction Asynchronous
      */
     public StoreContext(String name, boolean asynchrouous)
@@ -66,8 +66,7 @@
         _name = name;
         _async = asynchrouous;
         _inTransaction = false;
-        _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
-        _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();        
+        _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
     }
 
     public StoreContext(boolean asynchronous)
@@ -86,7 +85,7 @@
         {
             _logger.debug("public void setPayload(Object payload = " + payload + "): called");
         }
-        _payload = payload;        
+        _payload = payload;
     }
 
     /**
@@ -99,49 +98,13 @@
         return "<_name = " + _name + ", _payload = " + _payload + ">";
     }
 
-    public Map<Long, ArrayList<AMQQueue>> getEnqueueMap()
-    {
-        return _enqueueMap;
-    }
-
-    public Map<Long, ArrayList<AMQQueue>> getDequeueMap()
+    public Map<Long, List<AMQQueue>> getDequeueMap()
     {
         return _dequeueMap;
     }
 
     /**
-     * Record the enqueues for processing if we abort
-     *
-     * @param queues
-     * @param messageId
-     *
-     * @throws AMQException
-     */
-    public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException
-    {
-        if (inTransaction())
-        {
-            ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId);
-
-            if (enqueues == null)
-            {
-                enqueues = new ArrayList<AMQQueue>();
-                _enqueueMap.put(messageId, enqueues);
-            }
-
-            for (AMQQueue q : queues)
-            {
-                if (!enqueues.contains(q))
-                {
-                    enqueues.add(q);
-                }
-            }
-
-        }
-    }
-
-    /**
-     * Record the dequeue for processing after the commit 
+     * Record the dequeue for processing after the commit
      *
      * @param queue
      * @param messageId
@@ -150,15 +113,22 @@
      */
     public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
     {
-        ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+        List<AMQQueue> dequeues = _dequeueMap.get(messageId);
 
-        if (dequeues == null)
+        synchronized (_dequeueMap)
         {
-            dequeues = new ArrayList<AMQQueue>();
-            _dequeueMap.put(messageId, dequeues);
+            if (dequeues == null)
+            {
+                dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>());
+                _dequeueMap.put(messageId, dequeues);
+            }
         }
 
         dequeues.add(queue);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Added (" + messageId + ") to dequeues:" + dequeues);
+        }
     }
 
     public void beginTransaction() throws AMQException
@@ -174,13 +144,13 @@
 
     public void abortTransaction() throws AMQException
     {
-        _enqueueMap.clear();
+        _dequeueMap.clear();
         _inTransaction = false;
     }
 
     public boolean inTransaction()
     {
-        return _inTransaction; //  _payload != null;
+        return _inTransaction;
     }
 
     public boolean isAsync()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=763361&r1=763360&r2=763361&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Wed Apr  8 19:44:34 2009
@@ -31,16 +31,18 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
 
 public class BaseTransactionLog implements TransactionLog
 {
     private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
 
     TransactionLog _delegate;
-    protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+    protected Map<Long, List<AMQQueue>> _idToQueues = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
 
     public BaseTransactionLog(TransactionLog delegate)
     {
@@ -59,14 +61,15 @@
 
     public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        context.enqueueMessage(queues, messageId);
-
         if (queues.size() > 1)
         {
-            _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+            }
 
             //Clone the list incase someone else changes it.
-            _idToQueues.put(messageId, (ArrayList) queues.clone());
+            _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
         }
 
         _delegate.enqueueMessage(context, queues, messageId);
@@ -78,21 +81,33 @@
 
         if (context.inTransaction())
         {
-            Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
+
+            Map<Long, List<AMQQueue>> messageMap = context.getDequeueMap();
 
             //For each Message ID that is in the map check
-            Iterator iterator = messageMap.keySet().iterator();
+            Set<Long> messageIDs = messageMap.keySet();
 
-            while (iterator.hasNext())
+            synchronized (messageMap)
             {
-                Long messageID = (Long) iterator.next();
-                //If we don't have a gloabl reference for this message then there is only a single enqueue
-                if (_idToQueues.get(messageID) == null)
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Pre-Processing single dequeue of:" + messageIDs);
+                }
+
+                Iterator iterator = messageIDs.iterator();
+        
+                while (iterator.hasNext())
                 {
-                    // Add the removal of the message to this transaction
-                    _delegate.removeMessage(context,messageID);
-                    // Remove this message ID as we have processed it so we don't reprocess after the main commmit
-                    iterator.remove();
+                    Long messageID = (Long) iterator.next();
+                    //If we don't have a gloabl reference for this message then there is only a single enqueue
+                    //can check here to see if this is the last reference?
+                    if (_idToQueues.get(messageID) == null)
+                    {
+                        // Add the removal of the message to this transaction
+                        _delegate.removeMessage(context, messageID);
+                        // Remove this message ID as we have processed it so we don't reprocess after the main commmit
+                        iterator.remove();
+                    }
                 }
             }
         }
@@ -136,22 +151,19 @@
 
     public void abortTran(StoreContext context) throws AMQException
     {
-        // If we have enqueues to rollback
-        processDequeues(context.getEnqueueMap());
-
         //Abort the recorded state for this transaction.
         context.abortTransaction();
 
         _delegate.abortTran(context);
     }
 
-    private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
+    private void processDequeues(Map<Long, List<AMQQueue>> messageMap)
             throws AMQException
     {
         // Check we have dequeues to process then process them
         if (messageMap == null || messageMap.isEmpty())
         {
-             return;
+            return;
         }
 
         // Process any enqueues to bring our model up to date.
@@ -162,50 +174,77 @@
 
         //Batch Process the Dequeues on the delegate
         _delegate.beginTran(removeContext);
+        removeContext.beginTransaction();
 
         try
         {
             //For each Message ID Decrement the reference for each of the queues it was on.
-            for (Long messageID : messageIDs)
+
+            synchronized (messageMap)
             {
-                ArrayList<AMQQueue> queueList = messageMap.get(messageID);
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Processing Dequeue for:" + messageIDs);
+                }
+
+                Iterator<Long> messageIDIterator = messageIDs.iterator();
 
-                // For each of the queues decrement the reference
-                for (AMQQueue queue : queueList)
+                while(messageIDIterator.hasNext())
                 {
-                    ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+                    Long messageID = messageIDIterator.next();
+                    List<AMQQueue> queueList = messageMap.get(messageID);
 
-                    // If we have no mapping then this message was only enqueued on a single queue
-                    // This will be the case when we are not in a larger transaction
-                    if (enqueuedList == null)
-                    {
-                        _delegate.removeMessage(removeContext, messageID);
-                    }
-                    else
+                   //Remove this message from our DequeueMap as we are processing it.
+                    messageIDIterator.remove();
+
+                    // For each of the queues decrement the reference
+                    for (AMQQueue queue : queueList)
                     {
-                        //When a message is on more than one queue it is possible that this code section is exectuted
-                        // by one thread per enqueue.
-                        // It is however, thread safe because there is only removes being performed and so the
-                        // last thread that does the remove will see the empty queue and remove the message
-                        // At this stage there is nothing that is going to cause this operation to abort. So we don't
-                        // need to worry about any potential adds.
-                        // The message will no longer be enqueued as that operation has been committed before now so
-                        // this is clean up of the data.                        
+                        List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Dequeue message:" + messageID + " from :" + queue);
+                        }
 
-                        // Update the enqueued list
-                        enqueuedList.remove(queue);
 
-                        // If the list is now empty then remove the message
-                        if (enqueuedList.isEmpty())
+                        // If we have no mapping then this message was only enqueued on a single queue
+                        // This will be the case when we are not in a larger transaction
+                        if (enqueuedList == null)
                         {
                             _delegate.removeMessage(removeContext, messageID);
-                            //Remove references list
-                            _idToQueues.remove(messageID);
+                        }
+                        else
+                        {
+                            //When a message is on more than one queue it is possible that this code section is exectuted
+                            // by one thread per enqueue.
+                            // It is however, thread safe because there is only removes being performed and so the
+                            // last thread that does the remove will see the empty queue and remove the message
+                            // At this stage there is nothing that is going to cause this operation to abort. So we don't
+                            // need to worry about any potential adds.
+                            // The message will no longer be enqueued as that operation has been committed before now so
+                            // this is clean up of the data.
+                            synchronized (enqueuedList)
+                            {
+                                // Update the enqueued list but if the queue is not in the list then we are trying
+                                // to dequeue something that is not there anymore, or was never there.
+                                if (!enqueuedList.remove(queue))
+                                {
+                                    throw new UnableToDequeueException(messageID, queue);
+                                }
+
+                                // If the list is now empty then remove the message
+                                if (enqueuedList.isEmpty())
+                                {
+                                    _delegate.removeMessage(removeContext, messageID);
+                                    //Remove references list
+                                    _idToQueues.remove(messageID);
+                                }
+                            }
                         }
                     }
                 }
             }
-
             //Commit the removes on the delegate.
             _delegate.commitTran(removeContext);
             // Mark this context as committed.
@@ -244,4 +283,18 @@
     {
         return _delegate;
     }
+
+    private class UnableToDequeueException extends RuntimeException
+    {
+        Long _messageID;
+        AMQQueue _queue;
+
+        public UnableToDequeueException(Long messageID, AMQQueue queue)
+        {
+            super("Unable to dequeue message(" + messageID + ") from queue " +
+                  "(" + queue + ") it is not/nolonger enqueued on it.");
+            _messageID = messageID;
+            _queue = queue;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java?rev=763361&r1=763360&r2=763361&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java Wed Apr  8 19:44:34 2009
@@ -77,6 +77,8 @@
     /**
      * Places a message onto a specified queue, in a given transactional context.
      *
+     * This method need not be thread safe as it is only called by the message delivery thread
+     *
      * @param context   The transactional context for the operation.
      * @param queues
      *@param messageId The message to enqueue.  @throws AMQException If the operation fails for any reason.  @throws org.apache.qpid.AMQException
@@ -86,6 +88,8 @@
     /**
      * Extracts a message from a specified queue, in a given transactional context.
      *
+     * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread
+     *
      * @param context   The transactional context for the operation.
      * @param queue
      * @param messageId The message to dequeue.  @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
@@ -95,6 +99,8 @@
     /**
      * Remove the specified message from the log
      *
+     * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread 
+     *
      * @param context The transactional context for the operation
      * @param messageId The message to remove
      * @throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=763361&r1=763360&r2=763361&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Wed Apr  8 19:44:34 2009
@@ -51,11 +51,13 @@
     TestTransactionLog _transactionLog;
     private ArrayList<AMQQueue> _queues;
     private MockPersistentAMQMessage _message;
+    StoreContext _context;
 
     public void setUp() throws Exception
     {
         super.setUp();
         _transactionLog = new TestableBaseTransactionLog(this);
+        _context = new StoreContext();
     }
 
     public void testSingleEnqueueNoTransactional() throws AMQException
@@ -64,13 +66,13 @@
 
         _message = new MockPersistentAMQMessage(1L, this);
 
-        _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+        _message.addContentBodyFrame(_context, new MockContentChunk(100), true);
 
         MessagePublishInfo mpi = new MessagePublishInfoImpl();
 
         ContentHeaderBody chb = new ContentHeaderBody();
 
-        _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+        _message.setPublishAndContentHeaderBody(_context, mpi, chb);
 
         verifyMessageStored(_message.getMessageId());
         // Enqueue
@@ -79,7 +81,7 @@
         MockAMQQueue queue = new MockAMQQueue(this.getName());
         _queues.add(queue);
 
-        _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+        _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId());
 
         verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
     }
@@ -89,14 +91,14 @@
         // Enqueue a message to dequeue
         testSingleEnqueueNoTransactional();
 
-        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
 
         verifyMessageRemoved(_message.getMessageId());
     }
 
     public void testSingleEnqueueTransactional() throws AMQException
     {
-        StoreContext context = new StoreContext();
+        StoreContext context = _context;
 
         _transactionLog.beginTran(context);
 
@@ -133,7 +135,7 @@
         // Enqueue a message to dequeue
         testSingleEnqueueTransactional();
 
-        StoreContext context = new StoreContext();
+        StoreContext context = _context;
 
         _transactionLog.beginTran(context);
 
@@ -150,13 +152,13 @@
 
         _message = new MockPersistentAMQMessage(1L, this);
 
-        _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+        _message.addContentBodyFrame(_context, new MockContentChunk(100), true);
 
         MessagePublishInfo mpi = new MessagePublishInfoImpl();
 
         ContentHeaderBody chb = new ContentHeaderBody();
 
-        _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+        _message.setPublishAndContentHeaderBody(_context, mpi, chb);
 
         verifyMessageStored(_message.getMessageId());
         // Enqueue
@@ -172,7 +174,7 @@
         queue = new MockAMQQueue(this.getName() + "3");
         _queues.add(queue);
 
-        _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+        _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId());
 
         verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
     }
@@ -182,7 +184,7 @@
         // Enqueue a message to dequeue
         testMultipleEnqueueNoTransactional();
 
-        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
 
         ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
 
@@ -192,7 +194,7 @@
         verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
         verifyMessageStored(_message.getMessageId());
 
-        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
 
         assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
         _queues.remove(0);
@@ -218,7 +220,7 @@
 
         verifyMessageStored(_message.getMessageId());
 
-        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
 
         verifyMessageRemoved(_message.getMessageId());
     }
@@ -233,7 +235,7 @@
 
     public void testMultipleEnqueueTransactional() throws AMQException
     {
-        StoreContext context = new StoreContext();
+        StoreContext context = _context;
 
         _transactionLog.beginTran(context);
 
@@ -276,7 +278,7 @@
         // Enqueue a message to dequeue
         testMultipleEnqueueTransactional();
 
-        StoreContext context = new StoreContext();
+        StoreContext context = _context;
 
         _transactionLog.beginTran(context);
 
@@ -319,7 +321,7 @@
         // Enqueue a message to dequeue
         testMultipleEnqueueTransactional();
 
-        StoreContext context = new StoreContext();
+        StoreContext context = _context;
 
         _transactionLog.beginTran(context);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org