You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/09 09:37:52 UTC
svn commit: r763546 - in /qpid/branches/0.5-release/qpid/java/broker/src:
main/java/org/apache/qpid/server/store/
main/java/org/apache/qpid/server/transactionlog/
test/java/org/apache/qpid/server/transactionlog/
Author: ritchiem
Date: Thu Apr 9 07:37:52 2009
New Revision: 763546
URL: http://svn.apache.org/viewvc?rev=763546&view=rev
Log:
QPID-1794 : Clear the StoreContext after non-transactional processing. Updated BaseTransactionLog to synchronize the on the enqueued messages from the _idToQueue Map as this will be being modified by many ack-ing threads and closing/requeue threads. Updated BaseTransactionLogTest so that it uses a single StoreContext per test rather than a fresh context for each operation. This was masking the problem.
Modified:
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=763546&r1=763545&r2=763546&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Apr 9 07:37:52 2009
@@ -27,6 +27,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
+import java.util.Collections;
/**
* A context that the store can use to associate with a transactional context. For example, it could store
@@ -41,8 +43,7 @@
private static final String DEFAULT_NAME = "StoreContext";
private String _name;
private Object _payload;
- private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
- private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
+ private Map<Long, List<AMQQueue>> _dequeueMap;
private boolean _async;
private boolean _inTransaction;
@@ -53,12 +54,11 @@
public StoreContext(String name)
{
- this(name,false);
+ this(name, false);
}
/**
- *
- * @param name The name of this Transaction
+ * @param name The name of this Transaction
* @param asynchrouous Is this Transaction Asynchronous
*/
public StoreContext(String name, boolean asynchrouous)
@@ -66,8 +66,7 @@
_name = name;
_async = asynchrouous;
_inTransaction = false;
- _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
- _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+ _dequeueMap = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
}
public StoreContext(boolean asynchronous)
@@ -86,7 +85,7 @@
{
_logger.debug("public void setPayload(Object payload = " + payload + "): called");
}
- _payload = payload;
+ _payload = payload;
}
/**
@@ -99,49 +98,13 @@
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
- public Map<Long, ArrayList<AMQQueue>> getEnqueueMap()
- {
- return _enqueueMap;
- }
-
- public Map<Long, ArrayList<AMQQueue>> getDequeueMap()
+ public Map<Long, List<AMQQueue>> getDequeueMap()
{
return _dequeueMap;
}
/**
- * Record the enqueues for processing if we abort
- *
- * @param queues
- * @param messageId
- *
- * @throws AMQException
- */
- public void enqueueMessage(ArrayList<AMQQueue> queues, Long messageId) throws AMQException
- {
- if (inTransaction())
- {
- ArrayList<AMQQueue> enqueues = _enqueueMap.get(messageId);
-
- if (enqueues == null)
- {
- enqueues = new ArrayList<AMQQueue>();
- _enqueueMap.put(messageId, enqueues);
- }
-
- for (AMQQueue q : queues)
- {
- if (!enqueues.contains(q))
- {
- enqueues.add(q);
- }
- }
-
- }
- }
-
- /**
- * Record the dequeue for processing after the commit
+ * Record the dequeue for processing after the commit
*
* @param queue
* @param messageId
@@ -150,15 +113,22 @@
*/
public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
{
- ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+ List<AMQQueue> dequeues = _dequeueMap.get(messageId);
- if (dequeues == null)
+ synchronized (_dequeueMap)
{
- dequeues = new ArrayList<AMQQueue>();
- _dequeueMap.put(messageId, dequeues);
+ if (dequeues == null)
+ {
+ dequeues = Collections.synchronizedList(new ArrayList<AMQQueue>());
+ _dequeueMap.put(messageId, dequeues);
+ }
}
dequeues.add(queue);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Added (" + messageId + ") to dequeues:" + dequeues);
+ }
}
public void beginTransaction() throws AMQException
@@ -174,13 +144,13 @@
public void abortTransaction() throws AMQException
{
- _enqueueMap.clear();
+ _dequeueMap.clear();
_inTransaction = false;
}
public boolean inTransaction()
{
- return _inTransaction; // _payload != null;
+ return _inTransaction;
}
public boolean isAsync()
Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=763546&r1=763545&r2=763546&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Thu Apr 9 07:37:52 2009
@@ -31,16 +31,18 @@
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
public class BaseTransactionLog implements TransactionLog
{
private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
TransactionLog _delegate;
- protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+ protected Map<Long, List<AMQQueue>> _idToQueues = Collections.synchronizedMap(new HashMap<Long, List<AMQQueue>>());
public BaseTransactionLog(TransactionLog delegate)
{
@@ -59,14 +61,15 @@
public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- context.enqueueMessage(queues, messageId);
-
if (queues.size() > 1)
{
- _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
+ }
//Clone the list incase someone else changes it.
- _idToQueues.put(messageId, (ArrayList) queues.clone());
+ _idToQueues.put(messageId, (List<AMQQueue>)Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
}
_delegate.enqueueMessage(context, queues, messageId);
@@ -78,21 +81,33 @@
if (context.inTransaction())
{
- Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
+
+ Map<Long, List<AMQQueue>> messageMap = context.getDequeueMap();
//For each Message ID that is in the map check
- Iterator iterator = messageMap.keySet().iterator();
+ Set<Long> messageIDs = messageMap.keySet();
- while (iterator.hasNext())
+ synchronized (messageMap)
{
- Long messageID = (Long) iterator.next();
- //If we don't have a gloabl reference for this message then there is only a single enqueue
- if (_idToQueues.get(messageID) == null)
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Pre-Processing single dequeue of:" + messageIDs);
+ }
+
+ Iterator iterator = messageIDs.iterator();
+
+ while (iterator.hasNext())
{
- // Add the removal of the message to this transaction
- _delegate.removeMessage(context,messageID);
- // Remove this message ID as we have processed it so we don't reprocess after the main commmit
- iterator.remove();
+ Long messageID = (Long) iterator.next();
+ //If we don't have a gloabl reference for this message then there is only a single enqueue
+ //can check here to see if this is the last reference?
+ if (_idToQueues.get(messageID) == null)
+ {
+ // Add the removal of the message to this transaction
+ _delegate.removeMessage(context, messageID);
+ // Remove this message ID as we have processed it so we don't reprocess after the main commmit
+ iterator.remove();
+ }
}
}
}
@@ -136,22 +151,19 @@
public void abortTran(StoreContext context) throws AMQException
{
- // If we have enqueues to rollback
- processDequeues(context.getEnqueueMap());
-
//Abort the recorded state for this transaction.
context.abortTransaction();
_delegate.abortTran(context);
}
- private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
+ private void processDequeues(Map<Long, List<AMQQueue>> messageMap)
throws AMQException
{
// Check we have dequeues to process then process them
if (messageMap == null || messageMap.isEmpty())
{
- return;
+ return;
}
// Process any enqueues to bring our model up to date.
@@ -162,50 +174,77 @@
//Batch Process the Dequeues on the delegate
_delegate.beginTran(removeContext);
+ removeContext.beginTransaction();
try
{
//For each Message ID Decrement the reference for each of the queues it was on.
- for (Long messageID : messageIDs)
+
+ synchronized (messageMap)
{
- ArrayList<AMQQueue> queueList = messageMap.get(messageID);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Processing Dequeue for:" + messageIDs);
+ }
+
+ Iterator<Long> messageIDIterator = messageIDs.iterator();
- // For each of the queues decrement the reference
- for (AMQQueue queue : queueList)
+ while(messageIDIterator.hasNext())
{
- ArrayList<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+ Long messageID = messageIDIterator.next();
+ List<AMQQueue> queueList = messageMap.get(messageID);
- // If we have no mapping then this message was only enqueued on a single queue
- // This will be the case when we are not in a larger transaction
- if (enqueuedList == null)
- {
- _delegate.removeMessage(removeContext, messageID);
- }
- else
+ //Remove this message from our DequeueMap as we are processing it.
+ messageIDIterator.remove();
+
+ // For each of the queues decrement the reference
+ for (AMQQueue queue : queueList)
{
- //When a message is on more than one queue it is possible that this code section is exectuted
- // by one thread per enqueue.
- // It is however, thread safe because there is only removes being performed and so the
- // last thread that does the remove will see the empty queue and remove the message
- // At this stage there is nothing that is going to cause this operation to abort. So we don't
- // need to worry about any potential adds.
- // The message will no longer be enqueued as that operation has been committed before now so
- // this is clean up of the data.
+ List<AMQQueue> enqueuedList = _idToQueues.get(messageID);
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Dequeue message:" + messageID + " from :" + queue);
+ }
- // Update the enqueued list
- enqueuedList.remove(queue);
- // If the list is now empty then remove the message
- if (enqueuedList.isEmpty())
+ // If we have no mapping then this message was only enqueued on a single queue
+ // This will be the case when we are not in a larger transaction
+ if (enqueuedList == null)
{
_delegate.removeMessage(removeContext, messageID);
- //Remove references list
- _idToQueues.remove(messageID);
+ }
+ else
+ {
+ //When a message is on more than one queue it is possible that this code section is exectuted
+ // by one thread per enqueue.
+ // It is however, thread safe because there is only removes being performed and so the
+ // last thread that does the remove will see the empty queue and remove the message
+ // At this stage there is nothing that is going to cause this operation to abort. So we don't
+ // need to worry about any potential adds.
+ // The message will no longer be enqueued as that operation has been committed before now so
+ // this is clean up of the data.
+ synchronized (enqueuedList)
+ {
+ // Update the enqueued list but if the queue is not in the list then we are trying
+ // to dequeue something that is not there anymore, or was never there.
+ if (!enqueuedList.remove(queue))
+ {
+ throw new UnableToDequeueException(messageID, queue);
+ }
+
+ // If the list is now empty then remove the message
+ if (enqueuedList.isEmpty())
+ {
+ _delegate.removeMessage(removeContext, messageID);
+ //Remove references list
+ _idToQueues.remove(messageID);
+ }
+ }
}
}
}
}
-
//Commit the removes on the delegate.
_delegate.commitTran(removeContext);
// Mark this context as committed.
@@ -244,4 +283,18 @@
{
return _delegate;
}
+
+ private class UnableToDequeueException extends RuntimeException
+ {
+ Long _messageID;
+ AMQQueue _queue;
+
+ public UnableToDequeueException(Long messageID, AMQQueue queue)
+ {
+ super("Unable to dequeue message(" + messageID + ") from queue " +
+ "(" + queue + ") it is not/nolonger enqueued on it.");
+ _messageID = messageID;
+ _queue = queue;
+ }
+ }
}
Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java?rev=763546&r1=763545&r2=763546&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java Thu Apr 9 07:37:52 2009
@@ -77,6 +77,8 @@
/**
* Places a message onto a specified queue, in a given transactional context.
*
+ * This method need not be thread safe as it is only called by the message delivery thread
+ *
* @param context The transactional context for the operation.
* @param queues
*@param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException
@@ -86,6 +88,8 @@
/**
* Extracts a message from a specified queue, in a given transactional context.
*
+ * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread
+ *
* @param context The transactional context for the operation.
* @param queue
* @param messageId The message to dequeue. @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
@@ -95,6 +99,8 @@
/**
* Remove the specified message from the log
*
+ * This method MUST be thread safe as dequeue will be called by multiple threads, ack, requeue, delivery thread
+ *
* @param context The transactional context for the operation
* @param messageId The message to remove
* @throws AMQException
Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=763546&r1=763545&r2=763546&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Thu Apr 9 07:37:52 2009
@@ -51,11 +51,13 @@
TestTransactionLog _transactionLog;
private ArrayList<AMQQueue> _queues;
private MockPersistentAMQMessage _message;
+ StoreContext _context;
public void setUp() throws Exception
{
super.setUp();
_transactionLog = new TestableBaseTransactionLog(this);
+ _context = new StoreContext();
}
public void testSingleEnqueueNoTransactional() throws AMQException
@@ -64,13 +66,13 @@
_message = new MockPersistentAMQMessage(1L, this);
- _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+ _message.addContentBodyFrame(_context, new MockContentChunk(100), true);
MessagePublishInfo mpi = new MessagePublishInfoImpl();
ContentHeaderBody chb = new ContentHeaderBody();
- _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+ _message.setPublishAndContentHeaderBody(_context, mpi, chb);
verifyMessageStored(_message.getMessageId());
// Enqueue
@@ -79,7 +81,7 @@
MockAMQQueue queue = new MockAMQQueue(this.getName());
_queues.add(queue);
- _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+ _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId());
verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
}
@@ -89,14 +91,14 @@
// Enqueue a message to dequeue
testSingleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
verifyMessageRemoved(_message.getMessageId());
}
public void testSingleEnqueueTransactional() throws AMQException
{
- StoreContext context = new StoreContext();
+ StoreContext context = _context;
_transactionLog.beginTran(context);
@@ -133,7 +135,7 @@
// Enqueue a message to dequeue
testSingleEnqueueTransactional();
- StoreContext context = new StoreContext();
+ StoreContext context = _context;
_transactionLog.beginTran(context);
@@ -150,13 +152,13 @@
_message = new MockPersistentAMQMessage(1L, this);
- _message.addContentBodyFrame(new StoreContext(), new MockContentChunk(100), true);
+ _message.addContentBodyFrame(_context, new MockContentChunk(100), true);
MessagePublishInfo mpi = new MessagePublishInfoImpl();
ContentHeaderBody chb = new ContentHeaderBody();
- _message.setPublishAndContentHeaderBody(new StoreContext(), mpi, chb);
+ _message.setPublishAndContentHeaderBody(_context, mpi, chb);
verifyMessageStored(_message.getMessageId());
// Enqueue
@@ -172,7 +174,7 @@
queue = new MockAMQQueue(this.getName() + "3");
_queues.add(queue);
- _transactionLog.enqueueMessage(new StoreContext(), _queues, _message.getMessageId());
+ _transactionLog.enqueueMessage(_context, _queues, _message.getMessageId());
verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
}
@@ -182,7 +184,7 @@
// Enqueue a message to dequeue
testMultipleEnqueueNoTransactional();
- _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
@@ -192,7 +194,7 @@
verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
verifyMessageStored(_message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
_queues.remove(0);
@@ -218,7 +220,7 @@
verifyMessageStored(_message.getMessageId());
- _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+ _transactionLog.dequeueMessage(_context, _queues.get(0), _message.getMessageId());
verifyMessageRemoved(_message.getMessageId());
}
@@ -233,7 +235,7 @@
public void testMultipleEnqueueTransactional() throws AMQException
{
- StoreContext context = new StoreContext();
+ StoreContext context = _context;
_transactionLog.beginTran(context);
@@ -276,7 +278,7 @@
// Enqueue a message to dequeue
testMultipleEnqueueTransactional();
- StoreContext context = new StoreContext();
+ StoreContext context = _context;
_transactionLog.beginTran(context);
@@ -319,7 +321,7 @@
// Enqueue a message to dequeue
testMultipleEnqueueTransactional();
- StoreContext context = new StoreContext();
+ StoreContext context = _context;
_transactionLog.beginTran(context);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org