You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/11 17:10:48 UTC

svn commit: r824084 [4/5] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apache/q...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Sun Oct 11 15:10:43 2009
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
@@ -30,7 +28,6 @@
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -52,11 +49,11 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 
 import javax.management.NotCompliantMBeanException;
 import java.util.Collections;
@@ -228,17 +225,17 @@
         // file and write them in to the new routing Table.
         for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
         {
-            _messageStore.createQueue(cqt.queue, cqt.arguments);
+            getDurableConfigurationStore().createQueue(cqt.queue, cqt.arguments);
         }
 
         for (Exchange exchange : configFileRT.exchange)
         {
-            _messageStore.createExchange(exchange);
+            getDurableConfigurationStore().createExchange(exchange);
         }
 
         for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
         {
-            _messageStore.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
+            getDurableConfigurationStore().bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
         }
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
@@ -352,7 +349,7 @@
 
     	if (queue.isDurable())
     	{
-    		_messageStore.createQueue(queue);
+    		getDurableConfigurationStore().createQueue(queue);
     	}
 
     	String exchangeName = queueConfiguration.getExchange();
@@ -416,6 +413,11 @@
         return _messageStore;
     }
 
+    public DurableConfigurationStore getDurableConfigurationStore()
+    {
+        return _messageStore;
+    }
+
     public AuthenticationManager getAuthenticationManager()
     {
         return _authenticationManager;
@@ -487,7 +489,7 @@
         {
         }
 
-        public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+        public void removeMessage(Long messageId) throws AMQException
         {
             //To change body of implemented methods use File | Settings | File Templates.
         }
@@ -553,6 +555,24 @@
             //To change body of implemented methods use File | Settings | File Templates.
         }
 
+        public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+        {
+            commitTran(context);
+            return new StoreFuture() 
+                        {
+                            public boolean isComplete()
+                            {
+                                return true;
+                            }
+
+                            public void waitForCompletion()
+                            {
+
+                            }
+                        };
+
+        }
+
         public void abortTran(StoreContext context) throws AMQException
         {
             //To change body of implemented methods use File | Settings | File Templates.
@@ -568,22 +588,26 @@
             return null;  //To change body of implemented methods use File | Settings | File Templates.
         }
 
-        public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+        public void storeContentBodyChunk(
+                Long messageId,
+                int index,
+                ContentChunk contentBody,
+                boolean lastContentBody) throws AMQException
         {
             //To change body of implemented methods use File | Settings | File Templates.
         }
 
-        public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+        public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
         {
             //To change body of implemented methods use File | Settings | File Templates.
         }
 
-        public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+        public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
         {
             return null;  //To change body of implemented methods use File | Settings | File Templates.
         }
 
-        public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+        public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
         {
             return null;  //To change body of implemented methods use File | Settings | File Templates.
         }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Sun Oct 11 15:10:43 2009
@@ -62,6 +62,6 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
     {
-        fromQueue.removeMessagesFromQueue(start, end, _storeContext);
+        fromQueue.removeMessagesFromQueue(start, end);
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Sun Oct 11 15:10:43 2009
@@ -171,7 +171,7 @@
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
-//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
+//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
@@ -182,14 +182,14 @@
 //        //Print out all the property names
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
 //
-//        msg.getMessageId();
+//        msg.getMessageNumber();
 //        msg.getSize();
 //        msg.getArrivalTime();
 
 //        msg.getDeliveredSubscription();
 //        msg.getDeliveredToConsumer();
 //        msg.getMessageHandle();
-//        msg.getMessageId();
+//        msg.getMessageNumber();
 //        msg.getMessagePublishInfo();
 //        msg.getPublisher();
 
@@ -352,7 +352,7 @@
             ispersitent.add(msg.isPersistent() ? "true" : "false");
 
 
-            isredelivered.add(msg.isRedelivered() ? "true" : "false");
+            isredelivered.add(entry.isRedelivered() ? "true" : "false");
 
             isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Sun Oct 11 15:10:43 2009
@@ -22,7 +22,6 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleQueueEntryList;
 import org.apache.qpid.server.queue.MockAMQMessage;
@@ -30,15 +29,15 @@
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.AMQException;
 
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.Iterator;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -63,6 +62,7 @@
     UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
     private AMQQueue _queue = new MockAMQQueue(getName());
+    private MessageStore _messageStore = new MemoryMessageStore();
     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
 
     @Override
@@ -137,7 +137,7 @@
 
         // requeueIfUnabletoResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -166,7 +166,7 @@
 
         // requeueIfUnabletoResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -187,7 +187,7 @@
 
         // requeueIfUnabletoResend = true so all messages should go to msgToRequeue
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -208,7 +208,7 @@
 
         // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, new StoreContext()));
+                                                                    msgToResend, false, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -240,7 +240,7 @@
 
         // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, new StoreContext()));
+                                                                    msgToResend, false, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Sun Oct 11 15:10:43 2009
@@ -421,7 +421,7 @@
     {
         // Check default
         ServerConfiguration serverConfig = new ServerConfiguration(_config);
-        assertEquals(0, serverConfig.getMaximumMessageCount());
+        assertEquals(0, serverConfig.getMaximumMessageCount());                                       
 
         // Check value we set
         _config.setProperty("maximumMessageCount", 10L);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun Oct 11 15:10:43 2009
@@ -29,13 +29,8 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.log4j.Logger;
@@ -54,8 +49,6 @@
      */
     private MessageStore _store = new MemoryMessageStore();
 
-    private StoreContext _storeContext = new StoreContext();
-
     private MessageHandleFactory _handleFactory = new MessageHandleFactory();
 
     private int count;
@@ -93,14 +86,18 @@
     }
 
 
-    protected void route(Message m) throws AMQException
+    protected int route(Message m) throws AMQException
     {
         m.route(exchange);
         m.getIncomingMessage().routingComplete(_store, _handleFactory);
         if(m.getIncomingMessage().allContentReceived())
         {
-            m.getIncomingMessage().deliverToQueues();
+            for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
+            {
+                q.enqueue(m);
+            }
         }
+        return m.getIncomingMessage().getDestinationQueues().size();
     }
 
     protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -120,10 +117,8 @@
 
     protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
     {
-        try
-        {
-            route(m);
-            assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+            int queueCount = route(m);
+
             for (TestQueue q : queues)
             {
                 if (expected.contains(q))
@@ -137,12 +132,11 @@
                     //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
                 }
             }
-        }
 
-        catch (NoRouteException ex)
-        {
-            assertTrue("Expected "+m+" not to be returned",expectReturn);
-        }
+            if(expectReturn)
+            {
+                assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
+            }
 
     }
 
@@ -244,6 +238,11 @@
     {
         final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
 
+        public String toString()
+        {
+            return getName().toString();
+        }
+
         public TestQueue(AMQShortString name) throws AMQException
         {
             super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
@@ -334,6 +333,11 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public boolean releaseButRetain()
+                {
+                    return false;
+                }
+
                 public boolean immediateAndNotDelivered()
                 {
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -344,6 +348,16 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public AMQMessageHeader getMessageHeader()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isPersistent()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public boolean isRedelivered()
                 {
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -369,7 +383,7 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void requeue(StoreContext storeContext) throws AMQException
+                public void requeue()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -379,12 +393,12 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+                public void dequeue()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void dispose(final StoreContext storeContext) throws MessageCleanupException
+                public void dispose()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -394,7 +408,7 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+                public void discard()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -438,10 +452,9 @@
 
             public TestIncomingMessage(final long messageId,
                                        final MessagePublishInfo info,
-                                       final TransactionalContext txnContext,
                                        final AMQProtocolSession publisher)
             {
-                super(messageId, info, txnContext, publisher);
+                super(messageId, info, publisher);
             }
 
 
@@ -468,14 +481,6 @@
 
         private static MessageStore _messageStore = new SkeletonMessageStore();
 
-        private static StoreContext _storeContext = new StoreContext();
-
-
-        private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                      null,
-                                                                         new LinkedList<RequiredDeliveryException>()
-        );
-
         Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
         {
             this(protocolSession, id, getHeaders(headers));
@@ -496,11 +501,11 @@
                         ContentHeaderBody header,
                         List<ContentBody> bodies) throws AMQException
         {
-            super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+            super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
 
 
             
-            _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
+            _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
             _incoming.setContentHeaderBody(header);
 
 
@@ -515,14 +520,7 @@
                                                                                                        _messageStore,
                                                                                                        true);
 
-            try
-            {
-                amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
-            }
-            catch (AMQException e)
-            {
-                
-            }
+            amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
             return amqMessageHandle;
         }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Sun Oct 11 15:10:43 2009
@@ -25,20 +25,14 @@
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 
-import java.util.LinkedList;
-
 public class DestWildExchangeTest extends TestCase
 {
 
@@ -46,7 +40,6 @@
 
     VirtualHost _vhost;
     MessageStore _store;
-    StoreContext _context;
 
     InternalTestProtocolSession _protocolSession;
 
@@ -56,7 +49,6 @@
         _exchange = new TopicExchange();
         _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
         _store = new MemoryMessageStore();
-        _context = new StoreContext();
         _protocolSession = new InternalTestProtocolSession(_vhost);
     }
 
@@ -74,7 +66,7 @@
 
         MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
 
-        IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
+        IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
 
         message.enqueue(_exchange.route(message));
 
@@ -89,33 +81,20 @@
 
         IncomingMessage message = createMessage("a.b");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has  route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.c");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has no route and should fail to be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
@@ -129,52 +108,33 @@
 
         IncomingMessage message = createMessage("a.b");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has  route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.c");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has route and should be routed");
-        }
+        int queueCount = routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has no route and should fail to be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+
+        queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
@@ -187,89 +147,56 @@
 
         IncomingMessage message = createMessage("a.b.c");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has route and should be routed");
-        }
+        int queueCount = routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.b");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has route and should be routed");
-        }
+        queueCount = routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.c");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has route and should be routed");
-        }
+        queueCount = routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
         message = createMessage("a");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has route and should be routed");
-        }
+        queueCount = routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("b");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has no route and should fail to be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+
+        queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
     }
@@ -283,38 +210,24 @@
 
         IncomingMessage message = createMessage("a.c.d.b");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
         message = createMessage("a.c.b");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
     }
@@ -327,66 +240,39 @@
 
         IncomingMessage message = createMessage("a.c.b.b");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.a.b.c");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
         message = createMessage("a.b.c.b");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has  route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
         message = createMessage("a.b.c.b.c");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
     }
@@ -400,34 +286,21 @@
 
         IncomingMessage message = createMessage("a.c.b.b.c");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
 
         message = createMessage("a.a.b.c.d");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
     }
@@ -440,33 +313,20 @@
 
         IncomingMessage message = createMessage("a.c.b.b.c");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
         message = createMessage("a.a.b.c.d");
 
-        try
-        {
-            routeMessage(message);
-        }
-        catch (AMQException nre)
-        {
-            fail("Message has no route and should be routed");
-        }
+        routeMessage(message);
 
         Assert.assertEquals(1, queue.getMessageCount());
 
-        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
 
-        queue.deleteMessageFromTop(_context);
+        queue.deleteMessageFromTop();
         Assert.assertEquals(0, queue.getMessageCount());
 
     }
@@ -479,25 +339,24 @@
 
         IncomingMessage message = createMessage("a.b.c");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
     }
 
-    private void routeMessage(final IncomingMessage message)
+    private int routeMessage(final IncomingMessage message) // test helper: routes message through _exchange, enqueues it on every destination queue, returns how many queues that was
             throws AMQException
     {
         message.enqueue(_exchange.route(message));
         message.routingComplete(_store, new MessageHandleFactory());
-        message.deliverToQueues();
+        AMQMessage msg = new AMQMessage(message.getMessageHandle(), message.getContentHeader(), message.getSize(), message.getMessagePublishInfo()); // one AMQMessage instance, handed to each destination queue below
+        for(AMQQueue q : message.getDestinationQueues())
+        {
+            q.enqueue(msg);
+        }
+        return message.getDestinationQueues().size(); // 0 means the message reached no queue; the tests assert on this count
     }
 
     public void testMoreRouting() throws AMQException
@@ -508,14 +367,8 @@
 
         IncomingMessage message = createMessage("a.b.c");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -529,14 +382,8 @@
 
         IncomingMessage message = createMessage("a");
 
-        try
-        {
-            routeMessage(message);
-            fail("Message has route and should not be routed");
-        }
-        catch (AMQException nre)
-        {
-        }
+        int queueCount = routeMessage(message);
+        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
 
         Assert.assertEquals(0, queue.getMessageCount());
 
@@ -546,11 +393,7 @@
     {
         MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
 
-        TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
-                                                                       new LinkedList<RequiredDeliveryException>()
-        );
-
-        IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
+        IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
         message.setContentHeaderBody( new ContentHeaderBody());
 
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Sun Oct 11 15:10:43 2009
@@ -21,17 +21,17 @@
 package org.apache.qpid.server.protocol;
 
 import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TestNetworkDriver;
@@ -68,7 +68,7 @@
     public byte getProtocolMajorVersion()
     {
         return (byte) 8;
-    }
+    }    
 
     public byte getProtocolMinorVersion()
     {
@@ -99,6 +99,15 @@
         }
     }
 
+    public void writeReturn(MessagePublishInfo messagePublishInfo, // overload that receives the publish info, header and body frames as separate arguments
+                            ContentHeaderBody header,
+                            Iterator<AMQDataBlock> bodyFrameIterator,
+                            int channelId,
+                            int replyCode,
+                            AMQShortString replyText) throws AMQException
+    {
+        // no-op: the body is empty, so nothing is recorded or written for a returned message
+    }
     // *** ProtocolOutputConverter Implementation
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
@@ -108,7 +117,7 @@
     {
     }
 
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
     {
         _deliveryCount.incrementAndGet();
 
@@ -130,11 +139,11 @@
                 consumers.put(consumerTag, consumerDelivers);
             }
 
-            consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
         }
     }
 
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException // no-op: empty body, all four arguments are ignored
     {
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Sun Oct 11 15:10:43 2009
@@ -20,13 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import javax.management.Notification;
-
 import junit.framework.TestCase;
-
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
@@ -34,7 +28,6 @@
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -42,13 +35,13 @@
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import javax.management.Notification;
+import java.util.ArrayList;
+
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
 {                                                         
@@ -61,11 +54,6 @@
     private VirtualHost _virtualHost;
     private AMQProtocolEngine _protocolSession;
     private MessageStore _messageStore = new MemoryMessageStore();
-    private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                     null,
-                                                                                     new LinkedList<RequiredDeliveryException>()
-    );
     private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
 
     /**
@@ -75,6 +63,10 @@
      */
     public void testMessageCountAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -82,7 +74,7 @@
 
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
 
-        sendMessages(MAX_MESSAGE_COUNT, 256l);
+        sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
         assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -99,6 +91,10 @@
      */
     public void testMessageSizeAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -106,7 +102,7 @@
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
 
-        sendMessages(1, MAX_MESSAGE_SIZE * 2);
+        sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
         assertTrue(_queueMBean.getMessageCount() == 1);
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -125,6 +121,10 @@
      */
     public void testQueueDepthAlertNoSubscriber() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -134,7 +134,7 @@
 
         while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
         {
-            sendMessages(1, MAX_MESSAGE_SIZE);
+            sendMessages(channel, 1, MAX_MESSAGE_SIZE);
         }
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -154,6 +154,10 @@
      */
     public void testMessageAgeAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -161,7 +165,7 @@
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
 
-        sendMessages(1, MAX_MESSAGE_SIZE);
+        sendMessages(channel, 1, MAX_MESSAGE_SIZE);
 
         // Ensure message sits on queue long enough to age.
         Thread.sleep(MAX_MESSAGE_AGE * 2);
@@ -201,7 +205,7 @@
         // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
         int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
         long totalSize = (messageCount * MAX_MESSAGE_SIZE);
-        sendMessages(messageCount, MAX_MESSAGE_SIZE);
+        sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
 
         // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
         // so there should be no Queue_Deoth alert raised
@@ -281,7 +285,7 @@
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = size;   // in bytes
-        IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+        IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
         message.setContentHeaderBody(contentHeaderBody);
 
         return message;
@@ -305,7 +309,7 @@
     }
 
 
-    private void sendMessages(long messageCount, final long size) throws AMQException
+    private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
     {
         IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
         for (int i = 0; i < messages.length; i++)
@@ -339,7 +343,11 @@
                     
                 }
             });
-            messages[i].deliverToQueues();
+            _queue.enqueue(new AMQMessage(messages[i].getMessageHandle(),
+                                          messages[i].getContentHeader(),
+                                          messages[i].getSize(),
+                                          messages[i].getMessagePublishInfo()));
+
         }
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Sun Oct 11 15:10:43 2009
@@ -29,7 +29,6 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,10 +37,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.mina.common.ByteBuffer;
@@ -49,8 +45,6 @@
 import javax.management.JMException;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
 
 /**
  * Test class to test AMQQueueMBean attribtues and operations
@@ -61,8 +55,6 @@
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private MessageStore _messageStore;
-    private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext;
     private VirtualHost _virtualHost;
     private AMQProtocolSession _protocolSession;
     private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
@@ -108,7 +100,7 @@
         //Ensure that the data has been removed from the Store
         verifyBrokerState();
     }
-    
+
     public void testDeleteMessages() throws Exception
     {
         int messageCount = 10;
@@ -129,9 +121,9 @@
         }
         catch(Exception e)
         {
-            
+
         }
-        
+
         //delete last message, leaving 2nd to 9th
         _queueMBean.deleteMessages(10L,10L);
         assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
@@ -143,7 +135,7 @@
         }
         catch(Exception e)
         {
-            
+
         }
 
         //delete remaining messages, leaving none
@@ -162,7 +154,7 @@
         TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
 
         // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
-        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
+        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
         assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
         assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
         assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
@@ -170,7 +162,7 @@
 
     public void testConsumerCount() throws AMQException
     {
-        
+
         assertTrue(_queue.getActiveConsumerCount() == 0);
         assertTrue(_queueMBean.getActiveConsumerCount() == 0);
 
@@ -182,7 +174,7 @@
 
         Subscription subscription =
                 SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
-        
+
         _queue.registerSubscription(subscription, false);
         assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
 
@@ -225,7 +217,6 @@
         assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
 
         assertTrue(_queueMBean.getName().equals("testQueue"));
-        assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
         assertFalse(_queueMBean.isAutoDelete());
         assertFalse(_queueMBean.isDurable());
     }
@@ -261,7 +252,7 @@
         {
 
         }
-        
+
         try
         {
             long end = Integer.MAX_VALUE;
@@ -275,13 +266,12 @@
         }
 
         IncomingMessage msg = message(false, false);
-        long id = msg.getMessageId();
-        _queue.clearQueue(_storeContext);
+        long id = msg.getMessageNumber();
+        _queue.clearQueue();
         ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
         qs.add(_queue);
         msg.enqueue(qs);
         msg.routingComplete(_messageStore, new MessageHandleFactory());
-
         msg.addContentBodyFrame(new ContentChunk()
         {
             ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
@@ -301,7 +291,12 @@
 
             }
         });
-        msg.deliverToQueues();
+
+        AMQMessage m = new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo());
+        for(AMQQueue q : msg.getDestinationQueues())
+        {
+            q.enqueue(m);
+        }
 //        _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
         _queueMBean.viewMessageContent(id);
         try
@@ -350,7 +345,7 @@
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
-        IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext,  _protocolSession);
+        IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;
 
@@ -364,11 +359,6 @@
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
         _messageStore = _virtualHost.getMessageStore();
 
-        _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                            null,
-                                                            new LinkedList<RequiredDeliveryException>()
-        );
-
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
                                                     null);
         _queueMBean = new AMQQueueMBean(_queue);
@@ -400,7 +390,12 @@
                                                        .convertToContentChunk(
                                                        new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
                                                                        MESSAGE_SIZE)));
-            currentMessage.deliverToQueues();
+
+            AMQMessage m = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize(), currentMessage.getMessagePublishInfo());
+            for(AMQQueue q : currentMessage.getDestinationQueues())
+            {
+                q.enqueue(m);
+            }
 
 
         }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Sun Oct 11 15:10:43 2009
@@ -28,7 +28,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +40,9 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.Set;
-import java.util.Collections;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -62,8 +57,6 @@
 
     private TestMemoryMessageStore _messageStore;
 
-    private StoreContext _storeContext = new StoreContext();
-
     private AMQChannel _channel;
 
     private AMQQueue _queue;
@@ -99,9 +92,6 @@
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
-                                                                      new LinkedList<RequiredDeliveryException>()
-        );
         _queue.registerSubscription(_subscription,false);
         MessageHandleFactory factory = new MessageHandleFactory();
         for (int i = 1; i <= count; i++)
@@ -136,7 +126,7 @@
                     return new AMQShortString("rk");
                 }
             };
-            IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+            final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession);
             //IncomingMessage msg2 = null;
             if (persistent)
             {
@@ -160,7 +150,26 @@
             msg.routingComplete(_messageStore, factory);
             if(msg.allContentReceived())
             {
-                msg.deliverToQueues();
+                Transaction txn = new AutoCommitTransaction(_messageStore);
+                txn.enqueue(_queue, msg, new Transaction.Action() {
+                    public void postCommit()
+                    {
+                        try
+                        {
+                            _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo()));
+                        }
+                        catch (AMQException e)
+                        {
+                             throw new RuntimeException(e);
+                        }
+                    }
+
+                    public void onRollback()
+                    {
+                        //To change body of implemented methods use File | Settings | File Templates.
+                    }
+                });
+
             }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +187,7 @@
         publishMessages(msgCount, true);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
-        assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+        assertEquals("",msgCount,map.size());
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -191,8 +199,6 @@
             assertTrue(unackedMsg.getQueue() == _queue);
         }
 
-        assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
     }
 
     /**

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Sun Oct 11 15:10:43 2009
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 
@@ -30,8 +29,7 @@
             throws AMQException
     {
        super(new MockAMQMessageHandle(messageId) ,
-                (StoreContext)null,
-                (MessagePublishInfo)new MockMessagePublishInfo());
+             (MessagePublishInfo)new MockMessagePublishInfo());
     }
 
     protected MockAMQMessage(AMQMessage msg)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java Sun Oct 11 15:10:43 2009
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.store.StoreContext;
-
 public class MockAMQMessageHandle extends InMemoryMessageHandle
 {
     public MockAMQMessageHandle(final Long messageId)
@@ -30,7 +28,7 @@
     }
 
     @Override
-    public long getBodySize(StoreContext store)
+    public long getBodySize()
     {
       return 0l;
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Sun Oct 11 15:10:43 2009
@@ -23,25 +23,19 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.PrincipalHolder;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
 
 import java.util.List;
 import java.util.Set;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
 
 public class MockAMQQueue implements AMQQueue
 {
@@ -51,6 +45,8 @@
 
     private PrincipalHolder _principalHolder;
 
+    private Object _exclusiveOwner;
+
     public MockAMQQueue(String name)
     {
        _name = new AMQShortString(name);
@@ -171,7 +167,7 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+    public void requeue(QueueEntry entry)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -181,7 +177,7 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+    public void dequeue(QueueEntry entry)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -236,7 +232,7 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+    public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -295,12 +291,12 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+    public void deleteMessageFromTop()
     {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public long clearQueue(StoreContext storeContext) throws AMQException
+    public long clearQueue()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
@@ -399,4 +395,15 @@
         _principalHolder = principalHolder;
     }
 
+    public Object getExclusiveOwner()
+    {
+        return _exclusiveOwner;
+    }
+
+    public void setExclusiveOwner(Object exclusiveOwner)
+    {
+        _exclusiveOwner = exclusiveOwner;
+    }
+
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Sun Oct 11 15:10:43 2009
@@ -21,8 +21,8 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
 public class MockQueueEntry implements QueueEntry
 {
@@ -59,17 +59,17 @@
         return false;
     }
 
-    public void dequeue(StoreContext storeContext) throws FailedDequeueException
+    public void dequeue()
     {
 
     }
 
-    public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+    public void discard()
     {
 
     }
 
-    public void dispose(StoreContext storeContext) throws MessageCleanupException
+    public void dispose()
     {
 
     }
@@ -154,7 +154,12 @@
 
     }
 
-    
+    public boolean releaseButRetain()
+    {
+        return false;
+    }
+
+
     public boolean removeStateChangeListener(StateChangeListener listener)
     {
 
@@ -162,7 +167,7 @@
     }
 
     
-    public void requeue(StoreContext storeContext) throws AMQException
+    public void requeue()
     {
 
 
@@ -187,6 +192,16 @@
 
     }
 
+    public AMQMessageHeader getMessageHeader()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean isPersistent()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public boolean isRedelivered()
     {
         return false;  



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org