You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/03 20:00:24 UTC

svn commit: r761742 - in /qpid/branches/0.5-release/qpid/java/broker/src: main/java/org/apache/qpid/server/store/ main/java/org/apache/qpid/server/transactionlog/ test/java/org/apache/qpid/server/ack/ test/java/org/apache/qpid/server/queue/ test/java/o...

Author: ritchiem
Date: Fri Apr  3 18:00:24 2009
New Revision: 761742

URL: http://svn.apache.org/viewvc?rev=761742&view=rev
Log:
QPID-1764 : Resolved ConcurrentModificationException (CME). The 'syntactic sugar' for-each loop hides the iterator on which you need to call .remove(); calling remove on the underlying Map while iterating over it causes the CME.

Merged from r761700 from trunk

QPID-1764 : Updated all tests to use the TestTransactionLog interface and split testing code into subclasses. TestableTransactionLog will now correctly wrap a TransactionLog for testing. To enable testing of the BaseTransactionLog, a TestableBaseTransactionLog was needed that only returns values that are actually stored in the BaseTransactionLog, whereas the TestableTransactionLog actually stores single enqueues so that they can be queried by the test.

Merged from r761741 from trunk

Added:
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java
Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Apr  3 18:00:24 2009
@@ -51,7 +51,7 @@
  */
 public class MemoryMessageStore implements TransactionLog, RoutingTable
 {
-    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+    protected static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
 
     private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
 
@@ -154,13 +154,7 @@
 
     public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        for (AMQQueue q : queues)
-        {
-            if (q.isDurable())
-            {
-                enqueueMessage(context,q,messageId);
-            }
-        }
+        // Not required to do anything
     }
 
     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
@@ -232,25 +226,13 @@
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
-    {
-        checkNotClosed();
-        return _metaDataMap.get(messageId);
-    }
-
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
-    {
-        checkNotClosed();
-        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-        return bodyList.get(index);
-    }
 
     public boolean isPersistent()
     {
         return false;
     }
 
-    private void checkNotClosed() throws MessageStoreClosedException
+    protected void checkNotClosed() throws MessageStoreClosedException
     {
         if (_closed.get())
         {

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Fri Apr  3 18:00:24 2009
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Iterator;
 
 public class BaseTransactionLog implements TransactionLog
 {
@@ -80,15 +81,18 @@
             Map<Long, ArrayList<AMQQueue>> messageMap = context.getDequeueMap();
 
             //For each Message ID that is in the map check
-            for (Long messageID : messageMap.keySet())
+            Iterator iterator = messageMap.keySet().iterator();
+
+            while (iterator.hasNext())
             {
+                Long messageID = (Long) iterator.next();
                 //If we don't have a gloabl reference for this message then there is only a single enqueue
                 if (_idToQueues.get(messageID) == null)
                 {
                     // Add the removal of the message to this transaction
                     _delegate.removeMessage(context,messageID);
                     // Remove this message ID as we have processed it so we don't reprocess after the main commmit
-                    messageMap.remove(messageID);
+                    iterator.remove();
                 }
             }
         }
@@ -179,6 +183,15 @@
                     }
                     else
                     {
+                        // When a message is on more than one queue it is possible that this code section is executed
+                        // by one thread per enqueue.
+                        // It is, however, thread safe because there are only removes being performed and so the
+                        // last thread that does the remove will see the empty queue and remove the message.
+                        // At this stage there is nothing that is going to cause this operation to abort, so we don't
+                        // need to worry about any potential adds.
+                        // The message will no longer be enqueued as that operation has been committed before now, so
+                        // this is clean up of the data.
+
                         // Update the enqueued list
                         enqueuedList.remove(queue);
 
@@ -195,6 +208,8 @@
 
             //Commit the removes on the delegate.
             _delegate.commitTran(removeContext);
+            // Mark this context as committed.
+            removeContext.commitTransaction();
         }
         finally
         {

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Apr  3 18:00:24 2009
@@ -103,7 +103,7 @@
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 		private AMQQueue _queue;
-        private TransactionLog _transactionLog = new TestableMemoryMessageStore();
+        private TransactionLog _transactionLog = new TestableMemoryMessageStore().configure();
 
         private static final int MESSAGE_SIZE=100;
 

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Apr  3 18:00:24 2009
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -43,14 +44,13 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Test class to test AMQQueueMBean attribtues and operations
@@ -70,7 +70,7 @@
     public void testMessageCountTransient() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount, false);
+        List<AMQMessage> messages = sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -85,13 +85,13 @@
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
 
         //Ensure that the data has been removed from the Store
-        verifyBrokerState();
+        verifyBrokerState(messages);
     }
 
     public void testMessageCountPersistent() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount, true);
+        List<AMQMessage> messages = sendMessages(messageCount, true);
         assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -106,20 +106,38 @@
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
 
         //Ensure that the data has been removed from the Store
-        verifyBrokerState();
+        verifyBrokerState(messages);
     }
 
     // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
-    private void verifyBrokerState()
+    private void verifyBrokerState(List<AMQMessage> messages)
     {
 
-        TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
+        TestableTransactionLog store = new TestableTransactionLog(_virtualHost.getTransactionLog());
 
-        // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
-        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
-        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
-        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+        // We can now only check MessageMetaData and ContentBodyChunks by MessageID.
+        for (AMQMessage message : messages)
+        {
+            // Check we have no message metadata for the messages we sent 
+            try
+            {
+                assertNull(store.getMessageMetaData(new StoreContext(), message.getMessageId()));
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+
+            try
+            {
+                assertNull(store.getContentBodyChunk(new StoreContext(), message.getMessageId(),0));
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+
+        }
     }
 
     public void testConsumerCount() throws AMQException
@@ -297,8 +315,9 @@
         ApplicationRegistry.remove(1);
     }
 
-    private void sendMessages(int messageCount, boolean persistent) throws AMQException
+    private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
     {
+        List<AMQMessage> messages = new LinkedList<AMQMessage>();
         for (int i = 0; i < messageCount; i++)
         {
             IncomingMessage currentMessage = message(false, persistent);
@@ -316,9 +335,10 @@
                                                        .convertToContentChunk(
                                                        new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
                                                                        MESSAGE_SIZE)));
-            currentMessage.deliverToQueues();
+            messages.add(currentMessage.deliverToQueues());
 
 
         }
+        return messages;
     }
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Apr  3 18:00:24 2009
@@ -30,6 +30,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,8 +39,6 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -47,6 +46,7 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.List;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -59,7 +59,7 @@
 
     private MockProtocolSession _protocolSession;
 
-    private TestableMemoryMessageStore _messageStore;
+    private TestableTransactionLog _transactionLog;
 
     private StoreContext _storeContext = new StoreContext();
 
@@ -75,9 +75,9 @@
         ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
 
         VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
-        _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
-        _protocolSession = new MockProtocolSession(_messageStore);
-        _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
+        _transactionLog = new TestableTransactionLog(vhost.getTransactionLog());
+        _protocolSession = new MockProtocolSession(_transactionLog);
+        _channel = new AMQChannel(_protocolSession,5, _transactionLog /*dont need exchange registry*/);
 
         _protocolSession.addChannel(_channel);
 
@@ -95,13 +95,13 @@
         publishMessages(count, false);
     }
 
-    private void publishMessages(int count, boolean persistent) throws AMQException
+    private List<AMQMessage> publishMessages(int count, boolean persistent) throws AMQException
     {
-        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+        TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null,
                                                                       new LinkedList<RequiredDeliveryException>()
         );
         _queue.registerSubscription(_subscription,false);
-        MessageFactory factory = MessageFactory.getInstance();
+        List<AMQMessage> sentMessages = new LinkedList<AMQMessage>();
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -109,7 +109,7 @@
             MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false,
                                                                         false, new AMQShortString("rk"));
 
-            IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore);
+            IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _transactionLog);
             //IncomingMessage msg2 = null;
             if (persistent)
             {
@@ -130,14 +130,16 @@
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             msg.enqueue(qs);
-            msg.routingComplete(_messageStore);
+            msg.routingComplete(_transactionLog);
             if(msg.allContentReceived())
             {
-                msg.deliverToQueues();
+               sentMessages.add(msg.deliverToQueues());
             }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }
+
+        return sentMessages;
     }
 
     /**
@@ -148,11 +150,16 @@
     {
         _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
         final int msgCount = 10;
-        publishMessages(msgCount, true);
+        List<AMQMessage> sentMessages = publishMessages(msgCount, true);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+        for (AMQMessage message : sentMessages)
+        {
+            List<AMQQueue> enqueuedQueues = _transactionLog.getMessageReferenceMap(message.getMessageId());
+            assertNotNull("Expected message to be enqueued",enqueuedQueues);
+            assertEquals("Message is not enqueued on expected number of queues.",1, enqueuedQueues.size());
+        }
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -165,7 +172,6 @@
         }
 
         assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
     }
 
     /**
@@ -180,8 +186,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+//        assertTrue(_messageStore.getContentBodyMap().size() == 0);to be
 
     }
 
@@ -197,8 +203,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+//        assertTrue(_messageStore.getContentBodyMap().size() == 0);
 
     }
 

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java Fri Apr  3 18:00:24 2009
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -31,10 +32,10 @@
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -42,7 +43,7 @@
 
 public class PersistentMessageTest extends TransientMessageTest
 {
-    private TestableMemoryMessageStore _messageStore;
+    private TestableTransactionLog _transactionLog;
 
     protected SimpleAMQQueue _queue;
     protected AMQShortString _q1name = new AMQShortString("q1name");
@@ -54,22 +55,22 @@
 
     public void setUp() throws Exception
     {
-        _messageStore = new TestableMemoryMessageStore();
+        _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
 
         _storeContext = new StoreContext();
         VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration(PersistentMessageTest.class.getName(),
                                                                          new PropertiesConfiguration()),
-                                            _messageStore);
+                                            _transactionLog);
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
         // Create IncomingMessage and nondurable queue
-        _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+        _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, _returnMessages);
 
     }
 
     @Override
     protected AMQMessage newMessage()
     {
-        return MessageFactory.getInstance().createMessage(_messageStore, true);
+        return MessageFactory.getInstance().createMessage(_transactionLog, true);
     }
 
     @Override
@@ -82,7 +83,7 @@
     /**
      * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
      * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
-     * contents. TransactionLog = Empty, returnMessages 1 item. 
+     * contents. TransactionLog = Empty, returnMessages 1 item.
      *
      * @throws Exception
      */
@@ -98,17 +99,16 @@
         // equivalent to amqChannel.routeMessage()
         msg.enqueue(qs);
 
-        msg.routingComplete(_messageStore);
+        msg.routingComplete(_transactionLog);
 
         // equivalent to amqChannel.deliverCurrentMessageIfComplete
         msg.deliverToQueues();
 
         // Check that data has been stored to disk
         long messageId = msg.getMessageId();
-        checkMessageMetaDataExists(messageId);
 
         // Check that it was not enqueued
-        List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
         assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
         checkMessageMetaDataRemoved(messageId);
 
@@ -118,7 +118,7 @@
     protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
     {
         IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
-                                                  new MockProtocolSession(_messageStore), _messageStore);
+                                                  new MockProtocolSession(_transactionLog), _transactionLog);
 
         // equivalent to amqChannel.publishContenHeader
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -138,7 +138,8 @@
     {
         try
         {
-            _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+            assertNotNull("Message MetaData does not exist for message:" + messageId,
+                          _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
         }
         catch (AMQException amqe)
         {
@@ -151,8 +152,8 @@
         try
         {
             assertNull("Message MetaData still exists for message:" + messageId,
-                       _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
-            List ids = _messageStore.getMessageReferenceMap(messageId);
+                       _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+            List ids = _transactionLog.getMessageReferenceMap(messageId);
             assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
 
         }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Apr  3 18:00:24 2009
@@ -35,12 +35,13 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestTransactionLog;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,7 +51,7 @@
 
     protected SimpleAMQQueue _queue;
     protected VirtualHost _virtualHost;
-    protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
+    protected TestableTransactionLog _transactionLog;
     protected AMQShortString _qname = new AMQShortString("qname");
     protected AMQShortString _owner = new AMQShortString("owner");
     protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -68,6 +69,7 @@
         //Create Application Registry for test
         ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
 
+        _transactionLog = new TestableTransactionLog(new MemoryMessageStore().configure());
         PropertiesConfiguration env = new PropertiesConfiguration();
         _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -340,7 +342,9 @@
 
         // Check that it is enqueued
         List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
-        assertNotNull(data);
+        assertNotNull("Message has no enqueued information.", data);
+        assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+        assertEquals("Message not enqueued on the right queues.", 1, data.size());
 
         // Dequeue message
         ContentHeaderBody header = new ContentHeaderBody();
@@ -355,7 +359,7 @@
 
         // Check that it is dequeued
         data = _transactionLog.getMessageReferenceMap(messageId);
-        assertTrue(data == null || data.isEmpty());
+        assertNull("Message still has enqueue data.", data);
     }
 
     public void testMessagesFlowToDisk() throws AMQException, InterruptedException
@@ -509,7 +513,9 @@
 
         //Check message was correctly enqueued
         List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
-        assertNotNull(data);
+        assertNotNull("Message has no enqueued information.", data);
+        assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+        assertEquals("Message not enqueued on the right queues.", 1, data.size());             
     }
 
 

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Fri Apr  3 18:00:24 2009
@@ -39,7 +39,7 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 
 public class ACLManagerTest extends TestCase
@@ -66,7 +66,7 @@
         _pluginManager = new MockPluginManager("");
         _authzManager = new ACLManager(_conf, _pluginManager);
         
-        _session = new MockProtocolSession(new TestableMemoryMessageStore());
+        _session = new MockProtocolSession(new MemoryMessageStore().configure());
     }
 
     public void tearDown() throws Exception

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Fri Apr  3 18:00:24 2009
@@ -37,7 +37,7 @@
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.TestIoSession;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
@@ -81,14 +81,12 @@
         }
     }
 
-    private TestableMemoryMessageStore _store;
     private VirtualHost _virtualHost;
     private AMQMinaProtocolSession _session;
 
     @Override
     public void setUp() throws Exception
     {
-        _store = new TestableMemoryMessageStore();
         PropertiesConfiguration env = new PropertiesConfiguration();
         _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
         TestIoSession iosession = new TestIoSession();

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Apr  3 18:00:24 2009
@@ -26,28 +26,24 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
+/** Tests that reference counting works correctly with AMQMessage and the message store */
 public class TestReferenceCounting extends TestCase
 {
-    private TestableMemoryMessageStore _store;
+    private TestableTransactionLog _store;
 
     private StoreContext _storeContext = new StoreContext();
 
-
     protected void setUp() throws Exception
     {
         super.setUp();
-        _store = new TestableMemoryMessageStore();
+        _store = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
     }
 
-    /**
-     * Check that when the reference count is decremented the message removes itself from the store
-     */
+    /** Check that when the reference count is decremented the message removes itself from the store */
     public void testMessageGetsRemoved() throws AMQException
     {
         ContentHeaderBody chb = createPersistentContentHeader();
@@ -57,14 +53,15 @@
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
 
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+        assertNotNull("Message Metadata did not exist for new message",
+                      _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
     }
 
     private ContentHeaderBody createPersistentContentHeader()
     {
         ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
-        bchp.setDeliveryMode((byte)2);
+        bchp.setDeliveryMode((byte) 2);
         chb.properties = bchp;
         return chb;
     }
@@ -77,8 +74,9 @@
         final ContentHeaderBody chb = createPersistentContentHeader();
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
-        
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+
+        assertNotNull("Message Metadata did not exist for new message",
+                      _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
     }
 
     public static junit.framework.Test suite()

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java Fri Apr  3 18:00:24 2009
@@ -21,12 +21,23 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
 
 import java.util.Map;
 import java.util.List;
 
 public interface TestTransactionLog extends TransactionLog
 {
+    public void setBaseTransactionLog(BaseTransactionLog base);
+
     public List<AMQQueue> getMessageReferenceMap(Long messageID);
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+    public long getMessageMetaDataSize();
+    public TransactionLog getDelegate();
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Apr  3 18:00:24 2009
@@ -20,192 +20,80 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 /** Adds some extra methods to the memory message store for testing purposes. */
-public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
+public class TestableMemoryMessageStore extends MemoryMessageStore implements TestTransactionLog
 {
-    private TransactionLog _transactionLog;
-    private RoutingTable _routingTable;
-    private MemoryMessageStore _mms;
+    private TestableTransactionLog _base;
 
-    public TestableMemoryMessageStore(TransactionLog log)
+    public void setBaseTransactionLog(BaseTransactionLog base)
     {
-        _transactionLog = log;
-        if (log instanceof BaseTransactionLog)
+        if (!(base instanceof TestableTransactionLog))
         {
-            TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
-            if (delegate instanceof RoutingTable)
-            {
-                _routingTable = (RoutingTable) delegate;
-            }
-            else
-            {
-                throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
-            }
-
-            if (delegate instanceof MemoryMessageStore)
-            {
-                _mms = (MemoryMessageStore) delegate;
-            }
-
-        }
-        else
-        {
-            throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+            throw new RuntimeException("base must be a TestableTransactionLog for correct operation in a TestMemoryMessageStore");
         }
 
+        _base = (TestableTransactionLog) base;
     }
 
-    public TestableMemoryMessageStore(MemoryMessageStore mms)
-    {
-        _routingTable = mms;
-        _transactionLog = mms.configure();
-    }
-
-    public TestableMemoryMessageStore()
-    {
-        _mms = new MemoryMessageStore();
-        _transactionLog = _mms.configure();
-        _routingTable = _mms;        
-    }
-
-    public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
-    {
-        return ((MemoryMessageStore) _routingTable)._metaDataMap;
-    }
-
-    public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
-    {
-        return ((MemoryMessageStore) _routingTable)._contentBodyMap;
-    }
-
-    public List<AMQQueue> getMessageReferenceMap(Long messageId)
-    {
-//        return _mms._messageEnqueueMap.get(messageId);
-//        ((BaseTransactionLog)_transactionLog).
-        return new ArrayList<AMQQueue>();
-    }
-
-    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    @Override
+    public TransactionLog configure()
     {
-        _transactionLog  = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
-        return _transactionLog;
-    }
+        BaseTransactionLog base = (BaseTransactionLog) super.configure();
 
-    public void close() throws Exception
-    {
-        _transactionLog.close();
-        _routingTable.close();
-    }
+        _base = new TestableTransactionLog(base.getDelegate());
 
-    public void createExchange(Exchange exchange) throws AMQException
-    {
-        _routingTable.createExchange(exchange);
+        return _base;
     }
 
-    public void removeExchange(Exchange exchange) throws AMQException
+    @Override
+    public TransactionLog configure(String base, VirtualHostConfiguration config)
     {
-        _routingTable.removeExchange(exchange);
-    }
-
-    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
-    {
-        _routingTable.bindQueue(exchange, routingKey, queue, args);
-    }
-
-    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
-    {
-        _routingTable.unbindQueue(exchange, routingKey, queue, args);
-    }
-
-    public void createQueue(AMQQueue queue) throws AMQException
-    {
-        _routingTable.createQueue(queue);
-    }
-
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
-    {
-        _routingTable.createQueue(queue, arguments);
-    }
-
-    public void removeQueue(AMQQueue queue) throws AMQException
-    {
-        _routingTable.removeQueue(queue);
-    }
-
-    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
-    {
-        _transactionLog.enqueueMessage(context, queues, messageId);
-    }
-
-    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
-    {
-        _transactionLog.dequeueMessage(context, queue, messageId);
-    }
-
-    public void removeMessage(StoreContext context, Long messageId) throws AMQException
-    {
-        _transactionLog.removeMessage(context, messageId);
-    }
-
-    public void beginTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.beginTran(context);
-    }
+        //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
+        if (base.equals("store"))
+        {
+            super.configure();
 
-    public void commitTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.commitTran(context);
-    }
+            _base = new TestableTransactionLog(this);
 
-    public void abortTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.abortTran(context);
-    }
+            return _base;
+        }
 
-    public boolean inTran(StoreContext context)
-    {
-        return _transactionLog.inTran(context);
+        return super.configure();
     }
 
-    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    public List<AMQQueue> getMessageReferenceMap(Long messageId)
     {
-        _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+        return _base.getMessageReferenceMap(messageId);
     }
 
-    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
     {
-        _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
+        return _metaDataMap.get(messageId);
     }
 
-    public boolean isPersistent()
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index)
     {
-        return _transactionLog.isPersistent();
+        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
+        return bodyList.get(index);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    public long getMessageMetaDataSize()
     {
-        return _mms.getMessageMetaData(context, messageId);
+        return _metaDataMap.size();
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    public TransactionLog getDelegate()
     {
-        return _mms.getContentBodyChunk(context, messageId, index);
+        return _base;
     }
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Fri Apr  3 18:00:24 2009
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.queue.MockContentChunk;
 import org.apache.qpid.server.queue.MockPersistentAMQMessage;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -47,14 +48,14 @@
     final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
     final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
 
-    TestableTransactionLog _transactionLog;
+    TestTransactionLog _transactionLog;
     private ArrayList<AMQQueue> _queues;
     private MockPersistentAMQMessage _message;
 
     public void setUp() throws Exception
     {
         super.setUp();
-        _transactionLog = new TestableTransactionLog(this);
+        _transactionLog = new TestableBaseTransactionLog(this);
     }
 
     public void testSingleEnqueueNoTransactional() throws AMQException

Added: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java?rev=761742&view=auto
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java (added)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java Fri Apr  3 18:00:24 2009
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.List;
+
+public class TestableBaseTransactionLog extends BaseTransactionLog implements TestTransactionLog
+{
+
+    public TestableBaseTransactionLog()
+    {
+        super(null);
+    }
+
+    public TestableBaseTransactionLog(TransactionLog delegate)
+    {
+        super(delegate);
+        if (delegate instanceof BaseTransactionLog)
+        {
+            _delegate = ((BaseTransactionLog) delegate).getDelegate();
+        }
+
+    }
+
+    @Override
+    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    {
+        if (_delegate != null)
+        {
+            TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
+
+            // Unwrap any BaseTransactionLog
+            if (configuredLog instanceof BaseTransactionLog)
+            {
+                _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
+            }
+        }
+        else
+        {
+            String delegateClass = config.getStoreConfiguration().getString("delegate");
+            Class clazz = Class.forName(delegateClass);
+            Object o = clazz.newInstance();
+
+            if (!(o instanceof TransactionLog))
+            {
+                throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
+                                             " does not.");
+            }
+            _delegate = (TransactionLog) o;
+
+            // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+            _delegate.configure(virtualHost, base, config);
+        }
+        return this;
+    }
+
+    public void setBaseTransactionLog(BaseTransactionLog base)
+    {
+        throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+    }
+
+    public List<AMQQueue> getMessageReferenceMap(Long messageID)
+    {
+        return _idToQueues.get(messageID);
+    }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public long getMessageMetaDataSize()
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+        }
+        else
+        {
+            return 0;
+        }
+    }
+}

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java Fri Apr  3 18:00:24 2009
@@ -20,52 +20,95 @@
  */
 package org.apache.qpid.server.transactionlog;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.LinkedList;
+import java.util.Map;
 
 public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
 {
-
-    List<Long> _singleEnqueues = new LinkedList<Long>();
+    protected Map<Long, List<AMQQueue>> _singleEnqueuedIDstoQueue = new HashMap<Long, List<AMQQueue>>();
 
     public TestableTransactionLog()
     {
         super(null);
     }
 
-    public TestableTransactionLog(BaseTransactionLog delegate)
+    public TestableTransactionLog(TransactionLog delegate)
     {
-        super(delegate.getDelegate());
+        super(delegate);
+        if (delegate instanceof BaseTransactionLog)
+        {
+            _delegate = ((BaseTransactionLog) delegate).getDelegate();
+        }
+
     }
 
-    public TestableTransactionLog(TransactionLog delegate)
+    /**
+     * Override the BaseTransactionLog to record the single enqueues of a message so we can perform reference counting
+     *
+     * @param context   The transactional context for the operation.
+     * @param queues    The queues on which the message is being enqueued.
+     * @param messageId The message to enqueue.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
+    @Override
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        super(delegate);
+        if (queues.size() == 1)
+        {
+            _singleEnqueuedIDstoQueue.put(messageId, queues);
+        }
+
+        super.enqueueMessage(context, queues, messageId);
     }
 
+    /**
+     * Override the BaseTransactionLog to drop the recorded single enqueue of a message when it is dequeued
+     *
+     * @param context   The transactional context for the operation.
+     * @param queue     The queue from which the message is being dequeued.
+     * @param messageId The message to dequeue.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
+    @Override
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    {
+        if (_singleEnqueuedIDstoQueue.containsKey(messageId))
+        {
+            _singleEnqueuedIDstoQueue.remove(messageId);
+        }
+
+        super.dequeueMessage(context, queue, messageId);
+    }
 
     @Override
     public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
         if (_delegate != null)
         {
-            TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+            TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
 
             // Unwrap any BaseTransactionLog
             if (configuredLog instanceof BaseTransactionLog)
             {
-                _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+                _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
             }
         }
         else
         {
-                String delegateClass = config.getStoreConfiguration().getString("delegate");
+            String delegateClass = config.getStoreConfiguration().getString("delegate");
             Class clazz = Class.forName(delegateClass);
             Object o = clazz.newInstance();
 
@@ -77,13 +120,61 @@
             _delegate = (TransactionLog) o;
 
             // If a TransactionLog uses the BaseTransactionLog then it will return this object.
-             _delegate.configure(virtualHost, base, config);
+            _delegate.configure(virtualHost, base, config);
         }
         return this;
     }
 
+    public void setBaseTransactionLog(BaseTransactionLog base)
+    {
+        throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+    }
+
     public List<AMQQueue> getMessageReferenceMap(Long messageID)
     {
-        return _idToQueues.get(messageID);
+        List<AMQQueue> result = _idToQueues.get(messageID);
+
+        if (result == null)
+        {
+            result = _singleEnqueuedIDstoQueue.get(messageID);
+        }
+
+        return result;
+    }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public long getMessageMetaDataSize()
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+        }
+        else
+        {
+            return 0;
+        }
     }
 }

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Fri Apr  3 18:00:24 2009
@@ -34,8 +34,10 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -62,7 +64,11 @@
     {
         super.setUp();
         PropertiesConfiguration configuration = new PropertiesConfiguration();
-        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+        // This configuration is not used, as TestApplicationRegistry just creates a single vhost 'test' whose
+        // TransactionLog is TestableTransactionLog(TestableMemoryMessageStore)
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableTransactionLog.class.getName());
+        configuration.setProperty("virtualhosts.virtualhost.test.store.delegate", TestableMemoryMessageStore.class.getName());
+        
         _registry = new TestApplicationRegistry(new ServerConfiguration(configuration));
         ApplicationRegistry.initialise(_registry);
         _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");        
@@ -96,7 +102,7 @@
 
     protected void checkStoreContents(int messageCount)
     {
-        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size());
+        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestTransactionLog) _transactionLog).getMessageMetaDataSize());
 
         //The above publish message is sufficiently small not to fit in the header so no Body is required.
         //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());

Modified: qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=761742&r1=761741&r2=761742&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Fri Apr  3 18:00:24 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.util;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -38,9 +37,9 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.Arrays;
 
@@ -83,7 +82,7 @@
 
         _managedObjectRegistry = new NoopManagedObjectRegistry();
 
-        _transactionLog = new TestableMemoryMessageStore();
+        _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
 
         _virtualHostRegistry = new VirtualHostRegistry();
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org