You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/09 09:41:58 UTC

svn commit: r763548 - in /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server: store/StoreContext.java transactionlog/BaseTransactionLog.java

Author: ritchiem
Date: Thu Apr  9 07:41:58 2009
New Revision: 763548

URL: http://svn.apache.org/viewvc?rev=763548&view=rev
Log:
QPID-1794 : Removed unnecessary synchronisation
Merged from trunk r763363

Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=763548&r1=763547&r2=763548&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Apr  9 07:41:58 2009
@@ -66,7 +66,7 @@
         _name = name;
         _async = asynchrouous;
         _inTransaction = false;
-        _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
+        _dequeueMap = new HashMap<Long, List<AMQQueue>>();
     }
 
     public StoreContext(boolean asynchronous)
@@ -115,13 +115,10 @@
     {
         List<AMQQueue> dequeues = _dequeueMap.get(messageId);
 
-        synchronized (_dequeueMap)
+        if (dequeues == null)
         {
-            if (dequeues == null)
-            {
-                dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>());
-                _dequeueMap.put(messageId, dequeues);
-            }
+            dequeues = new ArrayList<AMQQueue>();
+            _dequeueMap.put(messageId, dequeues);
         }
 
         dequeues.add(queue);

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=763548&r1=763547&r2=763548&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Thu Apr  9 07:41:58 2009
@@ -69,7 +69,7 @@
             }
 
             //Clone the list incase someone else changes it.
-            _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
+            _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
         }
 
         _delegate.enqueueMessage(context, queues, messageId);
@@ -87,27 +87,24 @@
             //For each Message ID that is in the map check
             Set<Long> messageIDs = messageMap.keySet();
 
-            synchronized (messageMap)
+            if (_logger.isInfoEnabled())
             {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Pre-Processing single dequeue of:" + messageIDs);
-                }
+                _logger.info("Pre-Processing single dequeue of:" + messageIDs);
+            }
+
+            Iterator iterator = messageIDs.iterator();
 
-                Iterator iterator = messageIDs.iterator();
-        
-                while (iterator.hasNext())
+            while (iterator.hasNext())
+            {
+                Long messageID = (Long) iterator.next();
+                //If we don't have a global reference for this message then there is only a single enqueue
+                //can check here to see if this is the last reference?
+                if (_idToQueues.get(messageID) == null)
                 {
-                    Long messageID = (Long) iterator.next();
-                    //If we don't have a gloabl reference for this message then there is only a single enqueue
-                    //can check here to see if this is the last reference?
-                    if (_idToQueues.get(messageID) == null)
-                    {
-                        // Add the removal of the message to this transaction
-                        _delegate.removeMessage(context, messageID);
-                        // Remove this message ID as we have processed it so we don't reprocess after the main commmit
-                        iterator.remove();
-                    }
+                    // Add the removal of the message to this transaction
+                    _delegate.removeMessage(context, messageID);
+                    // Remove this message ID as we have processed it so we don't reprocess after the main commit
+                    iterator.remove();
                 }
             }
         }
@@ -180,66 +177,66 @@
         {
             //For each Message ID Decrement the reference for each of the queues it was on.
 
-            synchronized (messageMap)
+            if (_logger.isInfoEnabled())
             {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Processing Dequeue for:" + messageIDs);
-                }
+                _logger.info("Processing Dequeue for:" + messageIDs);
+            }
 
-                Iterator<Long> messageIDIterator = messageIDs.iterator();
+            Iterator<Long> messageIDIterator = messageIDs.iterator();
 
-                while(messageIDIterator.hasNext())
-                {
-                    Long messageID = messageIDIterator.next();
-                    List<AMQQueue> queueList = messageMap.get(messageID);
+            while(messageIDIterator.hasNext())
+            {
+                Long messageID = messageIDIterator.next();
+                List<AMQQueue> queueList = messageMap.get(messageID);
 
-                   //Remove this message from our DequeueMap as we are processing it.
-                    messageIDIterator.remove();
+               //Remove this message from our DequeueMap as we are processing it.
+                messageIDIterator.remove();
 
-                    // For each of the queues decrement the reference
-                    for (AMQQueue queue : queueList)
-                    {
-                        List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+                // For each of the queues decrement the reference
+                for (AMQQueue queue : queueList)
+                {
+                    List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
 
-                        if (_logger.isInfoEnabled())
-                        {
-                            _logger.info("Dequeue message:" + messageID + " from :" + queue);
-                        }
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Dequeue message:" + messageID + " from :" + queue);
+                    }
 
 
-                        // If we have no mapping then this message was only enqueued on a single queue
-                        // This will be the case when we are not in a larger transaction
-                        if (enqueuedList == null)
-                        {
-                            _delegate.removeMessage(removeContext, messageID);
-                        }
-                        else
+                    // If we have no mapping then this message was only enqueued on a single queue
+                    // This will be the case when we are not in a larger transaction
+                    if (enqueuedList == null)
+                    {
+                        _delegate.removeMessage(removeContext, messageID);
+                    }
+                    else
+                    {
+                        //When a message is on more than one queue it is possible that this code section is executed
+                        // by one thread per enqueue.
+                        // It is, however, thread safe because only removes are being performed and so the
+                        // last thread that does the remove will see the empty queue and remove the message
+                        // At this stage there is nothing that is going to cause this operation to abort. So we don't
+                        // need to worry about any potential adds.
+                        // The message will no longer be enqueued as that operation has been committed before now so
+                        // this is clean up of the data.
+
+                        //Must synchronize here as this list may have been extracted from _idToQueues in many threads
+                        // and we must ensure only one of them updates the list at a time.
+                        synchronized (enqueuedList)
                         {
-                            //When a message is on more than one queue it is possible that this code section is exectuted
-                            // by one thread per enqueue.
-                            // It is however, thread safe because there is only removes being performed and so the
-                            // last thread that does the remove will see the empty queue and remove the message
-                            // At this stage there is nothing that is going to cause this operation to abort. So we don't
-                            // need to worry about any potential adds.
-                            // The message will no longer be enqueued as that operation has been committed before now so
-                            // this is clean up of the data.
-                            synchronized (enqueuedList)
+                            // Update the enqueued list but if the queue is not in the list then we are trying
+                            // to dequeue something that is not there anymore, or was never there.
+                            if (!enqueuedList.remove(queue))
+                            {
+                                throw new UnableToDequeueException(messageID, queue);
+                            }
+
+                            // If the list is now empty then remove the message
+                            if (enqueuedList.isEmpty())
                             {
-                                // Update the enqueued list but if the queue is not in the list then we are trying
-                                // to dequeue something that is not there anymore, or was never there.
-                                if (!enqueuedList.remove(queue))
-                                {
-                                    throw new UnableToDequeueException(messageID, queue);
-                                }
-
-                                // If the list is now empty then remove the message
-                                if (enqueuedList.isEmpty())
-                                {
-                                    _delegate.removeMessage(removeContext, messageID);
-                                    //Remove references list
-                                    _idToQueues.remove(messageID);
-                                }
+                                _delegate.removeMessage(removeContext, messageID);
+                                //Remove references list
+                                _idToQueues.remove(messageID);
                             }
                         }
                     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org