You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/05/02 18:49:05 UTC

svn commit: r534541 [1/3] - in /incubator/qpid/trunk/qpid/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/exception/ broker/src/main/j...

Author: rajith
Date: Wed May  2 09:49:03 2007
New Revision: 534541

URL: http://svn.apache.org/viewvc?view=rev&rev=534541
Log:
I am commiting the patch supplied by Arnaud Simon. This patch contains support for dtx.
Currently there is one test case failing. I will try to fix it, if not Arnuad will provide a patch soon

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/etc/config.xml
    incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml
    incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/config.xml Wed May  2 09:49:03 2007
@@ -90,14 +90,18 @@
     <virtualhosts>
         <virtualhost>
             <name>localhost</name>
-            <localhost>
-                <store>
-                    <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
-                    <environment-path>${work}/localhost-store</environment-path> -->
-
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
-
+            <localhost>                
+							  <store>                  
+                    <environment-path>${work}/localhost-store</environment-path> 
+                   <!-- <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> -->
+                    <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+                </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	  <!-- <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> -->
+                	  <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+                </txn>
+                
                 <security>
                     <!-- Need protocol changes to allow this-->
                     <authentication>
@@ -125,9 +129,12 @@
         <virtualhost>
             <name>development</name>
             <development>
-                <store>
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
+                <store>                  
+                    <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+                </store>             
+                <txn>
+                  	  <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+                </txn>
                 <security>
                     <name>passwordfile-notusedyet</name>
                     <mechanism>PLAIN</mechanism>
@@ -139,9 +146,12 @@
         <virtualhost>
             <name>test</name>
             <test>
-                <store>
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
+                <store>                  
+                    <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+                </store>             
+                <txn>
+                  	  <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+                </txn>
                 <security>
                     <name>passwordfile-notusedyet</name>
                     <mechanism>PLAIN</mechanism>

Modified: incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml Wed May  2 09:49:03 2007
@@ -73,12 +73,15 @@
     <virtualhosts>
         <virtualhost>
             <name>localhost</name>
-            <localhost>
-                <store>
-                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+            <localhost>               
+								<store>                  
                     <environment-path>${work}/bdbstore/localhost-store</environment-path>
-                </store>
-
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+                </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>
                 <security>
                     <access>
                         <class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class>
@@ -100,20 +103,28 @@
         <virtualhost>
             <name>development</name>
             <development>
-                <store>
-                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
-                    <environment-path>${work}/bdbstore/development-store</environment-path>
-                </store>
+            <store>                  
+                   <environment-path>${work}/bdbstore/development-store</environment-path>
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+           </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>              
             </development>
         </virtualhost>
 
         <virtualhost>
             <name>test</name>
             <test>
-                <store>
-                    <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
-                    <environment-path>${work}/bdbstore/test-store</environment-path>
-                </store>
+              <store>                  
+                   <environment-path>${work}/bdbstore/test-store</environment-path>
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+           </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>     
             </test>
         </virtualhost>
 

Modified: incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml Wed May  2 09:49:03 2007
@@ -73,9 +73,14 @@
         <virtualhost>
             <name>localhost</name>
             <localhost>
-                <store>
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
+                <store>                  
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+                </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>
+            
 
                 <security>
                     <access>
@@ -98,18 +103,26 @@
         <virtualhost>
             <name>development</name>
             <development>
-                <store>
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
+                  <store>                  
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+                </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>
             </development>
         </virtualhost>
 
         <virtualhost>
             <name>test</name>
             <test>
-                <store>
-                    <class>org.apache.qpid.server.store.MemoryMessageStore</class>
-                </store>
+                 <store>                  
+                   <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> 
+                </store>             
+                <txn>
+                    <environment-tx-timeout>60</environment-tx-timeout>
+                	 <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> 
+                </txn>
             </test>
         </virtualhost>
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed May  2 09:49:03 2007
@@ -58,8 +58,10 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
 
 /**
  * This MBean implements the broker management interface and exposes the
@@ -100,7 +102,6 @@
      * @param exchangeName
      * @param type
      * @param durable
-     * @param autoDelete
      * @throws JMException
      */
     public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException
@@ -158,7 +159,6 @@
      * @param queueName
      * @param durable
      * @param owner
-     * @param autoDelete
      * @throws JMException
      */
     public void createNewQueue(String queueName, String owner, boolean durable) throws JMException
@@ -180,7 +180,13 @@
             queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                _messageStore.createQueue(queue);
+                try
+                {
+                    _messageStore.createQueue(queue);
+                } catch (Exception e)
+                {
+                    throw new JMException("problem creating queue " + queue.getName());
+                }
             }
 
             Configuration virtualHostDefaultQueueConfiguration =
@@ -222,10 +228,9 @@
         try
         {
             queue.delete();
-            _messageStore.removeQueue(new AMQShortString(queueName));
-
+            _messageStore.destroyQueue(queue);
         }
-        catch (AMQException ex)
+        catch (Exception ex)
         {
             JMException jme = new JMException(ex.getMessage());
             jme.initCause(ex);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May  2 09:49:03 2007
@@ -47,11 +47,9 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.Subscription;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.LocalTransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.*;
 
 public class AMQChannel
 {
@@ -93,6 +91,8 @@
 
     private final MessageStore _messageStore;
 
+    private final TransactionManager _transactionManager;
+
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -116,7 +116,8 @@
     //Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;
 
-    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+    public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
         _session = session;
@@ -125,6 +126,7 @@
         _prefetch_HighWaterMark = DEFAULT_PREFETCH;
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
+        _transactionManager = transactionManager;
         _exchanges = exchanges;
         // by default the session is non-transactional
         _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
@@ -133,7 +135,7 @@
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
-        _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+        _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
     }
 
     public boolean isTransactional()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Wed May  2 09:49:03 2007
@@ -36,8 +36,10 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
 
 public class VirtualHostConfiguration
 {
@@ -48,7 +50,9 @@
     private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
 
 
-    public VirtualHostConfiguration(String configFile) throws ConfigurationException
+    public VirtualHostConfiguration(String configFile)
+            throws
+            ConfigurationException
     {
         _logger.info("Loading Config file:" + configFile);
 
@@ -57,24 +61,25 @@
     }
 
 
-
-    private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
+    private void configureVirtualHost(String virtualHostName, Configuration configuration)
+            throws
+            ConfigurationException,
+            AMQException
     {
-        _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
+        _logger.debug("Loding configuration for virtaulhost: " + virtualHostName);
 
 
         VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
 
 
-
-        if(virtualHost == null)
+        if (virtualHost == null)
         {
             throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
         }
 
         List exchangeNames = configuration.getList("exchanges.exchange.name");
 
-        for(Object exchangeNameObj : exchangeNames)
+        for (Object exchangeNameObj : exchangeNames)
         {
             String exchangeName = String.valueOf(exchangeNameObj);
             configureExchange(virtualHost, exchangeName, configuration);
@@ -83,7 +88,7 @@
 
         List queueNames = configuration.getList("queues.queue.name");
 
-        for(Object queueNameObj : queueNames)
+        for (Object queueNameObj : queueNames)
         {
             String queueName = String.valueOf(queueNameObj);
             configureQueue(virtualHost, queueName, configuration);
@@ -91,12 +96,14 @@
 
     }
 
-    private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+    private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration)
+            throws
+            AMQException
     {
 
         CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
 
-        exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
+        exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange." + exchangeNameString));
         exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
 
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -110,18 +117,17 @@
         Exchange exchange;
 
 
-
         synchronized (exchangeRegistry)
         {
             exchange = exchangeRegistry.getExchange(exchangeName);
-            if(exchange == null)
+            if (exchange == null)
             {
 
-                AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
-                boolean durable = exchangeConfiguration.getBoolean("durable",false);
-                boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
+                AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type", "direct"));
+                boolean durable = exchangeConfiguration.getBoolean("durable", false);
+                boolean autodelete = exchangeConfiguration.getBoolean("autodelete", false);
 
-                Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+                Exchange newExchange = exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
                 exchangeRegistry.registerExchange(newExchange);
             }
 
@@ -149,11 +155,14 @@
         return queueConfiguration;
     }
 
-    private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
+    private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration)
+            throws
+            AMQException,
+            ConfigurationException
     {
         CompositeConfiguration queueConfiguration = new CompositeConfiguration();
 
-        queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
+        queueConfiguration.addConfiguration(configuration.subset("queues.queue." + queueNameString));
         queueConfiguration.addConfiguration(configuration.subset("queues"));
 
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -173,7 +182,7 @@
             {
                 _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
 
-                boolean durable = queueConfiguration.getBoolean("durable" ,false);
+                boolean durable = queueConfiguration.getBoolean("durable", false);
                 boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
                 String owner = queueConfiguration.getString("owner", null);
 
@@ -184,21 +193,32 @@
 
                 if (queue.isDurable())
                 {
-                    messageStore.createQueue(queue);
+                    try
+                    {
+                        messageStore.createQueue(queue);
+                    } catch (InternalErrorException e)
+                    {
+                        _logger.error("Problem when creating Queue '" + queueNameString
+                                + "' on virtual host " + virtualHost.getName() + ", not creating.");
+
+                    } catch (QueueAlreadyExistsException e)
+                    {
+                        _logger.error("Queue '" + queueNameString
+                                + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
+                    }
                 }
 
                 queueRegistry.registerQueue(queue);
-            }
-            else
+            } else
             {
-                _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
+                _logger.info("Queue '" + queueNameString + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
             }
 
             String exchangeName = queueConfiguration.getString("exchange", null);
 
             Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
 
-            if(exchange == null)
+            if (exchange == null)
             {
                 exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
             }
@@ -211,15 +231,15 @@
             synchronized (exchange)
             {
                 List routingKeys = queueConfiguration.getList("routingKey");
-                if(routingKeys == null || routingKeys.isEmpty())
+                if (routingKeys == null || routingKeys.isEmpty())
                 {
                     routingKeys = Collections.singletonList(queue.getName());
                 }
 
-                for(Object routingKeyNameObj : routingKeys)
+                for (Object routingKeyNameObj : routingKeys)
                 {
                     AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
-                    
+
 
                     queue.bind(routingKey, null, exchange);
 
@@ -227,31 +247,33 @@
                     _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
                 }
 
-                if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+                if (exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
                 {
-                    queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());                    
+                    queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
                 }
             }
 
         }
 
 
-
         Configurator.configure(queue, queueConfiguration);
     }
 
 
-    public void performBindings() throws AMQException, ConfigurationException
+    public void performBindings()
+            throws
+            AMQException,
+            ConfigurationException
     {
         List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
         String defaultVirtualHostName = _config.getString("default");
-        if(defaultVirtualHostName != null)
+        if (defaultVirtualHostName != null)
         {
-            ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);            
+            ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
         }
         _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
 
-        for(Object nameObject : virtualHostNames)
+        for (Object nameObject : virtualHostNames)
         {
             String name = String.valueOf(nameObject);
             configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
@@ -263,7 +285,6 @@
                     "Virtualhost Configuration document does not contain a valid virtualhost.");
         }
     }
-
 
 
 }

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:52:29
+ */
+public class CommandInvalidException extends Exception
+{
+    /**
+     * Constructs a new CommandInvalidException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public CommandInvalidException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new CommandInvalidException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public CommandInvalidException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new CommandInvalidException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public CommandInvalidException(Throwable cause)
+    {
+        super(cause);
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:41:53
+ */
+public class InternalErrorException extends Exception
+{
+    /**
+     * Constructs a new InternalErrorException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public InternalErrorException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new InternalErrorException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public InternalErrorException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new InternalErrorException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public InternalErrorException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:12:27
+ */
+public class InvalidXidException extends Exception
+{
+    /**
+     * Constructs a newr InvalidXidException with a standard message
+     *
+     * @param xid The invalid xid.
+     */
+    public InvalidXidException(Xid xid)
+    {
+        super("The Xid: " + xid + " is invalid");
+    }
+
+    /**
+     * Constructs a newr InvalidXidException with a cause
+     *
+     * @param xid   The invalid xid.
+     * @param cause The casue for the xid to be invalid
+     */
+    public InvalidXidException(Xid xid, Throwable cause)
+    {
+        super("The Xid: " + xid + " is invalid", cause);
+    }
+
+    /**
+     * Constructs a newr InvalidXidException with a reason message
+     *
+     * @param reason The reason why the xid is invalid
+     * @param xid    The invalid xid.
+     */
+    public InvalidXidException(Xid xid, String reason)
+    {
+        super("The Xid: " + xid + " is invalid, The reason is: " + reason);
+    }
+
+    /**
+     * Constructs a newr InvalidXidException with a reason message and cause
+     *
+     * @param reason The reason why the xid is invalid
+     * @param xid    The invalid xid.
+     * @param cause  The casue for the xid to be invalid
+     */
+    public InvalidXidException(Xid xid, String reason, Throwable cause)
+    {
+        super("The Xid: " + xid + " is invalid, The reason is: " + reason, cause);
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 09:46:31
+ */
+public class MessageAlreadyStagedException extends Exception
+{
+    /**
+     * Constructs a new MessageAlreadyStagedException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public MessageAlreadyStagedException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new MessageAlreadyStagedException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public MessageAlreadyStagedException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new MessageAlreadyStagedException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public MessageAlreadyStagedException(Throwable cause)
+    {
+        super(cause);
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:52:29
+ */
+public class MessageDoesntExistException extends Exception
+{
+    /**
+     * Constructs a new MessageDoesntExistException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public MessageDoesntExistException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new MessageDoesntExistException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public MessageDoesntExistException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new MessageDoesntExistException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public MessageDoesntExistException(Throwable cause)
+    {
+        super(cause);
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 16:47:40
+ */
+public class NotPreparedException extends Exception
+{
+    /**
+     * Constructs a new NotPreparedException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public NotPreparedException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new NotPreparedException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public NotPreparedException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new NotPreparedException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public NotPreparedException(Throwable cause)
+    {
+        super(cause);
+    }
+}
\ No newline at end of file

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007
+ * Time: 10:49:00
+ */
+public class QueueAlreadyExistsException extends Exception
+{
+    /**
+     * Constructs a new QueueAlreadyExistsException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public QueueAlreadyExistsException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new QueueAlreadyExistsException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public QueueAlreadyExistsException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new QueueDoesntExistException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public QueueAlreadyExistsException(Throwable cause)
+    {
+        super(cause);
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:38:24
+ */
+public class QueueDoesntExistException extends Exception
+{
+    /**
+     * Constructs a new QueueDoesntExistException with the specified detail message.
+     *
+     * @param message the detail message.
+     */
+    public QueueDoesntExistException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * Constructs a new QueueDoesntExistException with the specified detail message and
+     * cause.
+     *
+     * @param message the detail message .
+     * @param cause   the cause.
+     */
+    public QueueDoesntExistException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new QueueDoesntExistException with the specified cause.
+     *
+     * @param cause the cause
+     */
+    public QueueDoesntExistException(Throwable cause)
+    {
+        super(cause);
+    }
+}
\ No newline at end of file

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java Wed May  2 09:49:03 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:45:06
+ */
+public class UnknownXidException extends Exception
+{
+    /**
+     * Constructs a newr UnknownXidException with a standard message
+     *
+     * @param xid The unknown xid.
+     */
+    public UnknownXidException(Xid xid)
+    {
+        super("The Xid: " + xid + " is unknown");
+    }
+
+    /**
+     * Constructs a newr UnknownXidException with a cause
+     *
+     * @param xid   The unknown xid.
+     * @param cause The casue for the xid to be unknown
+     */
+    public UnknownXidException(Xid xid, Throwable cause)
+    {
+        super("The Xid: " + xid + " is unknown", cause);
+    }
+
+    /**
+     * Constructs a newr UnknownXidException with a reason message
+     *
+     * @param reason The reason why the xid is unknown
+     * @param xid    The unknown xid.
+     */
+    public UnknownXidException(Xid xid, String reason)
+    {
+        super("The Xid: " + xid + " is unknown, The reason is: " + reason);
+    }
+
+    /**
+     * Constructs a newr UnknownXidException with a reason message and cause
+     *
+     * @param reason The reason why the xid is unknown
+     * @param xid    The unknown xid.
+     * @param cause  The casue for the xid to be unknown
+     */
+    public UnknownXidException(Xid xid, String reason, Throwable cause)
+    {
+        super("The Xid: " + xid + " is unknown, The reason is: " + reason, cause);
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Wed May  2 09:49:03 2007
@@ -29,7 +29,8 @@
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
 
 public class DefaultExchangeRegistry implements ExchangeRegistry
 {
@@ -65,7 +66,13 @@
         _exchangeMap.put(exchange.getName(), exchange);
         if(exchange.isDurable())
         {
-            getMessageStore().createExchange(exchange);
+            try
+            {
+                getMessageStore().createExchange(exchange);
+            } catch (InternalErrorException e)
+            {
+                throw new AMQException("problem registering excahgne " + exchange, e);
+            }
         }
     }
 
@@ -87,7 +94,13 @@
         {
             if(e.isDurable())
             {
-                getMessageStore().removeExchange(e);
+                try
+                {
+                    getMessageStore().removeExchange(e);
+                } catch (InternalErrorException e1)
+                {
+                    throw new AMQException("Problem unregistering Exchange " + name, e1);
+                }
             }
             e.close();
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Wed May  2 09:49:03 2007
@@ -49,7 +49,8 @@
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
 
-        final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
+        final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getTransactionManager(),
+                                                  virtualHost.getMessageStore(),
                                                   virtualHost.getExchangeRegistry());
         session.addChannel(channel);
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed May  2 09:49:03 2007
@@ -42,9 +42,11 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
 import org.apache.commons.configuration.Configuration;
 
 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -103,7 +105,13 @@
                     queue = createQueue(body, virtualHost, session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
-                        store.createQueue(queue);
+                        try
+                        {
+                            store.createQueue(queue);
+                        } catch (Exception e)
+                        {
+                           throw new AMQException("Problem when creating queue " + queue,  e);
+                        }
                     }
                     queueRegistry.registerQueue(queue);
                     if (autoRegister)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed May  2 09:49:03 2007
@@ -30,9 +30,11 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
 
 public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
 {
@@ -107,7 +109,13 @@
 
                 if (queue.isDurable())
                 {
-                    store.removeQueue(queue.getName());
+                    try
+                    {
+                        store.destroyQueue(queue);
+                    } catch (Exception e)
+                    {
+                      throw new AMQException("problem when destroying queue " + queue, e);
+                    }
                 }
                 
                 // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Wed May  2 09:49:03 2007
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 26-Apr-2007
+ * Time: 08:23:45
+ */
+public class MemoryMessageStore  implements MessageStore
+{
+
+    public void removeExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+          throws
+          InternalErrorException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void createExchange(Exchange exchange)
+            throws
+            InternalErrorException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+            throws
+            InternalErrorException,
+            IllegalArgumentException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void close()
+            throws
+            InternalErrorException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void createQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueAlreadyExistsException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void destroyQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void stage(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageAlreadyStagedException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public byte[] loadContent(StorableMessage m, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        return new byte[0];  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void destroy(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Collection<StorableQueue> getAllQueues()
+            throws
+            InternalErrorException
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+            throws
+            InternalErrorException
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getNewMessageId()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Wed May  2 09:49:03 2007
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+    /**
+     * Create a new exchange
+     *
+     * @param exchange the exchange to be persisted
+     * @throws InternalErrorException If an error occurs
+     */
+    public void createExchange(Exchange exchange)
+            throws
+            InternalErrorException;
+
+     /**
+     * Remove an exchange
+     * @param exchange  The exchange to be removed
+     * @throws InternalErrorException If an error occurs
+     */
+     public void removeExchange(Exchange exchange) throws
+            InternalErrorException;
+
+    /**
+     * Bind a queue with an exchange given a routing key
+     *
+     * @param exchange   The exchange to bind the queue with
+     * @param routingKey The routing key
+     * @param queue      The queue to be bound
+     * @param args       Args
+     * @throws InternalErrorException If an error occurs
+     */
+    public void bindQueue(Exchange exchange,
+                          AMQShortString routingKey,
+                          StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException;
+
+      /**
+     * Unbind a queue from an exchange
+     *
+     * @param exchange   The exchange the queue was bound to
+     * @param routingKey The routing queue
+     * @param queue      The queue to unbind
+     * @param args       args
+     * @throws InternalErrorException If an error occurs
+     */
+    public void unbindQueue(Exchange exchange,
+                            AMQShortString routingKey,
+                            StorableQueue queue, FieldTable args)
+            throws
+            InternalErrorException;
+    
+    /**
+     * Called after instantiation in order to configure the message store. A particular implementation can define
+     * whatever parameters it wants.
+     *
+     * @param virtualHost The virtual host using by this store
+     * @param tm          The transaction manager implementation
+     * @param base        The base element identifier from which all configuration items are relative. For example, if the base
+     *                    element is "store", the all elements used by concrete classes will be "store.foo" etc.
+     * @param config      The apache commons configuration object
+     * @throws InternalErrorException   If an error occurs that means the store is unable to configure itself
+     * @throws IllegalArgumentException If the configuration arguments are illegal
+     */
+    void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+            throws
+            InternalErrorException,
+            IllegalArgumentException;
+
+    /**
+     * Called to close and cleanup any resources used by the message store.
+     *
+     * @throws InternalErrorException if close fails
+     */
+    void close()
+            throws
+            InternalErrorException;
+
+    /**
+     * Create a queue
+     *
+     * @param queue the queue to be created
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws QueueAlreadyExistsException If the queue already exists in the store
+     */
+    public void createQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueAlreadyExistsException;
+
+    /**
+     * Destroy a queue
+     *
+     * @param queue The queue to be destroyed
+     * @throws InternalErrorException    In case of internal message store problem
+     * @throws QueueDoesntExistException If the queue does not exist in the store
+     */
+    public void destroyQueue(StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException;
+
+    /**
+     * Stage the message before effective enqueue
+     *
+     * @param m The message to stage
+     * @throws InternalErrorException        In case of internal message store problem
+     * @throws MessageAlreadyStagedException If the message is already staged
+     */
+    public void stage(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageAlreadyStagedException;
+
+
+    /**
+     * Append more data with a previously staged message
+     *
+     * @param m      The message to which data must be appended
+     * @param data   Data to happen to the message
+     * @param offset The number of bytes from the beginning of the payload
+     * @param size   The number of bytes to be written
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message has not been staged
+     */
+    public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Get the content of previously staged or enqueued message.
+     * The message headers are also set.
+     *
+     * @param m      The message for which the content must be loaded
+     * @param offset The number of bytes from the beginning of the payload
+     * @param size   The number of bytes to be loaded
+     * @return The message content
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist
+     */
+    public byte[] loadContent(StorableMessage m, int offset, int size)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Destroy a previously staged message
+     *
+     * @param m the message to be destroyed
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws MessageDoesntExistException If the message does not exist in the store
+     */
+    public void destroy(StorableMessage m)
+            throws
+            InternalErrorException,
+            MessageDoesntExistException;
+
+    /**
+     * Enqueue a message under the scope of the transaction branch
+     * identified by xid when specified.
+     * <p> This operation is propagated to the queue and the message.
+     * <p> A message that has been previously staged is assumed to have had
+     * its payload already added (see appendContent)
+     *
+     * @param xid   The xid of the transaction branch under which the message must be enqueued.
+     *              <p> It he xid is null then the message is enqueued outside the scope of any transaction.
+     * @param m     The message to be enqueued
+     * @param queue The queue into which the message must be enqueued
+     * @throws InternalErrorException      In case of internal message store problem
+     * @throws QueueDoesntExistException   If the queue does not exist in the store
+     * @throws InvalidXidException         The transaction branch is invalid
+     * @throws UnknownXidException         The transaction branch is unknown
+     * @throws MessageDoesntExistException If the Message does not exist
+     */
+    public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException;
+
+    /**
+     * Dequeue a message under the scope of the transaction branch identified by xid
+     * if specified.
+     * <p> This operation is propagated to the queue and the message.
+     *
+     * @param xid   The xid of the transaction branch under which the message must be dequeued.
+     *              <p> It he xid is null then the message is dequeued outside the scope of any transaction.
+     * @param m     The message to be dequeued
+     * @param queue The queue from which the message must be dequeued
+     * @throws InternalErrorException    In case of internal message store problem
+     * @throws QueueDoesntExistException If the queue does not exist in the store
+     * @throws InvalidXidException       The transaction branch is invalid
+     * @throws UnknownXidException       The transaction branch is unknown
+     */
+    public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException;
+
+    //=========================================================
+    // Recovery specific methods
+    //=========================================================
+
+    /**
+     * List all the persistent queues
+     *
+     * @return All the persistent queues
+     * @throws InternalErrorException In case of internal message store problem
+     */
+    public Collection<StorableQueue> getAllQueues()
+            throws
+            InternalErrorException;
+
+    /**
+     * All enqueued messages of a given queue
+     *
+     * @param queue The queue where the message are retrieved from
+     * @return The list all enqueued messages of a given queue
+     * @throws InternalErrorException In case of internal message store problem
+     */
+    public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+            throws
+            InternalErrorException;
+
+    /**
+     * Get a new message ID
+     *
+     * @return A new message ID
+     */
+    public long getNewMessageId();
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java Wed May  2 09:49:03 2007
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+/**
+ * A storable message can be persisted in the message store.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:56:48
+ */
+public interface StorableMessage
+{
+    /**
+     * The message ID is used by the store to identify a message.
+     *
+     * @return The message identifier
+     */
+    public long getMessageId();
+
+    /**
+     * Get the message header body that is saved when the message is staged.
+     *
+     * @return The message header body
+     */
+    public byte[] getHeaderBody();
+
+    /**
+     * Get the message header body size in bytes.
+     *
+     * @return The message header body size
+     */
+    public int getHeaderSize();
+
+    /**
+     * Get the message payload. This is required when the message is
+     * enqueued without any prior staging.
+     * <p> When the message is staged, the payload  can be partial or even empty.
+     *
+     * @return The message payload
+     */
+    public byte[] getData();
+
+    /**
+     * Get the message payload size in bytes.
+     *
+     * @return The message payload size in bytes
+     */
+    public int getPayloadSize();
+
+    /**
+     * Specify whether this message has been enqueued
+     * 
+     * @return true if this message is enqueued, false otherwise
+     */
+    public boolean isEnqueued();
+
+    /**
+     * This is called by the message store when this message is enqueued in the message store.
+     *
+     * @param queue The storable queue into which the message is enqueued
+     */
+    public void enqueue(StorableQueue queue);
+
+    /**
+     *  This is called by the message store when this message is dequeued.
+     *
+     * @param queue The storable queue out of which the message is dequeued
+     */
+    public void dequeue(StorableQueue queue);
+
+    /**
+     * A message can be enqueued in several queues.
+     * The queue position represents the index of the provided queue within the ordered
+     * list of queues the message has been enqueued.
+     * <p>For example:
+     * <p> If the message is successively enqueued in queue Q1, Q2 and Q3 then
+     * the position of Q1 is 0, position of Q2 is 1 and position of Q3 is 2.
+     * <p> If the message is dequeud form Q2 then position of Q1 is stil 0 but position
+     * of Q3 becomes 1.
+     *
+     * @param queue The storable queue for which the position should be determined
+     *
+     * @return The position of the specified storable queue
+     */
+    public int getQueuePosition(StorableQueue queue);
+
+    /**
+     * Indicates whether this message has been staged.
+     * 
+     * @return True if the message has been staged, false otherwise
+     */
+    public boolean isStaged();
+
+    /**
+     * Call by the message store when this message is staged. 
+     */
+    public void staged();
+
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java Wed May  2 09:49:03 2007
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * A storable queue can store storable messages and can be persisted in the store.
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:52:18
+ */
+public interface StorableQueue
+{
+    /**
+     * Get This queue unique id.
+     *
+     * @return The queue ID
+     */
+    public int getQueueID();
+
+    /**
+     * Set the queue ID.
+     *
+     * @param id This queue ID
+     */
+    public void setQueueID(int id);
+
+    /**
+     * Get this queue owner.
+     *
+     * @return This queue owner
+     */
+    public AMQShortString getOwner();
+
+    /**
+     * Get this queue name.
+     *
+     * @return the name of this queue
+     */
+    public AMQShortString getName();
+
+    /**
+     * Signifies to this queue that a message is dequeued.
+     * This operation is called by the store.
+     *
+     * @param m The dequeued message
+     */
+    public void dequeue(StorableMessage m);
+
+    /**
+     * Signifies to this queue that a message is enqueued.
+     * This operation is called by the store.
+     *
+     * @param m The enqueued message
+     */
+    public void enqueue(StorableMessage m);
+        
+}