You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/03 16:10:54 UTC

svn commit: r761685 - in /qpid/branches/0.5-release/qpid/java/broker/src: main/java/org/apache/qpid/server/store/ main/java/org/apache/qpid/server/transactionlog/ main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/queue/ tes...

Author: ritchiem
Date: Fri Apr  3 14:10:54 2009
New Revision: 761685

URL: http://svn.apache.org/viewvc?rev=761685&view=rev
Log:
QPID-1764 : Updated BaseTransactionLog to allow the creation of a TestableTransactionLog, which will replace TestableMessageStore. Updated BaseTransactionLog and BaseTransactionLogTest (BTLT) to work correctly with transactions
and to fully test that functionality. Updated StoreContext to record explicitly when it is in a transaction: relying on a payload being set is not sufficient, because the payload is not set when running with the MessageMemoryStore, and so the transactional testing in the BTLT was not correct.
    Updated VirtualHost to correctly set the RoutingTable when the specified TransactionLog is wrapped in a BaseTransactionLog.

Merged from r761670 on trunk

Added:
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Fri Apr  3 14:10:54 2009
@@ -44,6 +44,7 @@
     private HashMap<Long, ArrayList<AMQQueue>> _enqueueMap;
     private HashMap<Long, ArrayList<AMQQueue>> _dequeueMap;
     private boolean _async;
+    private boolean _inTransaction;
 
     public StoreContext()
     {
@@ -64,6 +65,9 @@
     {
         _name = name;
         _async = asynchrouous;
+        _inTransaction = false;
+        _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+        _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();        
     }
 
     public StoreContext(boolean asynchronous)
@@ -82,7 +86,7 @@
         {
             _logger.debug("public void setPayload(Object payload = " + payload + "): called");
         }
-        _payload = payload;
+        _payload = payload;        
     }
 
     /**
@@ -137,7 +141,7 @@
     }
 
     /**
-     * Record the dequeue for processing on commit
+     * Record the dequeue for processing after the commit 
      *
      * @param queue
      * @param messageId
@@ -146,39 +150,37 @@
      */
     public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException
     {
-        if (inTransaction())
-        {
-            ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
+        ArrayList<AMQQueue> dequeues = _dequeueMap.get(messageId);
 
-            if (dequeues == null)
-            {
-                dequeues = new ArrayList<AMQQueue>();
-                _dequeueMap.put(messageId, dequeues);
-            }
-
-            dequeues.add(queue);
+        if (dequeues == null)
+        {
+            dequeues = new ArrayList<AMQQueue>();
+            _dequeueMap.put(messageId, dequeues);
         }
+
+        dequeues.add(queue);
     }
 
     public void beginTransaction() throws AMQException
     {
-        _enqueueMap = new HashMap<Long, ArrayList<AMQQueue>>();
-        _dequeueMap = new HashMap<Long, ArrayList<AMQQueue>>();
+        _inTransaction = true;
     }
 
     public void commitTransaction() throws AMQException
     {
         _dequeueMap.clear();
+        _inTransaction = false;
     }
 
     public void abortTransaction() throws AMQException
     {
         _enqueueMap.clear();
+        _inTransaction = false;
     }
 
     public boolean inTransaction()
     {
-        return _payload != null;
+        return _inTransaction; //  _payload != null;
     }
 
     public boolean isAsync()

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Fri Apr  3 14:10:54 2009
@@ -39,7 +39,7 @@
     private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class);
 
     TransactionLog _delegate;
-    private Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
+    protected Map<Long, ArrayList<AMQQueue>> _idToQueues = new HashMap<Long, ArrayList<AMQQueue>>();
 
     public BaseTransactionLog(TransactionLog delegate)
     {
@@ -60,7 +60,7 @@
     {
         context.enqueueMessage(queues, messageId);
 
-        if (queues.size() > 0)
+        if (queues.size() > 1)
         {
             _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues);
 
@@ -73,10 +73,10 @@
 
     public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
     {
+        context.dequeueMessage(queue, messageId);
+
         if (context.inTransaction())
         {
-            context.dequeueMessage(queue, messageId);
-
             Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
 
             //For each Message ID that is in the map check
@@ -97,11 +97,7 @@
 
         if (!context.inTransaction())
         {
-            HashMap<Long, ArrayList<AMQQueue>> dequeue = new HashMap<Long, ArrayList<AMQQueue>>();
-            ArrayList list = new ArrayList();
-            list.add(queue);
-            dequeue.put(messageId, list);
-            processDequeues(dequeue);
+            processDequeues(context.getDequeueMap());
         }
     }
 
@@ -128,11 +124,7 @@
         //Perform real commit of current data
         _delegate.commitTran(context);
 
-        // If we have dequeues to process then process them
-        if (context.getDequeueMap() != null)
-        {
-            processDequeues(context.getDequeueMap());
-        }
+        processDequeues(context.getDequeueMap());
 
         //Commit the recorded state for this transaction.
         context.commitTransaction();
@@ -141,10 +133,8 @@
     public void abortTran(StoreContext context) throws AMQException
     {
         // If we have enqueues to rollback
-        if (context.getEnqueueMap() != null)
-        {
-            processDequeues(context.getEnqueueMap());
-        }
+        processDequeues(context.getEnqueueMap());
+
         //Abort the recorded state for this transaction.
         context.abortTransaction();
 
@@ -154,6 +144,12 @@
     private void processDequeues(Map<Long, ArrayList<AMQQueue>> messageMap)
             throws AMQException
     {
+        // Check we have dequeues to process then process them
+        if (messageMap == null || messageMap.isEmpty())
+        {
+             return;
+        }
+
         // Process any enqueues to bring our model up to date.
         Set<Long> messageIDs = messageMap.keySet();
 
@@ -190,6 +186,8 @@
                         if (enqueuedList.isEmpty())
                         {
                             _delegate.removeMessage(removeContext, messageID);
+                            //Remove references list
+                            _idToQueues.remove(messageID);
                         }
                     }
                 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Apr  3 14:10:54 2009
@@ -295,14 +295,23 @@
         }
         _transactionLog = (TransactionLog) o;
 
+        // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+        _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
+
         //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable.
         if (_transactionLog instanceof RoutingTable)
         {
             _routingTable = (RoutingTable) _transactionLog;
         }
+        else if (_transactionLog instanceof BaseTransactionLog)
+        {
+            TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate();
+            if (delegate instanceof RoutingTable)
+            {
+                _routingTable = (RoutingTable) delegate;
+            }
+        }
 
-        // If a TransactionLog uses the BaseTransactionLog then it will return this object.
-        _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config);
     }
 
     //todo we need to move from store.class to transactionlog.class

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Apr  3 14:10:54 2009
@@ -390,6 +390,7 @@
         {
             sendMessage(txnContext);
 
+            // This check may be too soon as a purging thread may be required to bring the queue back under quota.
             long usage = _queue.getMemoryUsageCurrent();
             assertTrue("Queue has gone over quota:" + usage,
                        usage <= _queue.getMemoryUsageMaximum());

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java Fri Apr  3 14:10:54 2009
@@ -21,11 +21,12 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.transactionlog.TransactionLog;
 
 import java.util.Map;
 import java.util.List;
 
-public interface TestTransactionLog
+public interface TestTransactionLog extends TransactionLog
 {
     public List<AMQQueue> getMessageReferenceMap(Long messageID);
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=761685&r1=761684&r2=761685&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Fri Apr  3 14:10:54 2009
@@ -37,6 +37,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class BaseTransactionLogTest extends TestCase implements TransactionLog
@@ -46,14 +47,14 @@
     final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
     final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
 
-    BaseTransactionLog _transactionLog;
+    TestableTransactionLog _transactionLog;
     private ArrayList<AMQQueue> _queues;
     private MockPersistentAMQMessage _message;
 
     public void setUp() throws Exception
     {
         super.setUp();
-        _transactionLog = new BaseTransactionLog(this);
+        _transactionLog = new TestableTransactionLog(this);
     }
 
     public void testSingleEnqueueNoTransactional() throws AMQException
@@ -87,11 +88,9 @@
         // Enqueue a message to dequeue
         testSingleEnqueueNoTransactional();
 
-        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
 
-        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+        verifyMessageRemoved(_message.getMessageId());
     }
 
     public void testSingleEnqueueTransactional() throws AMQException
@@ -137,16 +136,13 @@
 
         _transactionLog.beginTran(context);
 
-        _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId());
 
         _transactionLog.commitTran(context);
 
-        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+        verifyMessageRemoved(_message.getMessageId());
     }
 
-
     public void testMultipleEnqueueNoTransactional() throws AMQException
     {
         //Store Data
@@ -185,34 +181,54 @@
         // Enqueue a message to dequeue
         testMultipleEnqueueNoTransactional();
 
-        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId());
+        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
 
         ArrayList<AMQQueue> enqueued = _enqueues.get(_message.getMessageId());
-        assertNotNull("Message not enqueued", enqueued);
-        assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0)));
-        assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size());
 
-        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+        assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+        _queues.remove(0);
 
+        verifyEnqueuedOnQueues(_message.getMessageId(), _queues);
+        verifyMessageStored(_message.getMessageId());
 
-        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId());
+        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
 
-        enqueued = _enqueues.get(_message.getMessageId());
-        assertNotNull("Message not enqueued", enqueued);
-        assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1)));
-        assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size());
-        
-        assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
+        assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0)));
+        _queues.remove(0);
 
-        _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId());
+        ArrayList<AMQQueue> enqueues = _enqueues.get(_message.getMessageId());
 
-        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+        assertNotNull("Message not enqueued", enqueues);
+        assertEquals("Message is not enqueued on the right number of queues", _queues.size(), enqueues.size());
+        for (AMQQueue queue : _queues)
+        {
+            assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
+        }
+
+        //Use the reference map to ensure that we are enqueuing the right number of messages
+        List<AMQQueue> references = _transactionLog.getMessageReferenceMap(_message.getMessageId());
+
+        assertNotNull("Message not enqueued", references);
+        assertEquals("Message is not enqueued on the right number of queues", _queues.size(), references.size());
+        for (AMQQueue queue : references)
+        {
+            assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+        }
+
+        verifyMessageStored(_message.getMessageId());
+
+        _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId());
+
+        verifyMessageRemoved(_message.getMessageId());
     }
 
+    private void verifyMessageRemoved(Long messageID)
+    {
+        assertNull("Message references exist", _transactionLog.getMessageReferenceMap(messageID));
+        assertNull("Message enqueued", _enqueues.get(messageID));
+        assertNull("Message chunks enqueued", _storeChunks.get(messageID));
+        assertNull("Message meta data enqueued", _storeMetaData.get(messageID));
+    }
 
     public void testMultipleEnqueueTransactional() throws AMQException
     {
@@ -294,12 +310,10 @@
 
         _transactionLog.commitTran(context);
 
-        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+        verifyMessageRemoved(_message.getMessageId());
     }
 
-     public void testMultipleDequeueSingleTransaction() throws AMQException
+    public void testMultipleDequeueSingleTransaction() throws AMQException
     {
         // Enqueue a message to dequeue
         testMultipleEnqueueTransactional();
@@ -318,10 +332,8 @@
         assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
         assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
 
-
         _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId());
 
-
         enqueued = _enqueues.get(_message.getMessageId());
         assertNotNull("Message not enqueued", enqueued);
         assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1)));
@@ -330,14 +342,11 @@
         assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId()));
         assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId()));
 
-
         _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId());
 
         _transactionLog.commitTran(context);
 
-        assertNull("Message enqueued", _enqueues.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeChunks.get(_message.getMessageId()));
-        assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId()));
+        verifyMessageRemoved(_message.getMessageId());
     }
 
     private void verifyMessageStored(Long messageId)
@@ -356,6 +365,23 @@
         {
             assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue));
         }
+
+        //Use the reference map to ensure that we are enqueuing the right number of messages
+        List<AMQQueue> references = _transactionLog.getMessageReferenceMap(messageId);
+
+        if (queues.size() == 1)
+        {
+            assertNull("Message has an enqueued list", references);
+        }
+        else
+        {
+            assertNotNull("Message not enqueued", references);
+            assertEquals("Message is not enqueued on the right number of queues", queues.size(), references.size());
+            for (AMQQueue queue : references)
+            {
+                assertTrue("Message not enqueued on:" + queue, references.contains(queue));
+            }
+        }
     }
 
     /*************************** TransactionLog *******************************
@@ -419,19 +445,42 @@
 
         if (queues == null)
         {
-            throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
-                                       "queue(" + queue + ") but no enqueue data available");
-        }
+            boolean found = false;
+            // If we are in a transaction we may have already done the dequeue.
+            if (context.inTransaction())
+            {
 
-        synchronized (queues)
-        {
-            if (!queues.contains(queue))
+                for (Object record : (ArrayList) context.getPayload())
+                {
+                    if (record instanceof RemoveRecord)
+                    {
+                        if (((RemoveRecord) record)._messageId.equals(messageId))
+                        {
+                            found = true;
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if (!found)
             {
                 throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
-                                           "queue(" + queue + ") but no message not enqueued on queue");
+                                           "queue(" + queue + ") but no enqueue data available");
+            }
+        }
+        else
+        {
+            synchronized (queues)
+            {
+                if (!queues.contains(queue))
+                {
+                    throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " +
+                                               "queue(" + queue + ") but no message not enqueued on queue");
+                }
+
+                queues.remove(queue);
             }
-                       
-            queues.remove(queue);
         }
     }
 
@@ -450,21 +499,29 @@
                                        "no enqueue data available");
         }
 
-        if (!queues.isEmpty())
+        if (queues.size() > 1)
         {
             throw new RuntimeException("Removed a message(" + messageId + ") that still had references.");
         }
 
+        MessageMetaData mmd;
         synchronized (_storeMetaData)
         {
-            _storeMetaData.remove(messageId);
+            mmd = _storeMetaData.remove(messageId);
         }
 
+        ArrayList<ContentChunk> chunks;
         synchronized (_storeChunks)
         {
-            _storeChunks.remove(messageId);
+            chunks = _storeChunks.remove(messageId);
         }
 
+        //Record the remove for part of the transaction
+        if (context.inTransaction())
+        {
+            ArrayList transactionData = (ArrayList) context.getPayload();
+            transactionData.add(new RemoveRecord(messageId, queues, mmd, chunks));
+        }
     }
 
     //
@@ -474,7 +531,7 @@
 
     public void beginTran(StoreContext context) throws AMQException
     {
-        context.setPayload(new Object());
+        context.setPayload(new ArrayList());
     }
 
     public void commitTran(StoreContext context) throws AMQException
@@ -532,4 +589,20 @@
     {
         return false;
     }
+
+    class RemoveRecord
+    {
+        MessageMetaData _mmd;
+        ArrayList<AMQQueue> _queues;
+        ArrayList<ContentChunk> _chunks;
+        Long _messageId;
+
+        RemoveRecord(Long messageId, ArrayList<AMQQueue> queues, MessageMetaData mmd, ArrayList<ContentChunk> chunks)
+        {
+            _messageId = messageId;
+            _queues = queues;
+            _mmd = mmd;
+            _chunks = chunks;
+        }
+    }
 }

Added: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java?rev=761685&view=auto
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java (added)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java Fri Apr  3 14:10:54 2009
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.routing.RoutingTable;
+
+import java.util.List;
+import java.util.LinkedList;
+
+public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
+{
+
+    List<Long> _singleEnqueues = new LinkedList<Long>();
+
+    public TestableTransactionLog()
+    {
+        super(null);
+    }
+
+    public TestableTransactionLog(BaseTransactionLog delegate)
+    {
+        super(delegate.getDelegate());
+    }
+
+    public TestableTransactionLog(TransactionLog delegate)
+    {
+        super(delegate);
+    }
+
+
+    @Override
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    {
+        if (_delegate != null)
+        {
+            TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+
+            // If configure() handed back a BaseTransactionLog wrapper, keep only that wrapper's underlying delegate
+            if (configuredLog instanceof BaseTransactionLog)
+            {
+                _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+            }
+        }
+        else
+        {
+                String delegateClass = config.getStoreConfiguration().getString("delegate");
+            Class clazz = Class.forName(delegateClass);
+            Object o = clazz.newInstance();
+
+            if (!(o instanceof TransactionLog))
+            {
+                throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
+                                             " does not.");
+            }
+            _delegate = (TransactionLog) o;
+
+            // Configure the new delegate. NOTE(review): the return value is discarded, so a BaseTransactionLog returned here is not unwrapped as in the branch above - confirm this is intended.
+             _delegate.configure(virtualHost, base, config);
+        }
+        return this;
+    }
+
+    public List<AMQQueue> getMessageReferenceMap(Long messageID)
+    {
+        return _idToQueues.get(messageID);
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org