You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/20 17:20:46 UTC

svn commit: r509628 [1/2] - in /incubator/qpid/trunk/qpid: gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/configuration/ java/broker/src/main/java/org/apache/qpid/server/exchange/...

Author: rgodfrey
Date: Tue Feb 20 08:20:41 2007
New Revision: 509628

URL: http://svn.apache.org/viewvc?view=rev&rev=509628
Log:
QPID-325 : Persist durable exchange information in the store
QPID-318 : Remove hardcoding of version numbers (as applies to store)

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
Modified:
    incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
    incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java

Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl Tue Feb 20 08:20:41 2007
@@ -48,6 +48,8 @@
     static
     {
 %{CLIST}	${reg_map_put_method}
+
+        configure();
     }
     
     public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
@@ -126,5 +128,26 @@
         
     }
 
+	
+    private static void configure()
+    {
+        for(int i = 0 ; i < _specificRegistries.length; i++)
+        {
+            VersionSpecificRegistry[] registries = _specificRegistries[i];
+            if(registries != null)
+            {
+                for(int j = 0 ; j < registries.length; j++)
+                {
+                    VersionSpecificRegistry registry = registries[j];
+                    
+                    if(registry != null)
+                    {
+                        registry.configure();
+                    }
+                }
+            }
+        }
+        
+    }
     
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 08:20:41 2007
@@ -33,17 +33,16 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -202,9 +201,11 @@
     }
 
 
-    public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
     {
-        _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody,
+
+
+        _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
                                          _txnContext);
         // TODO: used in clustering only I think (RG)
         _currentMessage.setPublisher(publisher);
@@ -252,7 +253,7 @@
 
             // returns true iff the message was delivered (i.e. if all data was
             // received
-            if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
+            if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
             {
                 // callback to allow the context to do any post message processing
                 // primary use is to allow message return processing in the non-tx case

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Tue Feb 20 08:20:41 2007
@@ -198,11 +198,17 @@
                 for(Object routingKeyNameObj : routingKeys)
                 {
                     AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
-                    exchange.registerQueue(routingKey, queue, null);
+                    
+
+                    queue.bind(routingKey, null, exchange);
 
-                    queue.bind(routingKey, exchange);
 
                     _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+                }
+
+                if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+                {
+                    queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());                    
                 }
             }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Feb 20 08:20:41 2007
@@ -28,6 +28,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
 
 public class DefaultExchangeRegistry implements ExchangeRegistry
 {
@@ -39,23 +41,32 @@
     private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
 
     private Exchange _defaultExchange;
+    private VirtualHost _host;
 
-    public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
+    public DefaultExchangeRegistry(VirtualHost host)
     {
         //create 'standard' exchanges:
-        try
-        {
-            new ExchangeInitialiser().initialise(exchangeFactory, this);
-        }
-        catch(AMQException e)
-        {
-            _log.error("Failed to initialise exchanges: ", e);
-        }
+        _host = host;
+
     }
 
-    public void registerExchange(Exchange exchange)
+    public void initialise() throws AMQException
+    {
+        new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _host.getMessageStore();
+    }
+
+    public void registerExchange(Exchange exchange) throws AMQException
     {
         _exchangeMap.put(exchange.getName(), exchange);
+        if(exchange.isDurable())
+        {
+            getMessageStore().createExchange(exchange);
+        }
     }
 
     public void setDefaultExchange(Exchange exchange)
@@ -74,6 +85,10 @@
         Exchange e = _exchangeMap.remove(name);
         if (e != null)
         {
+            if(e.isDurable())
+            {
+                getMessageStore().removeExchange(e);
+            }
             e.close();
         }
         else
@@ -102,7 +117,7 @@
      */
     public void routeContent(AMQMessage payload) throws AMQException
     {
-        final AMQShortString exchange = payload.getPublishBody().exchange;
+        final AMQShortString exchange = payload.getMessagePublishInfo().getExchange();
         final Exchange exch = getExchange(exchange);
         // there is a small window of opportunity for the exchange to be deleted in between
         // the BasicPublish being received (where the exchange is validated) and the final

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Tue Feb 20 08:20:41 2007
@@ -43,6 +43,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -126,8 +127,7 @@
 
             try
             {
-                registerQueue(new AMQShortString(binding), queue, null);
-                queue.bind(new AMQShortString(binding), DestNameExchange.this);
+                queue.bind(new AMQShortString(binding), null, DestNameExchange.this);
             }
             catch (AMQException ex)
             {
@@ -170,7 +170,7 @@
         }
     }
 
-    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
         assert routingKey != null;
@@ -184,13 +184,13 @@
 
     public void route(AMQMessage payload) throws AMQException
     {
-        final BasicPublishBody publishBody = payload.getPublishBody();
-        final AMQShortString routingKey = publishBody.routingKey;
+        final MessagePublishInfo info = payload.getMessagePublishInfo();
+        final AMQShortString routingKey = info.getRoutingKey();
         final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
         if (queues == null || queues.isEmpty())
         {
             String msg = "Routing key " + routingKey + " is not known to " + this;
-            if (publishBody.mandatory)
+            if (info.isMandatory())
             {
                 throw new NoRouteException(msg, payload);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Tue Feb 20 08:20:41 2007
@@ -45,6 +45,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -125,8 +126,7 @@
 
             try
             {
-                registerQueue(new AMQShortString(binding), queue, null);
-                queue.bind(new AMQShortString(binding), DestWildExchange.this);
+                queue.bind(new AMQShortString(binding), null, DestWildExchange.this);
             }
             catch (AMQException ex)
             {
@@ -168,9 +168,9 @@
 
     public void route(AMQMessage payload) throws AMQException
     {
-        BasicPublishBody publishBody = payload.getPublishBody();
+        MessagePublishInfo info = payload.getMessagePublishInfo();
 
-        final AMQShortString routingKey = publishBody.routingKey;
+        final AMQShortString routingKey = info.getRoutingKey();
         List<AMQQueue> queues = _routingKey2queues.get(routingKey);
         // if we have no registered queues we have nothing to do
         // TODO: add support for the immediate flag
@@ -221,7 +221,7 @@
         return !_routingKey2queues.isEmpty();
     }
 
-    public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
         assert routingKey != null;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Feb 20 08:20:41 2007
@@ -47,7 +47,7 @@
 
     void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+    void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
     void route(AMQMessage message) throws AMQException;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Feb 20 08:20:41 2007
@@ -26,7 +26,7 @@
 
 public interface ExchangeRegistry extends MessageRouter
 {
-    void registerExchange(Exchange exchange);
+    void registerExchange(Exchange exchange) throws AMQException;
 
     /**
      * Unregister an exchange
@@ -42,4 +42,6 @@
     void setDefaultExchange(Exchange exchange);
 
     Exchange getDefaultExchange();
+
+    void initialise() throws AMQException;
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Feb 20 08:20:41 2007
@@ -19,8 +19,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -98,9 +98,8 @@
             }
 
             try
-            {
-                registerQueue(new AMQShortString(binding), queue, null);
-                queue.bind(new AMQShortString(binding), FanoutExchange.this);
+            {                
+                queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
             }
             catch (AMQException ex)
             {
@@ -144,10 +143,10 @@
         }
     }
 
-    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         assert queue != null;
-        assert routingKey != null;
+
 
         if (!_queues.remove(queue))
         {
@@ -158,12 +157,12 @@
 
     public void route(AMQMessage payload) throws AMQException
     {
-        final BasicPublishBody publishBody = payload.getPublishBody();
-        final AMQShortString routingKey = publishBody.routingKey;
+        final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
+        final AMQShortString routingKey = publishInfo.getRoutingKey();
         if (_queues == null || _queues.isEmpty())
         {
             String msg = "No queues bound to " + this;
-            if (publishBody.mandatory)
+            if (publishInfo.isMandatory())
             {
                 throw new NoRouteException(msg, payload);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Feb 20 08:20:41 2007
@@ -200,10 +200,10 @@
         _bindings.add(new Registration(new HeadersBinding(args), queue));
     }
 
-    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
-        _bindings.remove(new Registration(null, queue));
+        _bindings.remove(new Registration(new HeadersBinding(args), queue));
     }
 
     public void route(AMQMessage payload) throws AMQException
@@ -232,7 +232,7 @@
 
             String msg = "Exchange " + getName() + ": message not routable.";
 
-            if (payload.getPublishBody().mandatory)
+            if (payload.getMessagePublishInfo().isMandatory())
             {
                 throw new NoRouteException(msg, payload);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Feb 20 08:20:41 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -85,7 +86,8 @@
                 throw body.getChannelNotFoundException(evt.getChannelId());
             }
 
-            channel.setPublishFrame(body, session);
+            MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+            channel.setPublishFrame(info, session);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Feb 20 08:20:41 2007
@@ -98,8 +98,8 @@
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
         }
         try
-        {
-            exch.registerQueue(body.routingKey, queue, body.arguments);
+        {            
+            queue.bind(body.routingKey, body.arguments, exch);
         }
         catch (AMQInvalidRoutingKeyException rke)
         {
@@ -109,7 +109,7 @@
         {
             throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
         }
-        queue.bind(body.routingKey, exch);
+
         if (_log.isInfoEnabled())
         {
             _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Feb 20 08:20:41 2007
@@ -108,8 +108,8 @@
                     if (autoRegister)
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
-                        defaultExchange.registerQueue(body.queue, queue, null);
-                        queue.bind(body.queue, defaultExchange);
+
+                        queue.bind(body.queue, null, defaultExchange);
                         _log.info("Queue " + body.queue + " bound to default exchange");
                     }
                 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Tue Feb 20 08:20:41 2007
@@ -41,6 +41,9 @@
     private void define(ExchangeRegistry r, ExchangeFactory f,
                         AMQShortString name, AMQShortString type) throws AMQException
     {
-        r.registerExchange(f.createExchange(name, type, true, false, 0));
+        if(r.getExchange(name)== null)
+        {
+            r.registerExchange(f.createExchange(name, type, true, false, 0));
+        }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Feb 20 08:20:41 2007
@@ -27,20 +27,13 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicGetOkBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.SmallCompositeAMQDataBlock;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.txn.TransactionalContext;
 
 /**
@@ -98,10 +91,12 @@
         private int _channel;
 
         private int _index = -1;
+        private AMQProtocolSession _protocolSession;
 
-        private BodyFrameIterator(int channel)
+        private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
         {
             _channel = channel;
+            _protocolSession = protocolSession;
         }
 
         public boolean hasNext()
@@ -121,8 +116,9 @@
         {
             try
             {
-                ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
-                return ContentBody.createAMQFrame(_channel, cb);
+
+                AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+                return new AMQFrame(_channel, cb);
             }
             catch (AMQException e)
             {
@@ -132,6 +128,11 @@
 
         }
 
+        private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+        {
+            return _protocolSession.getRegistry().getProtocolVersionMethodConverter();
+        }
+
         public void remove()
         {
             throw new UnsupportedOperationException();
@@ -143,7 +144,7 @@
         return _txnContext.getStoreContext();
     }
 
-    private class BodyContentIterator implements Iterator<ContentBody>
+    private class BodyContentIterator implements Iterator<ContentChunk>
     {
 
         private int _index = -1;
@@ -161,11 +162,11 @@
             }
         }
 
-        public ContentBody next()
+        public ContentChunk next()
         {
             try
             {
-                return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
+                return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
             }
             catch (AMQException e)
             {
@@ -179,13 +180,13 @@
         }
     }
 
-    public AMQMessage(Long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, MessagePublishInfo info,
                       TransactionalContext txnContext)
     {
         _messageId = messageId;
         _txnContext = txnContext;
-        _immediate = publishBody.immediate;
-        _transientMessageData.setPublishBody(publishBody);
+        _immediate = info.isImmediate();
+        _transientMessageData.setMessagePublishInfo(info);
 
         _taken = new AtomicBoolean(false);
         if (_log.isDebugEnabled())
@@ -215,14 +216,14 @@
      * Used in testing only. This allows the passing of the content header immediately
      * on construction.
      * @param messageId
-     * @param publishBody
+     * @param info
      * @param txnContext
      * @param contentHeader
      */
-    public AMQMessage(Long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, MessagePublishInfo info,
                       TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
     {
-        this(messageId, publishBody, txnContext);
+        this(messageId, info, txnContext);
         setContentHeaderBody(contentHeader);
     }
 
@@ -230,23 +231,23 @@
      * Used in testing only. This allows the passing of the content header and some body fragments on
      * construction.
      * @param messageId
-     * @param publishBody
+     * @param info
      * @param txnContext
      * @param contentHeader
      * @param destinationQueues
      * @param contentBodies
      * @throws AMQException
      */
-    public AMQMessage(Long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, MessagePublishInfo info,
                       TransactionalContext txnContext,
                       ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
-                      List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
+                      List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
                       MessageHandleFactory messageHandleFactory) throws AMQException
     {
-        this(messageId, publishBody, txnContext, contentHeader);
+        this(messageId, info, txnContext, contentHeader);
         _transientMessageData.setDestinationQueues(destinationQueues);
         routingComplete(messageStore, storeContext, messageHandleFactory);
-        for (ContentBody cb : contentBodies)
+        for (ContentChunk cb : contentBodies)
         {
             addContentBodyFrame(storeContext, cb);
         }
@@ -261,12 +262,12 @@
         _transientMessageData = msg._transientMessageData;
     }
 
-    public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
+    public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
     {
-        return new BodyFrameIterator(channel);
+        return new BodyFrameIterator(protocolSession, channel);
     }
 
-    public Iterator<ContentBody> getContentBodyIterator()
+    public Iterator<ContentChunk> getContentBodyIterator()
     {
         return new BodyContentIterator();
     }
@@ -311,11 +312,11 @@
         }
     }
 
-    public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
+    public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
     {
-        _transientMessageData.addBodyLength(contentBody.getSize());
+        _transientMessageData.addBodyLength(contentChunk.getSize());
         final boolean allContentReceived = isAllContentReceived();
-        _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived);
+        _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived);
         if (allContentReceived)
         {
             deliver(storeContext);
@@ -502,16 +503,16 @@
         }        
     }
 
-    public BasicPublishBody getPublishBody() throws AMQException
+    public MessagePublishInfo getMessagePublishInfo() throws AMQException
     {
-        BasicPublishBody pb;
+        MessagePublishInfo pb;
         if (_transientMessageData != null)
         {
-            pb = _transientMessageData.getPublishBody();
+            pb = _transientMessageData.getMessagePublishInfo();
         }
         else
         {
-            pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
+            pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
         }
         return pb;
     }
@@ -554,7 +555,7 @@
         {
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(),
+            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
                                                           _transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with the message content
@@ -598,9 +599,9 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
 
-            AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             protocolSession.writeFrame(compositeBlock);
@@ -610,8 +611,8 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
-                protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+                cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -641,9 +642,9 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
 
-            AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             protocolSession.writeFrame(compositeBlock);
@@ -653,8 +654,8 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
-                protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+                cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -667,10 +668,10 @@
     private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        BasicPublishBody pb = getPublishBody();
+        MessagePublishInfo pb = getMessagePublishInfo();
         AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
-                                                                deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
-                                                                pb.routingKey);
+                                                                deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+                                                                pb.getRoutingKey());
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
         buf.flip();
@@ -680,14 +681,14 @@
     private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
-        BasicPublishBody pb = getPublishBody();
+        MessagePublishInfo pb = getMessagePublishInfo();
         AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
                                                             protocolSession.getProtocolMajorVersion(),
                                                             protocolSession.getProtocolMinorVersion(),
-                                                                deliveryTag, pb.exchange,
+                                                                deliveryTag, pb.getExchange(),
                                                                 queueSize,
                                                                 _messageHandle.isRedelivered(),
-                                                                pb.routingKey);
+                                                                pb.getRoutingKey());
         ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
         getOkFrame.writePayload(buf);
         buf.flip();
@@ -699,9 +700,9 @@
         AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
                                                               protocolSession.getProtocolMajorVersion(),
                                                               protocolSession.getProtocolMinorVersion(), 
-                                                              getPublishBody().exchange,
+                                                              getMessagePublishInfo().getExchange(),
                                                               replyCode, replyText,
-                                                              getPublishBody().routingKey);
+                                                              getMessagePublishInfo().getRoutingKey());
         ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
         returnFrame.writePayload(buf);
         buf.flip();
@@ -716,7 +717,7 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       getContentHeaderBody());
 
-        Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+        Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
         //
         // Optimise the case where we have a single content body. In that case we create a composite block
         // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
@@ -767,7 +768,7 @@
     public void restoreTransientMessageData() throws AMQException
     {
         TransientMessageData transientMessageData = new TransientMessageData();
-        transientMessageData.setPublishBody(getPublishBody());
+        transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
         transientMessageData.setContentHeaderBody(getContentHeaderBody());
         transientMessageData.addBodyLength(getContentHeaderBody().getSize());
         _transientMessageData = transientMessageData; 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -21,10 +21,10 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 
 /**
  * A pluggable way of getting message data. Implementations can provide intelligent caching for example or
@@ -53,11 +53,11 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
+    ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
 
-    void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
+    void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
 
-    BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
+    MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException;
 
     boolean isRedelivered();
 
@@ -65,7 +65,7 @@
 
     boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
 
-    void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+    void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
                                         ContentHeaderBody contentHeaderBody)
             throws AMQException;
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb 20 08:20:41 2007
@@ -441,9 +441,24 @@
         return _deliveryMgr.clearAllMessages(storeContext);
     }
 
-    public void bind(AMQShortString routingKey, Exchange exchange)
+    public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
     {
-        _bindings.addBinding(routingKey, exchange);
+        exchange.registerQueue(routingKey, this, arguments);        
+        if(isDurable() && exchange.isDurable())
+        {
+            _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+        }
+        _bindings.addBinding(routingKey, arguments, exchange);
+    }
+
+    public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
+    {
+        exchange.deregisterQueue(routingKey, this, arguments);
+        if(isDurable() && exchange.isDurable())
+        {
+            _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+        }
+        _bindings.remove(routingKey, arguments, exchange);
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Feb 20 08:20:41 2007
@@ -44,6 +44,7 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
@@ -322,16 +323,16 @@
             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
         }
         // get message content
-        Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
+        Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
         List<Byte> msgContent = new ArrayList<Byte>();
         while (cBodies.hasNext())
         {
-            ContentBody body = cBodies.next();
+            ContentChunk body = cBodies.next();
             if (body.getSize() != 0)
             {
                 if (body.getSize() != 0)
                 {
-                    ByteBuffer slice = body.payload.slice();
+                    ByteBuffer slice = body.getData().slice();
                     for (int j = 0; j < slice.limit(); j++)
                     {
                         msgContent.add(slice.get());

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 20 08:20:41 2007
@@ -33,7 +33,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -114,11 +114,11 @@
         // Shrink the ContentBodies to their actual size to save memory.
         if (compressBufferOnQueue)
         {
-            Iterator<ContentBody> it = msg.getContentBodyIterator();
+            Iterator<ContentChunk> it = msg.getContentBodyIterator();
             while (it.hasNext())
             {
-                ContentBody cb = it.next();
-                cb.reduceBufferToFit();
+                ContentChunk cb = it.next();
+                cb.reduceToFit();
             }
         }
 
@@ -493,7 +493,7 @@
                 {
                     _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
                 }
-                if (!msg.getPublishBody().immediate)
+                if (!msg.getMessagePublishInfo().isImmediate())
                 {
                     addMessageToQueue(msg);
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Tue Feb 20 08:20:41 2007
@@ -26,6 +26,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 
 /**
@@ -35,42 +36,55 @@
  */
 class ExchangeBindings
 {
+    private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
+
     static class ExchangeBinding
     {
-        private final Exchange exchange;
-        private final AMQShortString routingKey;
+        private final Exchange _exchange;
+        private final AMQShortString _routingKey;
+        private final FieldTable _arguments;
 
         ExchangeBinding(AMQShortString routingKey, Exchange exchange)
         {
-            this.routingKey = routingKey;
-            this.exchange = exchange;
+            this(routingKey, exchange,EMPTY_ARGUMENTS);
+        }
+
+        ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
+        {
+            _routingKey = routingKey;
+            _exchange = exchange;
+            _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
         }
 
         void unbind(AMQQueue queue) throws AMQException
         {
-            exchange.deregisterQueue(routingKey, queue);
+            _exchange.deregisterQueue(_routingKey, queue, _arguments);
         }
 
         public Exchange getExchange()
         {
-            return exchange;
+            return _exchange;
         }
 
         public AMQShortString getRoutingKey()
         {
-            return routingKey;
+            return _routingKey;
         }
 
         public int hashCode()
         {
-            return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode());
+            return (_exchange == null ? 0 : _exchange.hashCode())
+                   + (_routingKey == null ? 0 : _routingKey.hashCode())
+                   + (_arguments == null ? 0 : _arguments.hashCode());
         }
 
         public boolean equals(Object o)
         {
             if (!(o instanceof ExchangeBinding)) return false;
             ExchangeBinding eb = (ExchangeBinding) o;
-            return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+            return _exchange.equals(eb._exchange)
+                   && _routingKey.equals(eb._routingKey)
+                   && _arguments.equals(eb._arguments);
         }
     }
 
@@ -88,11 +102,18 @@
      * are being tracked by the instance has been bound to the exchange
      * @param exchange the exchange bound to
      */
-    void addBinding(AMQShortString routingKey, Exchange exchange)
+    void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+    {
+        _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+    }
+
+
+    public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
     {
-        _bindings.add(new ExchangeBinding(routingKey, exchange));
+        _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
     }
 
+    
     /**
      * Deregisters this queue from any exchange it has been bound to
      */

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -25,9 +25,10 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.store.StoreContext;
 
 /**
@@ -37,9 +38,9 @@
 
     private ContentHeaderBody _contentHeaderBody;
 
-    private BasicPublishBody _publishBody;
+    private MessagePublishInfo _messagePublishInfo;
 
-    private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+    private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
 
     private boolean _redelivered;
 
@@ -64,7 +65,7 @@
         return getContentHeaderBody(context, messageId).bodySize;
     }
 
-    public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -74,15 +75,15 @@
         return _contentBodies.get(index);
     }
 
-    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody)
+    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
             throws AMQException
     {
         _contentBodies.add(contentBody);
     }
 
-    public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
     {
-        return _publishBody;
+        return _messagePublishInfo;
     }
 
     public boolean isRedelivered()
@@ -106,15 +107,15 @@
 
     /**
      * This is called when all the content has been received.
-     * @param publishBody
+     * @param messagePublishInfo
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
-        _publishBody = publishBody;
+        _messagePublishInfo = messagePublishInfo;
         _contentHeaderBody = contentHeaderBody;
         _arrivalTime = System.currentTimeMillis();
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Tue Feb 20 08:20:41 2007
@@ -16,8 +16,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 
 /**
  * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
@@ -25,7 +25,7 @@
  */
 public class MessageMetaData
 {
-    private BasicPublishBody _publishBody;
+    private MessagePublishInfo _messagePublishInfo;
 
     private ContentHeaderBody _contentHeaderBody;
 
@@ -33,15 +33,15 @@
 
     private long _arrivalTime;
 
-    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
     {
         this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
     }
 
-    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+    public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
     {
         _contentHeaderBody = contentHeaderBody;
-        _publishBody = publishBody;
+        _messagePublishInfo = publishBody;
         _contentChunkCount = contentChunkCount;
         _arrivalTime = arrivalTime;
     }
@@ -66,14 +66,14 @@
         _contentHeaderBody = contentHeaderBody;
     }
 
-    public BasicPublishBody getPublishBody()
+    public MessagePublishInfo getMessagePublishInfo()
     {
-        return _publishBody;
+        return _messagePublishInfo;
     }
 
-    public void setPublishBody(BasicPublishBody publishBody)
+    public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
     {
-        _publishBody = publishBody;
+        _messagePublishInfo = messagePublishInfo;
     }
 
     public long getArrivalTime()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Tue Feb 20 08:20:41 2007
@@ -21,8 +21,8 @@
 import java.util.List;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 
 /**
@@ -40,7 +40,7 @@
      * Stored temporarily until the header has been received at which point it is used when
      * constructing the handle
      */
-    private BasicPublishBody _publishBody;
+    private MessagePublishInfo _messagePublishInfo;
 
     /**
      * Also stored temporarily.
@@ -59,14 +59,14 @@
      */
     private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
 
-    public BasicPublishBody getPublishBody()
+    public MessagePublishInfo getMessagePublishInfo()
     {
-        return _publishBody;
+        return _messagePublishInfo;
     }
 
-    public void setPublishBody(BasicPublishBody publishBody)
+    public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
     {
-        _publishBody = publishBody;
+        _messagePublishInfo = messagePublishInfo;
     }
 
     public List<AMQQueue> getDestinationQueues()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Tue Feb 20 08:20:41 2007
@@ -27,9 +27,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -40,9 +40,9 @@
 {
     private WeakReference<ContentHeaderBody> _contentHeaderBody;
 
-    private WeakReference<BasicPublishBody> _publishBody;
+    private WeakReference<MessagePublishInfo> _messagePublishInfo;
 
-    private List<WeakReference<ContentBody>> _contentBodies;
+    private List<WeakReference<ContentChunk>> _contentBodies;
 
     private boolean _redelivered;
 
@@ -79,7 +79,7 @@
     {
         _arrivalTime = mmd.getArrivalTime();
         _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
-        _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+        _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
     }
 
     public int getBodyCount(StoreContext context, Long messageId) throws AMQException
@@ -88,10 +88,10 @@
         {
             MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
             int chunkCount = mmd.getContentChunkCount();
-            _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+            _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
             for (int i = 0; i < chunkCount; i++)
             {
-                _contentBodies.add(new WeakReference<ContentBody>(null));
+                _contentBodies.add(new WeakReference<ContentChunk>(null));
             }
         }
         return _contentBodies.size();
@@ -102,19 +102,19 @@
         return getContentHeaderBody(context, messageId).bodySize;
     }
 
-    public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
             throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
                                                (_contentBodies.size() - 1));
         }
-        WeakReference<ContentBody> wr = _contentBodies.get(index);
-        ContentBody cb = wr.get();
+        WeakReference<ContentChunk> wr = _contentBodies.get(index);
+        ContentChunk cb = wr.get();
         if (cb == null)
         {
             cb = _messageStore.getContentBodyChunk(context, messageId, index);
-            _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+            _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
         }
         return cb;
     }
@@ -124,35 +124,36 @@
      *
      * @param storeContext
      * @param messageId
-     * @param contentBody
+     * @param contentChunk
      * @param isLastContentBody
      * @throws AMQException
      */
-    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
+    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
     {
         if (_contentBodies == null && isLastContentBody)
         {
-            _contentBodies = new ArrayList<WeakReference<ContentBody>>(1);
+            _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
         }
         else
         {
             if (_contentBodies == null)
             {
-                _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+                _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
             }
         }
-        _contentBodies.add(new WeakReference<ContentBody>(contentBody));
-        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
+        _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
+        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1,
+                                            contentChunk, isLastContentBody);
     }
 
-    public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
     {
-        BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
+        MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
         if (bpb == null)
         {
             MessageMetaData mmd = loadMessageMetaData(context, messageId);
 
-            bpb = mmd.getPublishBody();
+            bpb = mmd.getMessagePublishInfo();
         }
         return bpb;
     }
@@ -182,7 +183,7 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
@@ -190,7 +191,7 @@
         // create en empty list here
         if (contentHeaderBody.bodySize == 0)
         {
-            _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+            _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
         }
 
         final long arrivalTime = System.currentTimeMillis();

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+/** Static helpers converting between the framing-level ContentBody and the ContentChunk abstraction. */
+public class ContentChunkAdapter
+{ // NOTE(review): "Conent" in the method names below looks like a typo for "Content"; kept because the names are public.
+    public static ContentBody toConentBody(ContentChunk contentBodyChunk)
+    {
+        return new ContentBody(contentBodyChunk.getData()); // new ContentBody built from the chunk's data buffer
+    }
+    /** Returns an anonymous ContentChunk whose methods delegate to the supplied ContentBody. */
+    public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
+    {
+        return new ContentChunk() {
+            // Every method below forwards to the captured contentBodyChunk.
+            public int getSize()
+            {
+                return contentBodyChunk.getSize();
+            }
+            // Hands out the body's payload field itself; no copy is taken in this method.
+            public ByteBuffer getData()
+            {
+                return contentBodyChunk.payload;
+            }
+
+            public void reduceToFit()
+            {
+                contentBodyChunk.reduceBufferToFit();
+            }
+        };
+
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Tue Feb 20 08:20:41 2007
@@ -31,10 +31,12 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
 
 /**
  * A simple message store that stores the messages in a threadsafe structure in memory.
@@ -49,7 +51,7 @@
 
     protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
 
-    protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
+    protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
 
@@ -57,7 +59,7 @@
     {
         _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
     }
 
     public void configure(String base, Configuration config)
@@ -65,7 +67,7 @@
         int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
         _log.info("Using capacity " + hashtableCapacity + " for hash tables");
         _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
     }
 
     public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
@@ -97,6 +99,26 @@
         _contentBodyMap.remove(messageId);
     }
 
+    public void createExchange(Exchange exchange) throws AMQException
+    {
+
+    }
+
+    public void removeExchange(Exchange exchange) throws AMQException
+    {
+
+    }
+
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+
+    }
+
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+
+    }
+
     public void createQueue(AMQQueue queue) throws AMQException
     {
         // Not required to do anything
@@ -147,10 +169,10 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody)
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
             throws AMQException
     {
-        List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
 
         if(bodyList == null && lastContentBody)
         {
@@ -160,7 +182,7 @@
         {
             if (bodyList == null)
             {
-                bodyList = new ArrayList<ContentBody>();
+                bodyList = new ArrayList<ContentChunk>();
                 _contentBodyMap.put(messageId, bodyList);
             }
 
@@ -179,9 +201,9 @@
         return _metaDataMap.get(messageId);
     }
 
-    public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
     {
-        List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);
     }
 }

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+/** Converts between BasicPublishBody frames of a given protocol version and the MessagePublishInfo abstraction. */
+public class MessagePublishInfoAdapter
+{
+    private final byte _majorVersion;
+    private final byte _minorVersion;
+    private final int _classId;
+    private final int _methodId;
+
+    /** Records the protocol version and looks up BasicPublishBody's class and method ids for that version. */
+    public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
+    {
+        _majorVersion = majorVersion;
+        _minorVersion = minorVersion;
+        _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
+        _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
+    }
+    /** Builds a BasicPublishBody for this adapter's protocol version from the given publish info. */
+    public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
+    {
+        return new BasicPublishBody(_majorVersion,
+                                    _minorVersion,
+                                    _classId,
+                                    _methodId,
+                                    pubInfo.getExchange(),
+                                    pubInfo.isImmediate(),
+                                    pubInfo.isMandatory(),
+                                    pubInfo.getRoutingKey(),
+                                    0) ; // ticket: always 0 here; none of the MessagePublishInfo accessors used above supplies one
+    }
+    /** Returns an anonymous MessagePublishInfo whose accessors read straight from the supplied body. */
+    public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
+    {
+        return new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return body.getExchange();
+            }
+
+            public boolean isImmediate()
+            {
+                return body.getImmediate();
+            }
+
+            public boolean isMandatory()
+            {
+                return body.getMandatory();
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return body.getRoutingKey();
+            }
+        };
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Tue Feb 20 08:20:41 2007
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.server.store;
 
-import java.util.List;
-
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
 
 public interface MessageStore
 {
@@ -51,6 +51,15 @@
 
     void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
 
+    void createExchange(Exchange exchange) throws AMQException;
+
+    void removeExchange(Exchange exchange) throws AMQException;
+
+    void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+    void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+
     void createQueue(AMQQueue queue) throws AMQException;
 
     void removeQueue(AMQShortString name) throws AMQException;
@@ -68,24 +77,17 @@
     boolean inTran(StoreContext context);
 
     /**
-     * Recreate all queues that were persisted, including re-enqueuing of existing messages
-     * @return
-     * @throws AMQException
-     */
-    List<AMQQueue> createQueues() throws AMQException;
-
-    /**
      * Return a valid, currently unused message id.
      * @return a message id
      */
     Long getNewMessageId();
 
-    void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException;
+    void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException;
 
     void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
 
     MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
 
-    ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+    ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Tue Feb 20 08:20:41 2007
@@ -99,9 +99,11 @@
 
         _queueRegistry = new DefaultQueueRegistry(this);
         _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+        _exchangeRegistry = new DefaultExchangeRegistry(this);
 
         _messageStore = store;
+        
+        _exchangeRegistry.initialise();
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
@@ -117,9 +119,11 @@
         
         _queueRegistry = new DefaultQueueRegistry(this);
         _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+        _exchangeRegistry = new DefaultExchangeRegistry(this);
 
         initialiseMessageStore(hostConfig);
+
+        _exchangeRegistry.initialise();
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();