You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/13 19:10:58 UTC

svn commit: r627552 - in /incubator/qpid/branches/M2.1/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/se...

Author: rgodfrey
Date: Wed Feb 13 10:10:53 2008
New Revision: 627552

URL: http://svn.apache.org/viewvc?rev=627552&view=rev
Log:
QPID-790 : Performance Improvements

Added:
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Feb 13 10:10:53 2008
@@ -35,6 +35,7 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.MessageRouter;
 import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.MessageStore;
@@ -199,11 +200,12 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
-    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
     {
 
         _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
         _currentMessage.setPublisher(publisher);
+        _currentMessage.setExchange(e);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
@@ -285,7 +287,7 @@
     {
         try
         {
-            _exchanges.routeContent(_currentMessage);
+            _currentMessage.route();            
         }
         catch (NoRouteException e)
         {

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Wed Feb 13 10:10:53 2008
@@ -239,7 +239,7 @@
     {
         MessagePublishInfo info = payload.getMessagePublishInfo();
 
-        final AMQShortString routingKey = normalize(info.getRoutingKey());
+        final AMQShortString routingKey = info.getRoutingKey();
 
         List<AMQQueue> queues = getMatchedQueues(routingKey);
         // if we have no registered queues we have nothing to do

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Wed Feb 13 10:10:53 2008
@@ -91,7 +91,8 @@
             }
 
             MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
-            channel.setPublishFrame(info, session);
+            info.setExchange(exchange);
+            channel.setPublishFrame(info, session, e);
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Wed Feb 13 10:10:53 2008
@@ -72,7 +72,7 @@
     public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       message.getContentHeaderBody());
 
@@ -127,7 +127,7 @@
         final StoreContext storeContext = message.getStoreContext();
         final long messageId = message.getMessageId();
 
-        ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+        AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
 
 
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -171,7 +171,7 @@
     }
 
 
-    private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
         final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -187,10 +187,10 @@
         AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
-        return deliverFrame.toByteBuffer();
+        return deliverFrame;
     }
 
-    private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+    private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
         final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -205,7 +205,7 @@
                                                     queueSize);
         AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
-        return getOkFrame.toByteBuffer();
+        return getOkFrame;
     }
 
     public byte getProtocolMinorVersion()
@@ -218,7 +218,7 @@
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicReturnBody basicReturnBody =
@@ -228,13 +228,13 @@
                                                      message.getMessagePublishInfo().getRoutingKey());
         AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
-        return returnFrame.toByteBuffer();
+        return returnFrame;
     }
 
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
             throws AMQException
     {
-        ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
 
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       message.getContentHeaderBody());

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Wed Feb 13 10:10:53 2008
@@ -16,6 +16,7 @@
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
 
 
     public static Factory getInstanceFactory()
@@ -46,9 +47,10 @@
     public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
+                                                                      contentHeaderBody);
 
         final AMQMessageHandle messageHandle = message.getMessageHandle();
         final StoreContext storeContext = message.getStoreContext();
@@ -101,7 +103,7 @@
         final StoreContext storeContext = message.getStoreContext();
         final long messageId = message.getMessageId();
 
-        ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+        AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
 
 
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -145,41 +147,54 @@
     }
 
 
-    private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
             throws AMQException
     {
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-        BasicDeliverBody deliverBody =
-                methodRegistry.createBasicDeliverBody(consumerTag,
-                                                      deliveryTag,
-                                                      messageHandle.isRedelivered(),
-                                                      pb.getExchange(),
-                                                      pb.getRoutingKey());
-        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
+        final boolean isRedelivered = messageHandle.isRedelivered();
+        final AMQShortString exchangeName = pb.getExchange();
+        final AMQShortString routingKey = pb.getRoutingKey();
+
+        final AMQDataBlock returnBlock = new DeferredDataBlock()
+        {
+
+            protected AMQDataBlock createAMQDataBlock()
+            {
+                BasicDeliverBody deliverBody =
+                        METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+                AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+                return deliverFrame;
 
-        return deliverFrame.toByteBuffer();
+            }
+        };
+        return returnBlock;
     }
 
-    private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+    private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
         BasicGetOkBody getOkBody =
-                methodRegistry.createBasicGetOkBody(deliveryTag,
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
                                                     messageHandle.isRedelivered(),
                                                     pb.getExchange(),
                                                     pb.getRoutingKey(),
                                                     queueSize);
         AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
-        return getOkFrame.toByteBuffer();
+        return getOkFrame;
     }
 
     public byte getProtocolMinorVersion()
@@ -192,23 +207,23 @@
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
         BasicReturnBody basicReturnBody =
-                methodRegistry.createBasicReturnBody(replyCode,
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
                                                      replyText,
                                                      message.getMessagePublishInfo().getExchange(),
                                                      message.getMessagePublishInfo().getRoutingKey());
         AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
-        return returnFrame.toByteBuffer();
+        return returnFrame;
     }
 
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
             throws AMQException
     {
-        ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
 
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       message.getContentHeaderBody());
@@ -252,8 +267,8 @@
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
         writeFrame(basicCancelOkBody.generateFrame(channelId));
 
     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Feb 13 10:10:53 2008
@@ -35,6 +35,7 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.exchange.Exchange;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,6 +85,10 @@
 
     private final int hashcode = System.identityHashCode(this);
 
+    private Exchange _exchange;
+    private static final boolean SYNCED_CLOCKS =
+            ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
+
 
     public String debugIdentity()
     {
@@ -97,7 +102,7 @@
         long timestamp =
                 ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
 
-        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+        if (SYNCED_CLOCKS)
         {
             _expiration = expiration;
         }
@@ -124,6 +129,16 @@
     public boolean isReferenced()
     {
         return _referenceCount.get() > 0;
+    }
+
+    public void setExchange(final Exchange exchange) // records the exchange that route() will later call
+    {
+        _exchange = exchange;
+    }
+
+    public void route() throws AMQException // hands this message to the exchange stored by setExchange()
+    {
+        _exchange.route(this); // NOTE(review): _exchange is not null-checked here - confirm setExchange() always runs first
     }
 
     /**

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Wed Feb 13 10:10:53 2008
@@ -589,6 +589,11 @@
             return null;
         }
 
+        public void setExchange(AMQShortString exchange)
+        {
+            // No-op: this test stub ignores the exchange name it is given.
+        }
+
         public boolean isImmediate()
         {
             return false;

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Wed Feb 13 10:10:53 2008
@@ -242,6 +242,11 @@
                 return null;
             }
 
+            public void setExchange(AMQShortString exchange)
+            {
+                // No-op: this test stub ignores the exchange name it is given.
+            }
+
             public boolean isImmediate()
             {
                 return immediate;

Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Feb 13 10:10:53 2008
@@ -234,6 +234,11 @@
                 return null;
             }
 
+            public void setExchange(AMQShortString exchange)
+            {
+                // No-op: this test stub ignores the exchange name it is given.
+            }
+
             public boolean isImmediate()
             {
                 return immediate;

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Wed Feb 13 10:10:53 2008
@@ -24,7 +24,7 @@
 
 public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
-    private ByteBuffer _encodedBlock;
+    private AMQDataBlock _firstFrame;
 
     private AMQDataBlock[] _blocks;
 
@@ -39,10 +39,10 @@
      * @param encodedBlock already-encoded data
      * @param blocks some blocks to be encoded.
      */
-    public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks)
+    public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
     {
         this(blocks);
-        _encodedBlock = encodedBlock;
+        _firstFrame = encodedBlock;
     }
 
     public AMQDataBlock[] getBlocks()
@@ -50,9 +50,9 @@
         return _blocks;
     }
 
-    public ByteBuffer getEncodedBlock()
+    public AMQDataBlock getFirstFrame()
     {
-        return _encodedBlock;
+        return _firstFrame;
     }
 
     public long getSize()
@@ -62,19 +62,18 @@
         {
             frameSize += _blocks[i].getSize();
         }
-        if (_encodedBlock != null)
+        if (_firstFrame != null)
         {
-            _encodedBlock.rewind();
-            frameSize += _encodedBlock.remaining();
+            frameSize += _firstFrame.getSize();
         }
         return frameSize;
     }
 
     public void writePayload(ByteBuffer buffer)
     {
-        if (_encodedBlock != null)
+        if (_firstFrame != null)
         {
-            buffer.put(_encodedBlock);
+            _firstFrame.writePayload(buffer);
         }
         for (int i = 0; i < _blocks.length; i++)
         {
@@ -91,7 +90,7 @@
         else
         {
             StringBuilder buf = new StringBuilder(this.getClass().getName());
-            buf.append("{encodedBlock=").append(_encodedBlock);
+            buf.append("{encodedBlock=").append(_firstFrame);
             for (int i = 0 ; i < _blocks.length; i++)
             {
                 buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");

Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java?rev=627552&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java Wed Feb 13 10:10:53 2008
@@ -0,0 +1,50 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public abstract class DeferredDataBlock extends AMQDataBlock // data block that builds its real content on first use via createAMQDataBlock()
+{
+    private AMQDataBlock _underlyingDataBlock; // null until getSize() or writePayload() first creates it
+
+
+    public long getSize() // returns the underlying block's size, creating that block first if needed
+    {
+        if(_underlyingDataBlock == null) // NOTE(review): unsynchronized check - confirm instances are not shared across threads
+        {
+            _underlyingDataBlock = createAMQDataBlock(); // cached so later calls reuse the same block
+        }
+        return _underlyingDataBlock.getSize();
+    }
+
+    public void writePayload(ByteBuffer buffer) // writes the underlying block into buffer, creating that block first if needed
+    {
+        if(_underlyingDataBlock == null)
+        {
+            _underlyingDataBlock = createAMQDataBlock(); // same lazy creation as in getSize()
+        }
+        _underlyingDataBlock.writePayload(buffer);
+    }
+
+    abstract protected AMQDataBlock createAMQDataBlock(); // subclass hook: supplies the block that getSize()/writePayload() delegate to
+
+}

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Wed Feb 13 10:10:53 2008
@@ -25,7 +25,7 @@
 
 public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
-    private ByteBuffer _encodedBlock;
+    private AMQDataBlock _firstFrame;
 
     private AMQDataBlock _block;
 
@@ -40,10 +40,10 @@
      * @param encodedBlock already-encoded data
      * @param block a block to be encoded.
      */
-    public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+    public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block)
     {
         this(block);
-        _encodedBlock = encodedBlock;
+        _firstFrame = encodedBlock;
     }
 
     public AMQDataBlock getBlock()
@@ -51,28 +51,28 @@
         return _block;
     }
 
-    public ByteBuffer getEncodedBlock()
+    public AMQDataBlock getFirstFrame()
     {
-        return _encodedBlock;
+        return _firstFrame;
     }
 
     public long getSize()
     {
         long frameSize = _block.getSize();
 
-        if (_encodedBlock != null)
+        if (_firstFrame != null)
         {
-            _encodedBlock.rewind();
-            frameSize += _encodedBlock.remaining();
+
+            frameSize += _firstFrame.getSize();
         }
         return frameSize;
     }
 
     public void writePayload(ByteBuffer buffer)
     {
-        if (_encodedBlock != null)
+        if (_firstFrame != null)
         {
-            buffer.put(_encodedBlock);
+            _firstFrame.writePayload(buffer);
         }
         _block.writePayload(buffer);
 
@@ -87,7 +87,7 @@
         else
         {
             StringBuilder buf = new StringBuilder(this.getClass().getName());
-            buf.append("{encodedBlock=").append(_encodedBlock);
+            buf.append("{encodedBlock=").append(_firstFrame);
 
             buf.append(" _block=[").append(_block.toString()).append("]");
 

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java Wed Feb 13 10:10:53 2008
@@ -27,6 +27,8 @@
 
     public AMQShortString getExchange();
 
+    public void setExchange(AMQShortString exchange);
+
     public boolean isImmediate();
 
     public boolean isMandatory();

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Wed Feb 13 10:10:53 2008
@@ -67,7 +67,7 @@
         final AMQShortString exchange = publishBody.getExchange();
         final AMQShortString routingKey = publishBody.getRoutingKey();
 
-        return new MethodConverter_0_9.MessagePublishInfoImpl(exchange == null ? null : exchange.intern(),
+        return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
                                           publishBody.getImmediate(),
                                           publishBody.getMandatory(),
                                           routingKey == null ? null : routingKey.intern());
@@ -87,7 +87,7 @@
 
     private static class MessagePublishInfoImpl implements MessagePublishInfo
     {
-        private final AMQShortString _exchange;
+        private AMQShortString _exchange;
         private final boolean _immediate;
         private final boolean _mandatory;
         private final AMQShortString _routingKey;
@@ -106,6 +106,11 @@
         public AMQShortString getExchange()
         {
             return _exchange;
+        }
+
+        public void setExchange(AMQShortString exchange)
+        {
+            _exchange = exchange; // replaces the value that getExchange() returns
         }
 
         public boolean isImmediate()

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Wed Feb 13 10:10:53 2008
@@ -107,7 +107,7 @@
 
     private static class MessagePublishInfoImpl implements MessagePublishInfo
     {
-        private final AMQShortString _exchange;
+        private AMQShortString _exchange;
         private final boolean _immediate;
         private final boolean _mandatory;
         private final AMQShortString _routingKey;
@@ -126,6 +126,11 @@
         public AMQShortString getExchange()
         {
             return _exchange;
+        }
+
+        public void setExchange(AMQShortString exchange)
+        {
+            _exchange = exchange; // replaces the value that getExchange() returns
         }
 
         public boolean isImmediate()

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Wed Feb 13 10:10:53 2008
@@ -117,6 +117,11 @@
                         return null;
                     }
 
+                    public void setExchange(AMQShortString exchange)
+                    {
+                        // No-op: this implementation ignores the supplied exchange.
+                    }
+
                     public boolean isImmediate()
                     {
                         return false;

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Wed Feb 13 10:10:53 2008
@@ -107,6 +107,11 @@
                     return new AMQShortString("someExchange");
                 }
 
+                public void setExchange(AMQShortString exchange)
+                {
+                    // No-op: this implementation ignores the supplied exchange.
+                }
+
                 public boolean isImmediate()
                 {
                     return false;

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Wed Feb 13 10:10:53 2008
@@ -70,6 +70,11 @@
                 return null;
             }
 
+            public void setExchange(AMQShortString exchange)
+            {
+                // No-op: this implementation ignores the supplied exchange.
+            }
+
             public boolean isImmediate()
             {
                 return immediate;

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Wed Feb 13 10:10:53 2008
@@ -61,6 +61,11 @@
                 return null;
             }
 
+            public void setExchange(AMQShortString exchange)
+            {
+                // No-op: this implementation ignores the supplied exchange.
+            }
+
             public boolean isImmediate()
             {
                 return false;