You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/09 09:41:58 UTC
svn commit: r763548 - in
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server:
store/StoreContext.java transactionlog/BaseTransactionLog.java
Author: ritchiem
Date: Thu Apr 9 07:41:58 2009
New Revision: 763548
URL: http://svn.apache.org/viewvc?rev=763548&view=rev
Log:
QPID-1794 : Removed unnecessary synchronisation
Merged from trunk r763363
Modified:
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=763548&r1=763547&r2=763548&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Apr 9 07:41:58 2009
@@ -66,7 +66,7 @@
_name = name;
_async = asynchrouous;
_inTransaction = false;
- _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
+ _dequeueMap = new HashMap<Long, List<AMQQueue>>();
}
public StoreContext(boolean asynchronous)
@@ -115,13 +115,10 @@
{
List<AMQQueue> dequeues = _dequeueMap.get(messageId);
- synchronized (_dequeueMap)
+ if (dequeues == null)
{
- if (dequeues == null)
- {
- dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>());
- _dequeueMap.put(messageId, dequeues);
- }
+ dequeues = new ArrayList<AMQQueue>();
+ _dequeueMap.put(messageId, dequeues);
}
dequeues.add(queue);
Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=763548&r1=763547&r2=763548&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Thu Apr 9 07:41:58 2009
@@ -69,7 +69,7 @@
}
//Clone the list in case someone else changes it.
- _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
+ _idToQueues.put(messageId, Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
}
_delegate.enqueueMessage(context, queues, messageId);
@@ -87,27 +87,24 @@
//For each Message ID that is in the map check
Set<Long> messageIDs = messageMap.keySet();
- synchronized (messageMap)
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Pre-Processing single dequeue of:" + messageIDs);
- }
+ _logger.info("Pre-Processing single dequeue of:" + messageIDs);
+ }
+
+ Iterator iterator = messageIDs.iterator();
- Iterator iterator = messageIDs.iterator();
-
- while (iterator.hasNext())
+ while (iterator.hasNext())
+ {
+ Long messageID = (Long) iterator.next();
+ //If we don't have a global reference for this message then there is only a single enqueue
+ //can check here to see if this is the last reference?
+ if (_idToQueues.get(messageID) == null)
{
- Long messageID = (Long) iterator.next();
- //If we don't have a gloabl reference for this message then there is only a single enqueue
- //can check here to see if this is the last reference?
- if (_idToQueues.get(messageID) == null)
- {
- // Add the removal of the message to this transaction
- _delegate.removeMessage(context, messageID);
- // Remove this message ID as we have processed it so we don't reprocess after the main commmit
- iterator.remove();
- }
+ // Add the removal of the message to this transaction
+ _delegate.removeMessage(context, messageID);
+ // Remove this message ID as we have processed it so we don't reprocess after the main commit
+ iterator.remove();
}
}
}
@@ -180,66 +177,66 @@
{
//For each Message ID Decrement the reference for each of the queues it was on.
- synchronized (messageMap)
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Processing Dequeue for:" + messageIDs);
- }
+ _logger.info("Processing Dequeue for:" + messageIDs);
+ }
- Iterator<Long> messageIDIterator = messageIDs.iterator();
+ Iterator<Long> messageIDIterator = messageIDs.iterator();
- while(messageIDIterator.hasNext())
- {
- Long messageID = messageIDIterator.next();
- List<AMQQueue> queueList = messageMap.get(messageID);
+ while(messageIDIterator.hasNext())
+ {
+ Long messageID = messageIDIterator.next();
+ List<AMQQueue> queueList = messageMap.get(messageID);
- //Remove this message from our DequeueMap as we are processing it.
- messageIDIterator.remove();
+ //Remove this message from our DequeueMap as we are processing it.
+ messageIDIterator.remove();
- // For each of the queues decrement the reference
- for (AMQQueue queue : queueList)
- {
- List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+ // For each of the queues decrement the reference
+ for (AMQQueue queue : queueList)
+ {
+ List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Dequeue message:" + messageID + " from :" + queue);
- }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Dequeue message:" + messageID + " from :" + queue);
+ }
- // If we have no mapping then this message was only enqueued on a single queue
- // This will be the case when we are not in a larger transaction
- if (enqueuedList == null)
- {
- _delegate.removeMessage(removeContext, messageID);
- }
- else
+ // If we have no mapping then this message was only enqueued on a single queue
+ // This will be the case when we are not in a larger transaction
+ if (enqueuedList == null)
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ }
+ else
+ {
+ //When a message is on more than one queue it is possible that this code section is executed
+ // by one thread per enqueue.
+ // It is, however, thread safe because only removes are being performed and so the
+ // last thread that does the remove will see the empty queue and remove the message
+ // At this stage there is nothing that is going to cause this operation to abort. So we don't
+ // need to worry about any potential adds.
+ // The message will no longer be enqueued as that operation has been committed before now so
+ // this is clean up of the data.
+
+ //Must synchronize here as this list may have been extracted from _idToQueues in many threads
+ // and we must ensure only one of them updates the list at a time.
+ synchronized (enqueuedList)
{
- //When a message is on more than one queue it is possible that this code section is exectuted
- // by one thread per enqueue.
- // It is however, thread safe because there is only removes being performed and so the
- // last thread that does the remove will see the empty queue and remove the message
- // At this stage there is nothing that is going to cause this operation to abort. So we don't
- // need to worry about any potential adds.
- // The message will no longer be enqueued as that operation has been committed before now so
- // this is clean up of the data.
- synchronized (enqueuedList)
+ // Update the enqueued list but if the queue is not in the list then we are trying
+ // to dequeue something that is not there anymore, or was never there.
+ if (!enqueuedList.remove(queue))
+ {
+ throw new UnableToDequeueException(messageID, queue);
+ }
+
+ // If the list is now empty then remove the message
+ if (enqueuedList.isEmpty())
{
- // Update the enqueued list but if the queue is not in the list then we are trying
- // to dequeue something that is not there anymore, or was never there.
- if (!enqueuedList.remove(queue))
- {
- throw new UnableToDequeueException(messageID, queue);
- }
-
- // If the list is now empty then remove the message
- if (enqueuedList.isEmpty())
- {
- _delegate.removeMessage(removeContext, messageID);
- //Remove references list
- _idToQueues.remove(messageID);
- }
+ _delegate.removeMessage(removeContext, messageID);
+ //Remove references list
+ _idToQueues.remove(messageID);
}
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org