You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC

svn commit: r829675 [9/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Sun Oct 25 22:58:57 2009
@@ -25,7 +25,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
 
 public class AMQQueueFactoryTest extends TestCase
 {
@@ -55,31 +54,18 @@
         FieldTable fieldTable = new FieldTable();
         fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
 
-        try
-        {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, fieldTable);
-
-            assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());            
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
+                                           _virtualHost, fieldTable);
+
+        assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
     }
 
 
     public void testSimpleQueueRegistration()
     {
-        try
-        {
-            AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
-                                               _virtualHost, null);
-            assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+                                           _virtualHost, null);
+        assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Sun Oct 25 22:58:57 2009
@@ -29,7 +29,10 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,19 +41,15 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
 
 import javax.management.JMException;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
 
 /**
  * Test class to test AMQQueueMBean attribtues and operations
@@ -61,8 +60,6 @@
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private MessageStore _messageStore;
-    private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext;
     private VirtualHost _virtualHost;
     private AMQProtocolSession _protocolSession;
     private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
@@ -108,7 +105,7 @@
         //Ensure that the data has been removed from the Store
         verifyBrokerState();
     }
-    
+
     public void testDeleteMessages() throws Exception
     {
         int messageCount = 10;
@@ -129,9 +126,9 @@
         }
         catch(Exception e)
         {
-            
+
         }
-        
+
         //delete last message, leaving 2nd to 9th
         _queueMBean.deleteMessages(10L,10L);
         assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
@@ -143,7 +140,7 @@
         }
         catch(Exception e)
         {
-            
+
         }
 
         //delete remaining messages, leaving none
@@ -159,18 +156,16 @@
     private void verifyBrokerState()
     {
 
-        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+        TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
 
         // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
-        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
-        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
-        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+        assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
     }
 
     public void testConsumerCount() throws AMQException
     {
-        
+
         assertTrue(_queue.getActiveConsumerCount() == 0);
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
@@ -182,7 +177,7 @@
 
         Subscription subscription =
                 SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
-        
+
         _queue.registerSubscription(subscription, false);
         assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
 
@@ -225,7 +220,6 @@
         assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
 
         assertTrue(_queueMBean.getName().equals("testQueue"));
-        assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
         assertFalse(_queueMBean.isAutoDelete());
         assertFalse(_queueMBean.isDurable());
     }
@@ -261,7 +255,7 @@
         {
 
         }
-        
+
         try
         {
             long end = Integer.MAX_VALUE;
@@ -275,17 +269,22 @@
         }
 
         IncomingMessage msg = message(false, false);
-        long id = msg.getMessageId();
-        _queue.clearQueue(_storeContext);
+        _queue.clearQueue();
         ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
         qs.add(_queue);
         msg.enqueue(qs);
-        msg.routingComplete(_messageStore, new MessageHandleFactory());
+        MessageMetaData mmd = msg.headersReceived();
+        msg.setStoredMessage(_messageStore.addMessage(mmd));
+        long id = msg.getMessageNumber();
 
         msg.addContentBodyFrame(new ContentChunk()
         {
             ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
 
+            {
+                _data.limit((int)MESSAGE_SIZE);
+            }
+
             public int getSize()
             {
                 return (int) MESSAGE_SIZE;
@@ -301,7 +300,12 @@
 
             }
         });
-        msg.deliverToQueues();
+
+        AMQMessage m = new AMQMessage(msg.getStoredMessage());
+        for(AMQQueue q : msg.getDestinationQueues())
+        {
+            q.enqueue(m);
+        }
 //        _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
         _queueMBean.viewMessageContent(id);
         try
@@ -350,7 +354,7 @@
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
-        IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext,  _protocolSession);
+        IncomingMessage msg = new IncomingMessage(publish);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;
 
@@ -360,15 +364,17 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+        IApplicationRegistry  applicationRegistry  = new TestApplicationRegistry(new ServerConfiguration(configuration));
+        ApplicationRegistry.initialise(applicationRegistry );
+
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
         _messageStore = _virtualHost.getMessageStore();
 
-        _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                            null,
-                                                            new LinkedList<RequiredDeliveryException>()
-        );
-
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
                                                     null);
         _queueMBean = new AMQQueueMBean(_queue);
@@ -391,7 +397,8 @@
             currentMessage.enqueue(qs);
 
             // route header
-            currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+            MessageMetaData mmd = currentMessage.headersReceived();
+            currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
 
             // Add the body so we have somthing to test later
             currentMessage.addContentBodyFrame(
@@ -400,7 +407,12 @@
                                                        .convertToContentChunk(
                                                        new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
                                                                        MESSAGE_SIZE)));
-            currentMessage.deliverToQueues();
+
+            AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
+            for(AMQQueue q : currentMessage.getDestinationQueues())
+            {
+                q.enqueue(m);
+            }
 
 
         }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Sun Oct 25 22:58:57 2009
@@ -28,7 +28,10 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +42,9 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.Set;
-import java.util.Collections;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -62,8 +59,6 @@
 
     private TestMemoryMessageStore _messageStore;
 
-    private StoreContext _storeContext = new StoreContext();
-
     private AMQChannel _channel;
 
     private AMQQueue _queue;
@@ -99,11 +94,7 @@
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
-                                                                      new LinkedList<RequiredDeliveryException>()
-        );
         _queue.registerSubscription(_subscription,false);
-        MessageHandleFactory factory = new MessageHandleFactory();
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -136,31 +127,50 @@
                     return new AMQShortString("rk");
                 }
             };
-            IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+            final IncomingMessage msg = new IncomingMessage(publishBody);
             //IncomingMessage msg2 = null;
+            BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+            ContentHeaderBody cb = new ContentHeaderBody();
+            cb.properties = b;
+
             if (persistent)
             {
-                BasicContentHeaderProperties b = new BasicContentHeaderProperties();
                 //This is DeliveryMode.PERSISTENT
                 b.setDeliveryMode((byte) 2);
-                ContentHeaderBody cb = new ContentHeaderBody();
-                cb.properties = b;
-                msg.setContentHeaderBody(cb);
-            }
-            else
-            {
-                msg.setContentHeaderBody(new ContentHeaderBody());
             }
+
+            msg.setContentHeaderBody(cb);
+
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             msg.enqueue(qs);
-            msg.routingComplete(_messageStore, factory);
+            MessageMetaData mmd = msg.headersReceived();
+            msg.setStoredMessage(_messageStore.addMessage(mmd));
             if(msg.allContentReceived())
             {
-                msg.deliverToQueues();
+                ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+                txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+                    public void postCommit()
+                    {
+                        try
+                        {
+                            _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+                        }
+                        catch (AMQException e)
+                        {
+                             throw new RuntimeException(e);
+                        }
+                    }
+
+                    public void onRollback()
+                    {
+                        //To change body of implemented methods use File | Settings | File Templates.
+                    }
+                });
+
             }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +188,7 @@
         publishMessages(msgCount, true);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
-        assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+        assertEquals("",msgCount,map.size());
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -191,8 +200,6 @@
             assertTrue(unackedMsg.getQueue() == _queue);
         }
 
-        assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
     }
 
     /**
@@ -207,8 +214,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertTrue(_messageStore.getMessageCount() == 0);
+
 
     }
 
@@ -224,8 +231,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertTrue(_messageStore.getMessageCount() == 0);
+
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Sun Oct 25 22:58:57 2009
@@ -20,25 +20,18 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
 
 public class MockAMQMessage extends AMQMessage
 {
     public MockAMQMessage(long messageId)
             throws AMQException
     {
-       super(new MockAMQMessageHandle(messageId) ,
-                (StoreContext)null,
-                (MessagePublishInfo)new MockMessagePublishInfo());
+       super(new MockStoredMessage(messageId));
     }
 
-    protected MockAMQMessage(AMQMessage msg)
-            throws AMQException
-    {
-        super(msg);
-    }
+
 
 
     @Override
@@ -46,4 +39,5 @@
     {
         return 0l;
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Sun Oct 25 22:58:57 2009
@@ -23,23 +23,19 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
 
 import java.util.List;
 import java.util.Set;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
 
 public class MockAMQQueue implements AMQQueue
 {
@@ -47,6 +43,10 @@
     private AMQShortString _name;
     private VirtualHost _virtualhost;
 
+    private PrincipalHolder _principalHolder;
+
+    private Object _exclusiveOwner;
+
     public MockAMQQueue(String name)
     {
        _name = new AMQShortString(name);
@@ -57,6 +57,11 @@
         return _name;
     }
 
+    public void setNoLocal(boolean b)
+    {
+        
+    }
+
     public boolean isDurable()
     {
         return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -162,17 +167,22 @@
        return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+    public QueueEntry enqueue(ServerMessage message) throws AMQException
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+    public void requeue(QueueEntry entry)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+    public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void dequeue(QueueEntry entry)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -211,23 +221,23 @@
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
-    
+
     public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
     {
         return null;
     }
 
-    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+    public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -286,16 +296,16 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+    public void deleteMessageFromTop()
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public long clearQueue(StoreContext storeContext) throws AMQException
+    public long clearQueue()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
-   
+
 
     public void checkMessageStatus() throws AMQException
     {
@@ -327,8 +337,28 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public boolean isExclusive()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Exchange getAlternateExchange()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setAlternateExchange(Exchange exchange)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Map<String, Object> getArguments()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void checkCapacity(AMQChannel channel)
-    {               
+    {
     }
 
     public ManagedObject getManagedObject()
@@ -343,7 +373,7 @@
 
     public void setMinimumAlertRepeatGap(long value)
     {
-        
+
     }
 
     public long getCapacity()
@@ -368,7 +398,32 @@
 
     public void configure(QueueConfiguration config)
     {
-        
+
+    }
+
+    public PrincipalHolder getPrincipalHolder()
+    {
+        return _principalHolder;
     }
 
+    public void setPrincipalHolder(PrincipalHolder principalHolder)
+    {
+        _principalHolder = principalHolder;
+    }
+
+    public Object getExclusiveOwner()
+    {
+        return _exclusiveOwner;
+    }
+
+    public void setExclusiveOwner(Object exclusiveOwner)
+    {
+        _exclusiveOwner = exclusiveOwner;
+    }
+
+
+    public String getResourceName()
+    {
+        return _name.toString();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Sun Oct 25 22:58:57 2009
@@ -21,8 +21,9 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
 
 public class MockQueueEntry implements QueueEntry
 {
@@ -44,14 +45,14 @@
         return false;
     }
 
-    public void addStateChangeListener(StateChangeListener listener)
+    public boolean isAcquiredBy(Subscription subscription)
     {
-
+        return false;
     }
 
-    public String debugIdentity()
+    public void addStateChangeListener(StateChangeListener listener)
     {
-        return null;
+
     }
 
     public boolean delete()
@@ -59,17 +60,22 @@
         return false;
     }
 
-    public void dequeue(StoreContext storeContext) throws FailedDequeueException
+    public void dequeue()
+    {
+
+    }
+
+    public void discard()
     {
 
     }
 
-    public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+    public void routeToAlternate()
     {
 
     }
 
-    public void dispose(StoreContext storeContext) throws MessageCleanupException
+    public void dispose()
     {
 
     }
@@ -119,70 +125,95 @@
         return false;
     }
 
-    
+
     public boolean isQueueDeleted()
     {
 
         return false;
     }
 
-    
+
     public boolean isRejectedBy(Subscription subscription)
     {
 
         return false;
     }
 
-    
+
     public void reject()
     {
 
 
     }
 
-    
+
     public void reject(Subscription subscription)
     {
 
 
     }
 
-    
+
     public void release()
     {
 
 
     }
 
-    
+    public boolean releaseButRetain()
+    {
+        return false;
+    }
+
+
     public boolean removeStateChangeListener(StateChangeListener listener)
     {
 
         return false;
     }
 
-    
-    public void requeue(StoreContext storeContext) throws AMQException
+
+    public void requeue()
     {
 
 
     }
 
-    
+    public void requeue(Subscription subscription)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
     public void setDeliveredToSubscription()
     {
 
 
     }
 
-    
-    public void setRedelivered(boolean b)
+
+    public void setRedelivered()
+    {
+
+
+    }
+
+    public AMQMessageHeader getMessageHeader()
     {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
 
+    public boolean isPersistent()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
 
+    public boolean isRedelivered()
+    {
+        return false;
     }
 
-    
+
     public int compareTo(QueueEntry o)
     {
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Sun Oct 25 22:58:57 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.queue;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -31,21 +31,21 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 
 public class SimpleAMQQueueTest extends TestCase
 {
@@ -59,7 +59,7 @@
     protected DirectExchange _exchange = new DirectExchange();
     protected MockSubscription _subscription = new MockSubscription();
     protected FieldTable _arguments = null;
-    
+
     MessagePublishInfo info = new MessagePublishInfo()
     {
 
@@ -88,7 +88,7 @@
             return null;
         }
     };
-    
+
     @Override
     protected void setUp() throws Exception
     {
@@ -97,7 +97,7 @@
         ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
 
         PropertiesConfiguration env = new PropertiesConfiguration();
-        _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+        _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store);
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
 
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -119,51 +119,51 @@
         }
         catch (IllegalArgumentException e)
         {
-            assertTrue("Exception was not about missing name", 
+            assertTrue("Exception was not about missing name",
                             e.getMessage().contains("name"));
         }
-        
+
         try {
             _queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
             assertNull("Queue was created", _queue);
         }
         catch (IllegalArgumentException e)
         {
-            assertTrue("Exception was not about missing vhost", 
+            assertTrue("Exception was not about missing vhost",
                     e.getMessage().contains("Host"));
         }
 
-        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, 
+        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
                                                                 _virtualHost, _arguments);
         assertNotNull("Queue was not created", _queue);
     }
-    
+
     public void testGetVirtualHost()
     {
         assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
     }
-    
+
     public void testBinding()
     {
         try
         {
             _queue.bind(_exchange, _routingKey, null);
-            assertTrue("Routing key was not bound", 
+            assertTrue("Routing key was not bound",
                             _exchange.getBindings().containsKey(_routingKey));
-            assertEquals("Queue was not bound to key", 
+            assertEquals("Queue was not bound to key",
                         _exchange.getBindings().get(_routingKey).get(0),
                         _queue);
-            assertEquals("Exchange binding count", 1, 
+            assertEquals("Exchange binding count", 1,
                     _queue.getExchangeBindings().size());
-            assertEquals("Wrong exchange bound", _routingKey, 
+            assertEquals("Wrong exchange bound", _routingKey,
                     _queue.getExchangeBindings().get(0).getRoutingKey());
-            assertEquals("Wrong exchange bound", _exchange, 
+            assertEquals("Wrong exchange bound", _exchange,
                     _queue.getExchangeBindings().get(0).getExchange());
-            
+
             _queue.unBind(_exchange, _routingKey, null);
-            assertFalse("Routing key was still bound", 
+            assertFalse("Routing key was still bound",
                     _exchange.getBindings().containsKey(_routingKey));
-            assertNull("Routing key was not empty", 
+            assertNull("Routing key was not empty",
                     _exchange.getBindings().get(_routingKey));
         }
         catch (AMQException e)
@@ -171,61 +171,61 @@
             assertNull("Unexpected exception", e);
         }
     }
-    
+
     public void testSubscription() throws AMQException
     {
         // Check adding a subscription adds it to the queue
         _queue.registerSubscription(_subscription, false);
-        assertEquals("Subscription did not get queue", _queue, 
+        assertEquals("Subscription did not get queue", _queue,
                       _subscription.getQueue());
-        assertEquals("Queue does not have consumer", 1, 
+        assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
-        assertEquals("Queue does not have active consumer", 1, 
+        assertEquals("Queue does not have active consumer", 1,
                 _queue.getActiveConsumerCount());
-        
+
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
-        _queue.enqueue(null, messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-        
+        _queue.enqueue(messageA);
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
         // Check removing the subscription removes it's information from the queue
         _queue.unregisterSubscription(_subscription);
         assertTrue("Subscription still had queue", _subscription.isClosed());
         assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
-        assertFalse("Queue still has active consumer", 
+        assertFalse("Queue still has active consumer",
                 1 == _queue.getActiveConsumerCount());
-        
+
         AMQMessage messageB = createMessage(new Long (25));
-        _queue.enqueue(null, messageB);
-        QueueEntry entry = _subscription.getLastSeenEntry();
-        assertNull(entry);
+        _queue.enqueue(messageB);
+         assertNull(_subscription.getQueueContext());
+
     }
-    
+
     public void testQueueNoSubscriber() throws AMQException, InterruptedException
     {
         AMQMessage messageA = createMessage(new Long(24));
-        _queue.enqueue(null, messageA);
+        _queue.enqueue(messageA);
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
     }
 
     public void testExclusiveConsumer() throws AMQException
     {
         // Check adding an exclusive subscription adds it to the queue
         _queue.registerSubscription(_subscription, true);
-        assertEquals("Subscription did not get queue", _queue, 
+        assertEquals("Subscription did not get queue", _queue,
                 _subscription.getQueue());
-        assertEquals("Queue does not have consumer", 1, 
+        assertEquals("Queue does not have consumer", 1,
                 _queue.getConsumerCount());
-        assertEquals("Queue does not have active consumer", 1, 
+        assertEquals("Queue does not have active consumer", 1,
                 _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
         AMQMessage messageA = createMessage(new Long(24));
-        _queue.enqueue(null, messageA);
-        assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-        
+        _queue.enqueue(messageA);
+        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
         // Check we cannot add a second subscriber to the queue
         Subscription subB = new MockSubscription();
         Exception ex = null;
@@ -235,12 +235,12 @@
         }
         catch (AMQException e)
         {
-           ex = e; 
+           ex = e;
         }
         assertNotNull(ex);
         assertTrue(ex instanceof AMQException);
 
-        // Check we cannot add an exclusive subscriber to a queue with an 
+        // Check we cannot add an exclusive subscriber to a queue with an
         // existing subscription
         _queue.unregisterSubscription(_subscription);
         _queue.registerSubscription(_subscription, false);
@@ -250,35 +250,35 @@
         }
         catch (AMQException e)
         {
-           ex = e; 
+           ex = e;
         }
         assertNotNull(ex);
     }
-    
-    public void testAutoDeleteQueue() throws Exception 
+
+    public void testAutoDeleteQueue() throws Exception
     {
        _queue.stop();
-       _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+       _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
        _queue.registerSubscription(_subscription, false);
        AMQMessage message = createMessage(new Long(25));
-       _queue.enqueue(null, message);
+       _queue.enqueue(message);
        _queue.unregisterSubscription(_subscription);
        assertTrue("Queue was not deleted when subscription was removed",
                   _queue.isDeleted());
     }
-    
+
     public void testResend() throws Exception
     {
         _queue.registerSubscription(_subscription, false);
         Long id = new Long(26);
         AMQMessage message = createMessage(id);
-        _queue.enqueue(null, message);
-        QueueEntry entry = _subscription.getLastSeenEntry();
-        entry.setRedelivered(true);
+        _queue.enqueue(message);
+        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+        entry.setRedelivered();
         _queue.resend(entry, _subscription);
-        
+
     }
-    
+
     public void testGetFirstMessageId() throws Exception
     {
         // Create message
@@ -286,7 +286,7 @@
         AMQMessage message = createMessage(messageId);
 
         // Put message on queue
-        _queue.enqueue(null, message);
+        _queue.enqueue(message);
         // Get message id
         Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
 
@@ -302,7 +302,7 @@
             Long messageId = new Long(i);
             AMQMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(null, message);
+            _queue.enqueue(message);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -323,7 +323,7 @@
             Long messageId = new Long(i);
             AMQMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(null, message);
+            _queue.enqueue(message);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -335,7 +335,7 @@
             assertEquals("Message ID was wrong", messageId, msgids.get(i));
         }
     }
-    
+
     public void testGetMessagesRangeOnTheQueue() throws Exception
     {
         for (int i = 1 ; i <= 10; i++)
@@ -344,142 +344,138 @@
             Long messageId = new Long(i);
             AMQMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(null, message);
+            _queue.enqueue(message);
         }
-        
+
         // Get non-existent 0th QueueEntry & check returned list was empty
         // (the position parameters in this method are indexed from 1)
         List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
         assertTrue(entries.size() == 0);
-        
+
         // Check that when 'from' is 0 it is ignored and the range continues from 1
         entries = _queue.getMessagesRangeOnTheQueue(0, 2);
         assertTrue(entries.size() == 2);
-        long msgID = entries.get(0).getMessage().getMessageId();
+        long msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 1L);
-        msgID = entries.get(1).getMessage().getMessageId();
+        msgID = entries.get(1).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 2L);
 
         // Check that when 'from' is greater than 'to' the returned list is empty
         entries = _queue.getMessagesRangeOnTheQueue(5, 4);
         assertTrue(entries.size() == 0);
-        
-        // Get first QueueEntry & check id 
+
+        // Get first QueueEntry & check id
         entries = _queue.getMessagesRangeOnTheQueue(1, 1);
         assertTrue(entries.size() == 1);
-        msgID = entries.get(0).getMessage().getMessageId();
+        msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 1L);
-        
+
         // Get 5th,6th,7th entries and check id's
         entries = _queue.getMessagesRangeOnTheQueue(5, 7);
         assertTrue(entries.size() == 3);
-        msgID = entries.get(0).getMessage().getMessageId();
+        msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 5L);
-        msgID = entries.get(1).getMessage().getMessageId();
+        msgID = entries.get(1).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 6L);
-        msgID = entries.get(2).getMessage().getMessageId();
+        msgID = entries.get(2).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 7L);
-        
+
         // Get 10th QueueEntry & check id
         entries = _queue.getMessagesRangeOnTheQueue(10, 10);
         assertTrue(entries.size() == 1);
-        msgID = entries.get(0).getMessage().getMessageId();
+        msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 10L);
-        
+
         // Get non-existent 11th QueueEntry & check returned set was empty
         entries = _queue.getMessagesRangeOnTheQueue(11, 11);
         assertTrue(entries.size() == 0);
-        
+
         // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs
         entries = _queue.getMessagesRangeOnTheQueue(9, 11);
         assertTrue(entries.size() == 2);
-        msgID = entries.get(0).getMessage().getMessageId();
+        msgID = entries.get(0).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 9L);
-        msgID = entries.get(1).getMessage().getMessageId();
+        msgID = entries.get(1).getMessage().getMessageNumber();
         assertEquals("Message ID was wrong", msgID, 10L);
     }
-  
+
     public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
     {
         // Create IncomingMessage and nondurable queue
-        NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
-        IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+        final IncomingMessage msg = new IncomingMessage(info);
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
         msg.setContentHeaderBody(contentHeaderBody);
-        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-        
+
+        final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
         // Send persistent message
+
         qs.add(_queue);
-        msg.enqueue(qs);
-        msg.routingComplete(_store, new MessageHandleFactory());
-        _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-        
+        MessageMetaData metaData = msg.headersReceived();
+        StoredMessage handle = _store.addMessage(metaData);
+        msg.setStoredMessage(handle);
+
+
+        ServerTransaction txn = new AutoCommitTransaction(_store);
+
+        txn.enqueue(qs, msg, new ServerTransaction.Action()
+                                    {
+                                        public void postCommit()
+                                        {
+                                            msg.enqueue(qs);
+                                        }
+
+                                        public void onRollback()
+                                        {
+                                        }
+                                    });
+
+
+
         // Check that it is enqueued
         AMQQueue data = _store.getMessages().get(1L);
-        assertNotNull(data);
-        
+        assertNull(data);
+
         // Dequeue message
         MockQueueEntry entry = new MockQueueEntry();
-        AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
-        
+        AMQMessage amqmsg = new AMQMessage(handle);
+
         entry.setMessage(amqmsg);
-        _queue.dequeue(null, entry);
-        
+        _queue.dequeue(entry);
+
         // Check that it is dequeued
         data = _store.getMessages().get(1L);
         assertNull(data);
     }
 
 
-    // FIXME: move this to somewhere useful
-    private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
-    {
-        final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
-                                                                                                   null,
-                                                                                                   false);
-        try
-        {
-            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
-                                                              publishBody,
-                                                              new ContentHeaderBody()
-            {
-                public int getSize()
-                {
-                    return 1;
-                }
-            });
-        }
-        catch (AMQException e)
-        {
-            // won't happen
-        }
-
-
-        return amqMessageHandle;
-    }
-
     public class TestMessage extends AMQMessage
     {
         private final long _tag;
         private int _count;
 
-        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
                 throws AMQException
         {
-            super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+            this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0));
+
+        }
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb)
+                throws AMQException
+        {
+            super(new MockStoredMessage(messageId, publishBody, chb));
             _tag = tag;
         }
 
-
         public boolean incrementReference()
         {
             _count++;
             return true;
         }
 
-        public void decrementReference(StoreContext context)
+        public void decrementReference()
         {
             _count--;
         }
@@ -489,10 +485,10 @@
             assertEquals("Wrong count for message with tag " + _tag, expected, _count);
         }
     }
-    
+
     protected AMQMessage createMessage(Long id) throws AMQException
     {
-        AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+        AMQMessage messageA = new TestMessage(id, id, info);
         return messageA;
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Sun Oct 25 22:58:57 2009
@@ -27,8 +27,6 @@
 
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.AMQException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class SimpleAMQQueueThreadPoolTest extends TestCase
 {
@@ -47,7 +45,7 @@
             assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
 
             assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
-            
+
             queue.stop();
 
             assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
@@ -55,6 +53,6 @@
         finally
         {
             ApplicationRegistry.remove();
-        }       
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Sun Oct 25 22:58:57 2009
@@ -57,11 +57,11 @@
         BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
         out.write("<security><queueDenier>notyet</queueDenier><exchangeDenier>yes</exchangeDenier></security>");
         out.close();
-        
+
         _conf = new SecurityConfiguration(new XMLConfiguration(tmpFile));
-        
+
         // Create ACLManager
-        
+
         _pluginManager = new MockPluginManager("");
         _authzManager = new ACLManager(_conf, _pluginManager);
 
@@ -79,15 +79,15 @@
         // Correctly Close the AR we created
         ApplicationRegistry.remove();
         super.tearDown();
-    }    
-    
+    }
+
     public void testACLManagerConfigurationPluginManager() throws Exception
     {
         AMQQueue queue = new MockAMQQueue("notyet");
         AMQQueue otherQueue = new MockAMQQueue("other");
-        
+
         assertFalse(_authzManager.authoriseDelete(_session, queue));
-        
+
         // This should only be denied if the config hasn't been correctly passed in
         assertTrue(_authzManager.authoriseDelete(_session, otherQueue));
         assertTrue(_authzManager.authorisePurge(_session, queue));
@@ -96,11 +96,11 @@
     public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
     {
         _authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
-        
+
         Exchange exchange = null;
         assertFalse(_authzManager.authoriseDelete(_session, exchange));
     }
-    
+
     public void testConfigurePlugins() throws ConfigurationException
     {
         Configuration hostConfig = new PropertiesConfiguration();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java Sun Oct 25 22:58:57 2009
@@ -14,16 +14,16 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.security.access;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 public class ExchangeDenier extends AllowAll
 {
@@ -40,9 +40,9 @@
             return new ExchangeDenier();
         }
     };
-    
+
     @Override
-    public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+    public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
     {
         return AuthzResult.DENIED;
     }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java Sun Oct 25 22:58:57 2009
@@ -27,14 +27,13 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl;
-import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl;
 import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 
@@ -43,7 +42,7 @@
 
     private String _user = "user";
     private PrincipalPermissions _perms;
-    
+
     // Common things that are passed to frame constructors
     private AMQShortString _queueName = new AMQShortString(this.getClass().getName()+"queue");
     private AMQShortString _tempQueueName = new AMQShortString(this.getClass().getName()+"tempqueue");
@@ -65,18 +64,18 @@
     private AMQQueue _temporaryQueue;
     private Boolean _temporary = false;
     private Boolean _ownQueue = false;
-        
+
     @Override
     public void setUp()
     {
         //Highlight that this test will cause a new AR to be created
-        ApplicationRegistry.getInstance();        
+        ApplicationRegistry.getInstance();
 
         _perms = new PrincipalPermissions(_user);
-        try 
+        try
         {
             PropertiesConfiguration env = new PropertiesConfiguration();
-            _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+            _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration("test", env));
             _exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete);
             _queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments);
             _temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, _virtualHost, _arguments);
@@ -132,27 +131,29 @@
         _perms.grant(Permission.CREATEQUEUE, grantArgs);
         assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
     }
-    
+
     // FIXME disabled, this fails due to grant putting the grant into the wrong map QPID-1598
     public void disableTestExchangeCreate()
     {
-        ExchangeDeclareBodyImpl exchangeDeclare = 
+        ExchangeDeclareBodyImpl exchangeDeclare =
             new ExchangeDeclareBodyImpl(_ticket, _exchangeName, _exchangeType, _passive, _durable,
                                         _autoDelete, _internal, _nowait, _arguments);
         Object[] authArgs = new Object[]{exchangeDeclare};
         Object[] grantArgs = new Object[]{_exchangeName, _exchangeType};
-        
+
         assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
         _perms.grant(Permission.CREATEEXCHANGE, grantArgs);
         assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
     }
-    
+
     public void testConsume()
     {
         Object[] authArgs = new Object[]{_queue};
         Object[] grantArgs = new Object[]{_queueName, _ownQueue};
 
-        assertEquals(AuthzResult.DENIED,_perms.authorise(Permission.CONSUME, authArgs));
+        /* FIXME: This throws a null pointer exception QPID-1599
+         * assertFalse(_perms.authorise(Permission.CONSUME, authArgs));
+         */
         _perms.grant(Permission.CONSUME, grantArgs);
         assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CONSUME, authArgs));
     }
@@ -166,7 +167,7 @@
         _perms.grant(Permission.PUBLISH, grantArgs);
         assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.PUBLISH, authArgs));
     }
-    
+
     public void testVhostAccess()
     {
         //Tests that granting a user Virtualhost level access allows all authorisation requests

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java Sun Oct 25 22:58:57 2009
@@ -14,21 +14,20 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.security.access;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 public class QueueDenier extends AllowAll
 {
-    
+
     public static final ACLPluginFactory FACTORY = new ACLPluginFactory()
     {
         public boolean supportsTag(String name)
@@ -43,18 +42,18 @@
             return plugin;
         }
     };
-    
+
     private String _queueName = "";
 
-    
+
     @Override
-    public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
     {
         if (!(queue.getName().toString().equals(_queueName)))
         {
             return AuthzResult.ALLOWED;
-        } 
-        else 
+        }
+        else
         {
             return AuthzResult.DENIED;
         }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Sun Oct 25 22:58:57 2009
@@ -29,17 +29,13 @@
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.ExchangeBinding;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -103,7 +99,7 @@
 
         try
         {
-            _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration));
+            _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), configuration));
             ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
         }
         catch (Exception e)
@@ -169,7 +165,7 @@
         Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
         bindAllTopicQueuesToExchange(topicExchange, topicRouting);
 
-        //Send Message To NonDurable direct Exchange = persistent        
+        //Send Message To NonDurable direct Exchange = persistent
         sendMessageOnExchange(nonDurableExchange, directRouting, true);
         // and non-persistent
         sendMessageOnExchange(nonDurableExchange, directRouting, false);
@@ -344,22 +340,11 @@
 
         MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
 
-        IncomingMessage currentMessage = null;
+        final IncomingMessage currentMessage;
 
-        try
-        {
-            currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
-                                                 messageInfo,
-                                                 new NonTransactionalContext(_virtualHost.getMessageStore(),
-                                                                             new StoreContext(), null, null),
-                                                 new InternalTestProtocolSession(_virtualHost));
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
 
-        currentMessage.setMessageStore(_virtualHost.getMessageStore());
+        currentMessage = new IncomingMessage(messageInfo);
+
         currentMessage.setExchange(directExchange);
 
         ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -379,35 +364,42 @@
 
         currentMessage.setExpiration();
 
-        try
-        {
-            currentMessage.route();
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+        MessageMetaData mmd = currentMessage.headersReceived();
+        currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd));
+
+        currentMessage.route();
+
 
-        try
-        {
-            currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
 
         // check and deliver if header says body length is zero
         if (currentMessage.allContentReceived())
         {
-            try
-            {
-                currentMessage.deliverToQueues();
-            }
-            catch (AMQException e)
-            {
-                fail(e.getMessage());
-            }
+            // TODO Deliver to queues
+            ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
+            final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues();
+            trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
+                public void postCommit()
+                {
+                    try
+                    {
+                        AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+
+                        for(AMQQueue queue : destinationQueues)
+                        {
+                            QueueEntry entry = queue.enqueue(message);
+                        }
+                    }
+                    catch (AMQException e)
+                    {
+                        e.printStackTrace();
+                    }
+                }
+
+                public void onRollback()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+            });
         }
     }
 
@@ -496,14 +488,7 @@
             fail(e.getMessage());
         }
 
-        try
-        {
-            _virtualHost.getQueueRegistry().registerQueue(queue);
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
+        _virtualHost.getQueueRegistry().registerQueue(queue);
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun Oct 25 22:58:57 2009
@@ -25,14 +25,15 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
 
 /**
  * A message store that does nothing. Designed to be used in tests that do not want to use any message store
@@ -45,8 +46,19 @@
     public void configure(String base, Configuration config) throws Exception
     {
     }
-    
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+
+    public void configureConfigStore(String name,
+                          ConfigurationRecoveryHandler recoveryHandler,
+                          Configuration config,
+                          LogSubject logSubject) throws Exception
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void configureMessageStore(String name,
+                                      MessageStoreRecoveryHandler recoveryHandler,
+                                      Configuration config,
+                                      LogSubject logSubject) throws Exception
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -55,7 +67,12 @@
     {
     }
 
-    public void removeMessage(StoreContext s, Long messageId)
+    public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void removeMessage(Long messageId)
     {
     }
 
@@ -85,24 +102,10 @@
 
     public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
-    }        
-
-    public void beginTran(StoreContext s) throws AMQException
-    {
     }
 
-    public boolean inTran(StoreContext sc)
-    {
-        return false;
-    }
 
-    public void commitTran(StoreContext storeContext) throws AMQException
-    {
-    }
 
-    public void abortTran(StoreContext storeContext) throws AMQException
-    {
-    }
 
     public List<AMQQueue> createQueues() throws AMQException
     {
@@ -114,22 +117,26 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    public void storeContentBodyChunk(
+            Long messageId,
+            int index,
+            ContentChunk contentBody,
+            boolean lastContentBody) throws AMQException
     {
 
     }
 
-    public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
     {
 
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
     {
         return null;
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+    public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
     {
         return null;
     }
@@ -139,18 +146,75 @@
         return false;
     }
 
-    public void removeQueue(final AMQQueue queue) throws AMQException
+    public void storeMessageHeader(Long messageNumber, ServerMessage message)
     {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 
+    public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public ServerMessage getMessage(Long messageNumber)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void removeQueue(final AMQQueue queue) throws AMQException
     {
 
     }
 
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void configureTransactionLog(String name,
+                                        TransactionLogRecoveryHandler recoveryHandler,
+                                        Configuration storeConfiguration,
+                                        LogSubject logSubject) throws Exception
     {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 
+    public Transaction newTransaction()
+    {
+        return new Transaction()
+        {
+
+            public void enqueueMessage(TransactionLogResource  queue, Long messageId) throws AMQException
+            {
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+
+            public void dequeueMessage(TransactionLogResource  queue, Long messageId) throws AMQException
+            {
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+
+            public void commitTran() throws AMQException
+            {
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+
+            public StoreFuture commitTranAsync() throws AMQException
+            {
+                return new StoreFuture()
+                            {
+                                public boolean isComplete()
+                                {
+                                    return true;
+                                }
+
+                                public void waitForCompletion()
+                                {
+
+                                }
+                            };
+            }
+
+            public void abortTran() throws AMQException
+            {
+                //To change body of implemented methods use File | Settings | File Templates.
+            }
+        };
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,32 +20,79 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.List;
+import java.nio.ByteBuffer;
 
 /**
  * Adds some extra methods to the memory message store for testing purposes.
  */
 public class TestMemoryMessageStore extends MemoryMessageStore
 {
+    private AtomicInteger _messageCount = new AtomicInteger(0);
+
+
     public TestMemoryMessageStore()
     {
-        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
     }
 
-    public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+    @Override
+    public StoredMessage addMessage(StorableMessageMetaData metaData)
     {
-        return _metaDataMap;
+        return new TestableStoredMessage(super.addMessage(metaData));
     }
 
-    public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+    public int getMessageCount()
     {
-        return _contentBodyMap;
+        return _messageCount.get();
+    }
+
+    private class TestableStoredMessage implements StoredMessage
+    {
+        private final StoredMessage _storedMessage;
+
+        public TestableStoredMessage(StoredMessage storedMessage)
+        {
+            _messageCount.incrementAndGet();
+            _storedMessage = storedMessage;
+        }
+
+        public StorableMessageMetaData getMetaData()
+        {
+            return _storedMessage.getMetaData();
+        }
+
+        public long getMessageNumber()
+        {
+            return _storedMessage.getMessageNumber();
+        }
+
+        public void addContent(int offsetInMessage, ByteBuffer src)
+        {
+            _storedMessage.addContent(offsetInMessage, src);
+        }
+
+        public int getContent(int offsetInMessage, ByteBuffer dst)
+        {
+            return _storedMessage.getContent(offsetInMessage, dst);
+        }
+
+        public StoreFuture flushToStore()
+        {
+            return _storedMessage.flushToStore();
+        }
+
+        public void remove()
+        {
+            _storedMessage.remove();
+            _messageCount.decrementAndGet();
+        }
+
     }
+    
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Sun Oct 25 22:58:57 2009
@@ -26,9 +26,8 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 
 /**
  * Tests that reference counting works correctly with AMQMessage and the message store
@@ -37,13 +36,12 @@
 {
     private TestMemoryMessageStore _store;
 
-    private StoreContext _storeContext = new StoreContext();
-
 
     protected void setUp() throws Exception
     {
         super.setUp();
         _store = new TestMemoryMessageStore();
+
     }
 
     /**
@@ -83,11 +81,12 @@
         };
 
 
-        final long messageId = _store.getNewMessageId();
-        AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
-        AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,info);
+
+        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        StoredMessage storedMessage = _store.addMessage(mmd);
+
+
+        AMQMessage message = new AMQMessage(storedMessage);
 
         message = message.takeReference();
 
@@ -95,9 +94,9 @@
  //       message.routingComplete(_store, _storeContext, new MessageHandleFactory());
 
 
-        assertEquals(1, _store.getMessageMetaDataMap().size());
-        message.decrementReference(_storeContext);
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+        assertEquals(1, _store.getMessageCount());
+        message.decrementReference();
+        assertEquals(1, _store.getMessageCount());
     }
 
     private ContentHeaderBody createPersistentContentHeader()
@@ -141,25 +140,24 @@
             }
         };
 
-        final Long messageId = _store.getNewMessageId();
         final ContentHeaderBody chb = createPersistentContentHeader();
-        AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
-        messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
-        AMQMessage message = new AMQMessage(messageHandle,
-                                             _storeContext,
-                                            info);
-        
-        
+
+        MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+        StoredMessage storedMessage = _store.addMessage(mmd);
+
+        AMQMessage message = new AMQMessage(storedMessage);
+
+
         message = message.takeReference();
         // we call routing complete to set up the handle
      //   message.routingComplete(_store, _storeContext, new MessageHandleFactory());
 
 
 
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+        assertEquals(1, _store.getMessageCount());
         message = message.takeReference();
-        message.decrementReference(_storeContext);
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+        message.decrementReference();
+        assertEquals(1, _store.getMessageCount());
     }
 
     public static junit.framework.Test suite()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org