You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC

svn commit: r827724 [3/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apach...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Tue Oct 20 16:23:01 2009
@@ -27,27 +27,34 @@
 package org.apache.qpid.server.output.amqp0_8;
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
 
-import java.util.Iterator;
+import java.nio.ByteBuffer;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
 
-    private final MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+    private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
+            METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
     public static Factory getInstanceFactory()
     {
         return new Factory()
         {
-    
+
             public ProtocolOutputConverter newInstance(AMQProtocolSession session)
             {
                 return new ProtocolOutputConverterImpl(session);
@@ -71,67 +78,44 @@
     public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        AMQMessage message = (AMQMessage) entry.getMessage();
         AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-        final int bodyCount = messageHandle.getBodyCount();
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+    }
 
-        if(bodyCount == 0)
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-
-            writeFrame(compositeBlock);
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
         }
         else
         {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-            }
-
-
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize(); 
+            return chb;
         }
-
-
     }
 
 
     public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
+        AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+    }
 
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+            throws AMQException
+    {
 
-        AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
 
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
 
-        final int bodyCount = messageHandle.getBodyCount();
-        if(bodyCount == 0)
+        final int bodySize = (int) message.getSize();
+        if(bodySize == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
                                                                              contentHeader);
@@ -139,48 +123,63 @@
         }
         else
         {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
 
+            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+            ByteBuffer buf = ByteBuffer.allocate(capacity);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(0);
+            int writtenSize = 0;
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            writtenSize += message.getContent(buf, writtenSize);
+            buf.flip();
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
+            while(writtenSize < bodySize)
             {
-                cb = messageHandle.getContentChunk(i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                buf = java.nio.ByteBuffer.allocate(capacity);
+                writtenSize += message.getContent(buf, writtenSize);
+                buf.flip();
+                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
             }
 
-
         }
-
-
     }
 
 
     private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
 
 
         BasicDeliverBody deliverBody =
-                _methodRegistry.createBasicDeliverBody(consumerTag,
+                METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
                                                       deliveryTag,
-                                                      entry.isRedelivered(),
-                                                      pb.getExchange(),
-                                                      pb.getRoutingKey());
+                                                      isRedelivered,
+                                                      exchangeName,
+                                                      routingKey);
+
         AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
@@ -190,14 +189,31 @@
     private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
 
         BasicGetOkBody getOkBody =
-                _methodRegistry.createBasicGetOkBody(deliveryTag,
-                                                    entry.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
                                                     queueSize);
         AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
@@ -217,7 +233,7 @@
     private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
         BasicReturnBody basicReturnBody =
-                _methodRegistry.createBasicReturnBody(replyCode,
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
                                                      replyText,
                                                      messagePublishInfo.getExchange(),
                                                      messagePublishInfo.getRoutingKey());
@@ -228,7 +244,7 @@
 
     public void writeReturn(MessagePublishInfo messagePublishInfo,
                             ContentHeaderBody header,
-                            Iterator<AMQDataBlock> bodyFrameIterator,
+                            MessageContentSource content,
                             int channelId,
                             int replyCode,
                             AMQShortString replyText)
@@ -237,36 +253,8 @@
 
         AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      header);
+        writeMessageDelivery(content, header, channelId, returnFrame);
 
-
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
-            writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
     }
 
 
@@ -278,7 +266,7 @@
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
-        BasicCancelOkBody basicCancelOkBody = _methodRegistry.createBasicCancelOkBody(consumerTag);
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
         writeFrame(basicCancelOkBody.generateFrame(channelId));
 
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.output.amqp0_9;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,39 +8,41 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
 import org.apache.mina.common.ByteBuffer;
 
-import java.util.Iterator;
-
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-    private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+    private static final ProtocolVersionMethodConverter
+            PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
 
     public static Factory getInstanceFactory()
@@ -71,16 +73,43 @@
     public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        AMQMessage message = (AMQMessage) entry.getMessage();
-        AMQBody deliverBody = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
-        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
+
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
 
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
 
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        final int bodyCount = messageHandle.getBodyCount();
+        int bodySize = (int) message.getSize();
 
-        if(bodyCount == 0)
+        if(bodySize == 0)
         {
             SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
                                                                              contentHeaderBody);
@@ -89,36 +118,37 @@
         }
         else
         {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
 
+            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+            java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(0);
+            int writtenSize = 0;
 
-            AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
 
-            CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writtenSize += message.getContent(buf, writtenSize);
+            buf.flip();
+            AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
             writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
+            while(writtenSize < bodySize)
             {
-                cb = messageHandle.getContentChunk(i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
+                buf = java.nio.ByteBuffer.allocate(capacity);
 
+                writtenSize += message.getContent(buf, writtenSize);
+                buf.flip();
+                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+            }
         }
-        
     }
 
     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
     {
-        
+
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       contentHeaderBody);
         return contentHeader;
@@ -127,63 +157,36 @@
 
     public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
 
-        AMQFrame deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
 
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
 
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
 
-        final int bodyCount = messageHandle.getBodyCount();
-        if(bodyCount == 0)
+        if(entry.getMessage() instanceof AMQMessage)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-            writeFrame(compositeBlock);
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
         }
         else
         {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
-
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
         }
 
-
-    }
-
-
-    private AMQBody createEncodedDeliverFrame(QueueEntry entry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
-            throws AMQException
-    {
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
         final boolean isRedelivered = entry.isRedelivered();
-        final AMQShortString exchangeName = pb.getExchange();
-        final AMQShortString routingKey = pb.getRoutingKey();
 
         final AMQBody returnBlock = new AMQBody()
         {
@@ -236,21 +239,37 @@
         return returnBlock;
     }
 
-    private AMQFrame createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
             throws AMQException
     {
-        final AMQMessage message = (AMQMessage) entry.getMessage();
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
 
         BasicGetOkBody getOkBody =
                 METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    entry.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
                                                     queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
-        return getOkFrame;
+        return getOkBody;
     }
 
     public byte getProtocolMinorVersion()
@@ -263,7 +282,9 @@
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
     {
 
         BasicReturnBody basicReturnBody =
@@ -271,47 +292,18 @@
                                                      replyText,
                                                      messagePublishInfo.getExchange(),
                                                      messagePublishInfo.getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
-        return returnFrame;
+
+        return basicReturnBody;
     }
 
-    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, Iterator<AMQDataBlock> bodyFrameIterator, int channelId, int replyCode, AMQShortString replyText)
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
             throws AMQException
     {
 
-        AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
-
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, header);
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
 
-
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
-            writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
+        writeMessageDelivery(message, header, channelId, returnFrame);
     }
 
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Oct 20 16:23:01 2009
@@ -83,8 +83,8 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
 
@@ -139,7 +139,7 @@
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
-    
+
     // Create a simple ID that increments for ever new Session
     private final long _sessionID = idGenerator.getAndIncrement();
 
@@ -152,11 +152,12 @@
 
     private long _writtenBytes;
     private long _readBytes;
-    
+
     private Job _readJob;
     private Job _writeJob;
 
     private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+    private long _maxFrameSize;
 
     public ManagedObject getManagedObject()
     {
@@ -167,7 +168,7 @@
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _networkDriver = driver;
-        
+
         _codecFactory = new AMQCodecFactory(true, this);
         _poolReference.acquireExecutorService();
         _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
@@ -178,17 +179,9 @@
 
     }
 
-    private AMQProtocolSessionMBean createMBean() throws AMQException
+    private AMQProtocolSessionMBean createMBean() throws JMException
     {
-        try
-        {
-            return new AMQProtocolSessionMBean(this);
-        }
-        catch (JMException ex)
-        {
-            _logger.error("AMQProtocolSession MBean creation has failed ", ex);
-            throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
-        }
+        return new AMQProtocolSessionMBean(this);
     }
 
     public long getSessionID()
@@ -200,7 +193,17 @@
     {
         return _actor;
     }
-    
+
+    public void setMaxFrameSize(long frameMax)
+    {
+        _maxFrameSize = frameMax;
+    }
+
+    public long getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
+
     public void received(final ByteBuffer msg)
     {
         _lastIoTime = System.currentTimeMillis();
@@ -290,7 +293,7 @@
                 else
                 {
                     // The channel has been told to close, we don't process any more frames until
-                    // it's closed. 
+                    // it's closed.
                     return;
                 }
             }
@@ -545,7 +548,7 @@
     private void checkForNotification()
     {
         int channelsCount = _channelMap.size();
-        if (channelsCount >= _maxNoOfChannels)
+        if (_managedObject != null && channelsCount >= _maxNoOfChannels)
         {
             _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
         }
@@ -560,7 +563,7 @@
     {
         _maxNoOfChannels = value;
     }
-    
+
     public void commitTransactions(AMQChannel channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
@@ -576,7 +579,7 @@
             channel.rollback();
         }
     }
-    
+
     /**
      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
      * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -697,7 +700,7 @@
             {
                 task.doTask(this);
             }
-            
+
             _closed = true;
             _poolReference.releaseExecutorService();
             CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
@@ -775,7 +778,7 @@
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
         }
     }
-    
+
     public SaslServer getSaslServer()
     {
         return _saslServer;
@@ -865,8 +868,15 @@
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
 
-        _managedObject = createMBean();
-        _managedObject.register();
+        try
+        {
+            _managedObject = createMBean();
+            _managedObject.register();
+        }
+        catch (JMException e)
+        {
+            _logger.error(e);
+        }
     }
 
     public void addSessionCloseTask(Task task)
@@ -878,7 +888,7 @@
     {
         _taskList.remove(task);
     }
-    
+
     public ProtocolOutputConverter getProtocolOutputConverter()
     {
         return _protocolOutputConverter;
@@ -905,7 +915,7 @@
     public SocketAddress getRemoteAddress()
     {
         return _networkDriver.getRemoteAddress();
-    }    
+    }
 
     public SocketAddress getLocalAddress()
     {
@@ -941,7 +951,7 @@
 
     public void setNetworkDriver(NetworkDriver driver)
     {
-        _networkDriver = driver;        
+        _networkDriver = driver;
     }
 
     public void writerIdle()
@@ -970,7 +980,7 @@
 
             MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
             ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-                        
+
             writeFrame(closeBody.generateFrame(0));
 
             _networkDriver.close();
@@ -986,7 +996,7 @@
     {
         // Do nothing
     }
-    
+
     public long getReadBytes()
     {
         return _readBytes;
@@ -1006,7 +1016,7 @@
     {
         return _sessionIdentifier;
     }
-    
+
     public String getClientVersion()
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
@@ -1024,5 +1034,5 @@
             }
         }
     }
-    
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,8 +28,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.PrincipalHolder;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -44,6 +43,10 @@
 
     LogActor getLogActor();
 
+    void setMaxFrameSize(long frameMax);
+
+    long getMaxFrameSize();
+
     public static final class ProtocolSessionIdentifier
     {
         private final Object _sessionIdentifier;
@@ -227,5 +230,5 @@
     List<AMQChannel> getChannels();
 
     void closeIfLingeringClosedChannels();
-    
+
 }

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java Tue Oct 20 16:23:01 2009
@@ -18,22 +18,20 @@
  * under the License.
  *
  */
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.protocol.ProtocolEngineFactory;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.protocol.ProtocolEngine_0_10;
 
 public class ProtocolEngineFactory_0_10 implements ProtocolEngineFactory
 {
     private ConnectionDelegate _delegate;
 
-    public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
-
     public ProtocolEngineFactory_0_10(ConnectionDelegate delegate)
     {
         _delegate = delegate;
@@ -41,10 +39,8 @@
 
     public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
     {
-        Connection conn = new Connection();
+        Connection conn = new ServerConnection();
         conn.setConnectionDelegate(_delegate);
-        Disassembler dis = new Disassembler(networkDriver, MAX_FRAME_SIZE);
-        conn.setSender(dis);
-        return new ProtocolEngine_0_10(conn, networkDriver);  //To change body of implemented methods use File | Settings | File Templates.
+        return new ProtocolEngine_0_10(conn, networkDriver);
     }
 }

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Tue Oct 20 16:23:01 2009
@@ -18,31 +18,38 @@
  * under the License.
  *
  */
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
 
 import java.net.SocketAddress;
 
 public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine
 {
+    public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
     private NetworkDriver _networkDriver;
     private long _readBytes;
     private long _writtenBytes;
+    private Connection _connection;
 
     public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver)
     {
         super(new Assembler(conn));
+        _connection = conn;
         _networkDriver = networkDriver;
     }
 
     public void setNetworkDriver(NetworkDriver driver)
     {
         _networkDriver = driver;
+        Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
+        _connection.setSender(dis);
     }
 
     public SocketAddress getRemoteAddress()

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Tue Oct 20 16:23:01 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
 
 public class AMQPriorityQueue extends SimpleAMQQueue
 {
@@ -34,7 +33,6 @@
                                final boolean autoDelete,
                                final VirtualHost virtualHost,
                                int priorities)
-            throws AMQException
     {
         super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
     }
@@ -43,7 +41,7 @@
                             boolean durable,
                             String owner,
                             boolean autoDelete,
-                            VirtualHost virtualHost, int priorities) throws AMQException
+                            VirtualHost virtualHost, int priorities)
     {
         this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
     }
@@ -70,7 +68,7 @@
                     QueueEntry released = context._releasedEntry;
                     while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
                     {
-                        if(_releasedUpdater.compareAndSet(context,released,entry))
+                        if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
                         {
                             break;
                         }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Oct 20 16:23:01 2009
@@ -22,16 +22,16 @@
 
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.PrincipalHolder;
-import org.apache.qpid.server.ExchangeReferrer;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
@@ -40,7 +40,7 @@
 import java.util.Set;
 import java.util.Map;
 
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
 {
 
 
@@ -51,6 +51,8 @@
 
     AMQShortString getName();
 
+    void setNoLocal(boolean b);
+
     boolean isDurable();
 
     boolean isAutoDelete();
@@ -114,7 +116,7 @@
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-    
+
 
     void addQueueDeleteTask(final Task task);
 
@@ -128,11 +130,11 @@
     List<Long> getMessagesOnTheQueue(int num, int offest);
 
     QueueEntry getMessageOnTheQueue(long messageId);
-    
+
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
-     * 
-     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. 
+     *
+     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
      * @param fromPosition
      * @param toPosition
@@ -142,9 +144,9 @@
 
 
     void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        StoreContext storeContext);
+                                                        ServerTransaction transaction);
 
-    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
+    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
 
     void removeMessagesFromQueue(long fromMessageId, long toMessageId);
 
@@ -265,6 +267,6 @@
     }
 
     void configure(QueueConfiguration config);
-    
+
     ManagedObject getManagedObject();
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Tue Oct 20 16:23:01 2009
@@ -27,7 +27,6 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.HashMap;
 
 
 public class AMQQueueFactory
@@ -130,7 +129,6 @@
                                               AMQShortString owner,
                                               boolean autoDelete,
                                               VirtualHost virtualHost, final FieldTable arguments)
-            throws AMQException
     {
         final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Oct 20 16:23:01 2009
@@ -22,21 +22,19 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 
 import javax.management.JMException;
 import javax.management.MBeanNotificationInfo;
@@ -125,7 +123,7 @@
         _msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
         _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
         _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
-        _msgContentType = new CompositeType("Message Content", "AMQ Message Content", 
+        _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
                     VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS,
                     _msgContentAttributeTypes);
 
@@ -135,9 +133,9 @@
         _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
         _msgAttributeTypes[4] = SimpleType.LONG; // For queue position
 
-        _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES, 
+        _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
                                 VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes);
-        _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, 
+        _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
                                                 VIEW_MSGS_TABULAR_UNIQUE_INDEX);
     }
 
@@ -299,7 +297,7 @@
 
     /**
      * Clears the queue of non-acquired messages
-     * 
+     *
      * @return the number of messages deleted
      * @see AMQQueue#clearQueue
      */
@@ -321,60 +319,40 @@
         }
 
         ServerMessage serverMsg = entry.getMessage();
-
-        if(serverMsg instanceof AMQMessage)
-        {
-            AMQMessage msg = (AMQMessage) serverMsg;
-            // get message content
-            Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
-            List<Byte> msgContent = new ArrayList<Byte>();
-            while (cBodies.hasNext())
-            {
-                ContentChunk body = cBodies.next();
-                if (body.getSize() != 0)
-                {
-                    if (body.getSize() != 0)
-                    {
-                        ByteBuffer slice = body.getData().slice();
-                        for (int j = 0; j < slice.limit(); j++)
-                        {
-                            msgContent.add(slice.get());
-                        }
-                    }
-                }
-            }
+        final int bodySize = (int) serverMsg.getSize();
 
 
-            try
-            {
-                // Create header attributes list
-                CommonContentHeaderProperties headerProperties =
-                    (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
-                String mimeType = null, encoding = null;
-                if (headerProperties != null)
-                {
-                    AMQShortString mimeTypeShortSting = headerProperties.getContentType();
-                    mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
-                    encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
-                }
+        List<Byte> msgContent = new ArrayList<Byte>();
 
-                Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
+        int position = 0;
 
-                return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
-            }
-            catch (AMQException e)
+        while(position < bodySize)
+        {
+            position += serverMsg.getContent(buf, position);
+            buf.flip();
+            for(int i = 0; i < buf.limit(); i++)
             {
-                JMException jme = new JMException("Error creating header attributes list: " + e);
-                jme.initCause(e);
-                throw jme;
+                msgContent.add(buf.get(i));
             }
-
+            buf.clear();
         }
-        else
+
+        AMQMessageHeader header = serverMsg.getMessageHeader();
+
+        String mimeType = null, encoding = null;
+        if (header != null)
         {
-            // TODO 0-10 Messages for MBean
-            return null;
+            mimeType = header.getMimeType();
+
+            encoding = header.getEncoding();
         }
+
+
+        Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+        return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+
     }
 
     /**
@@ -386,8 +364,8 @@
     {
         return viewMessages((long)beginIndex,(long)endIndex);
     }
-    
-    
+
+
     /**
      * Returns the header contents of the messages stored in this queue in tabular form.
      * @param startPosition The queue position of the first message to be viewed
@@ -400,7 +378,7 @@
             throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
                 + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
         }
-        
+
         if ((endPosition - startPosition) > Integer.MAX_VALUE)
         {
             throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
@@ -489,7 +467,9 @@
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, new StoreContext());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+        txn.commit();
     }
 
     /**
@@ -507,7 +487,7 @@
 
         _queue.removeMessagesFromQueue(fromMessageId, toMessageId);
     }
-    
+
     /**
      * @see ManagedQueue#copyMessages
      * @param fromMessageId
@@ -522,9 +502,15 @@
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, new StoreContext());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+
+        txn.commit();
+
+
     }
-    
+
     /**
      * returns Notifications sent by this MBean.
      */

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,12 +44,12 @@
         return _virtualHost;
     }
 
-    public void registerQueue(AMQQueue queue) throws AMQException
+    public void registerQueue(AMQQueue queue)
     {
         _queueMap.put(queue.getName(), queue);
     }
 
-    public void unregisterQueue(AMQShortString name) throws AMQException
+    public void unregisterQueue(AMQShortString name)
     {
         _queueMap.remove(name);
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Tue Oct 20 16:23:01 2009
@@ -25,21 +25,22 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
 import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.nio.ByteBuffer;
 
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
 {
 
     /** Used for debugging purposes. */
@@ -51,9 +52,6 @@
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
 
-    private AMQMessageHandle _messageHandle;
-    private final Long _messageId;
-
 
     /**
      * Keeps a track of how many bytes we have received in body frames
@@ -70,25 +68,29 @@
     private long _expiration;
 
     private Exchange _exchange;
-    private AMQMessageHeader _messageHeader;
 
 
     private int _receivedChunkCount = 0;
     private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
 
+    // we keep both the original meta data object and the store reference to it just in case the
+    // store would otherwise flow it to disk
+
+    private MessageMetaData _messageMetaData;
+
+    private StoredMessage<MessageMetaData> _storedMessageHandle;
+
 
-    public IncomingMessage(final Long messageId,
-                           final MessagePublishInfo info,
-                           final AMQProtocolSession publisher)
+    public IncomingMessage(
+            final MessagePublishInfo info
+    )
     {
-        _messageId = messageId;
         _messagePublishInfo = info;
     }
 
     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
     {
         _contentHeaderBody = contentHeaderBody;
-        _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
     }
 
     public void setExpiration()
@@ -122,11 +124,10 @@
 
     }
 
-    public MessageMetaData routingComplete(final MessageStore store,
-                                final MessageHandleFactory factory) throws AMQException
+    public MessageMetaData headersReceived()
     {
-        _messageHandle = factory.createMessageHandle(_messageId, store, isPersistent());
-        return _messageHandle.setPublishAndContentHeaderBody(_messagePublishInfo, _contentHeaderBody);
+        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+        return _messageMetaData;
     }
 
 
@@ -135,20 +136,15 @@
         return _destinationQueues;
     }
 
-
-    public AMQMessageHandle getMessageHandle()
-    {
-        return _messageHandle;
-    }
-
-
     public int addContentBodyFrame(final ContentChunk contentChunk)
             throws AMQException
     {
-
+        _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
-        _messageHandle.addContentBodyFrame(contentChunk, allContentReceived());
+
+
+
         return _receivedChunkCount++;
     }
 
@@ -192,7 +188,7 @@
 
     public AMQMessageHeader getMessageHeader()
     {
-        return _messageHeader;
+        return _messageMetaData.getMessageHeader();
     }
 
     public boolean isPersistent()
@@ -207,6 +203,7 @@
         return false;
     }
 
+
     public long getSize()
     {
         return getContentHeader().bodySize;
@@ -214,7 +211,7 @@
 
     public Long getMessageNumber()
     {
-        return _messageId;
+        return _storedMessageHandle.getMessageNumber();
     }
 
     public void setExchange(final Exchange e)
@@ -258,4 +255,46 @@
     {
         return _contentChunks.get(index);
     }
+
+
+    public int getContent(ByteBuffer buf, int offset)
+    {
+        int pos = 0;
+        int written = 0;
+        for(ContentChunk cb : _contentChunks)
+        {
+            ByteBuffer data = cb.getData().buf();
+            if(offset+written >= pos && offset < pos + data.limit())
+            {
+                ByteBuffer src = data.duplicate();
+                src.position(offset+written - pos);
+                src = src.slice();
+
+                if(buf.remaining() < src.limit())
+                {
+                    src.limit(buf.remaining());
+                }
+                int count = src.limit();
+                buf.put(src);
+                written += count;
+                if(buf.remaining() == 0)
+                {
+                    break;
+                }
+            }
+            pos+=data.limit();
+        }
+        return written;
+
+    }
+
+    public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
+    {
+        _storedMessageHandle = storedMessageHandle;
+    }
+
+    public StoredMessage<MessageMetaData> getStoredMessage()
+    {
+        return _storedMessageHandle;
+    }
 }

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+final class QueueContext implements AMQQueue.Context
+{
+    volatile QueueEntry _lastSeenEntry;
+    volatile QueueEntry _releasedEntry;
+
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+            _lastSeenUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+            _releasedUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueContext.class, QueueEntry.class, "_releasedEntry");
+
+    public QueueContext(QueueEntry head)
+    {
+        _lastSeenEntry = head;
+    }
+
+    public QueueEntry getLastSeenEntry()
+    {
+        return _lastSeenEntry;
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Oct 20 16:23:01 2009
@@ -180,7 +180,7 @@
 
     boolean immediateAndNotDelivered();
 
-    void setRedelivered(boolean b);
+    void setRedelivered();
 
     boolean isRedelivered();
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Oct 20 16:23:01 2009
@@ -27,12 +27,10 @@
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.log4j.Logger;
 
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -80,8 +78,11 @@
     private volatile long _entryId;
 
     volatile QueueEntryImpl _next;
-    private boolean _deliveredToConsumer;
-    private boolean _redelivered;
+
+    private static final int DELIVERED_TO_CONSUMER = 1;
+    private static final int REDELIVERED = 2;
+
+    private volatile int _deliveryState;
 
 
     QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -94,6 +95,7 @@
     public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
+
         _message = message == null ? null : message.newReference();
 
         _entryIdUpdater.set(this, entryId);
@@ -132,7 +134,7 @@
 
     public boolean getDeliveredToConsumer()
     {
-        return _deliveredToConsumer;
+        return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
     }
 
     public boolean expired() throws AMQException
@@ -194,9 +196,9 @@
     public boolean acquire(Subscription sub)
     {
         final boolean acquired = acquire(sub.getOwningState());
-        if(acquired && !_deliveredToConsumer)
+        if(acquired)
         {
-            _deliveredToConsumer = true;
+            _deliveryState |= DELIVERED_TO_CONSUMER;
         }
         return acquired;
     }
@@ -259,9 +261,9 @@
 
     }
 
-    public boolean immediateAndNotDelivered() 
+    public boolean immediateAndNotDelivered()
     {
-        return !_deliveredToConsumer && isImmediate();
+        return !getDeliveredToConsumer() && isImmediate();
     }
 
     private boolean isImmediate()
@@ -270,9 +272,9 @@
         return message != null && message.isImmediate();
     }
 
-    public void setRedelivered(boolean b)
+    public void setRedelivered()
     {
-        _redelivered = b;
+        _deliveryState |= REDELIVERED;
     }
 
     public AMQMessageHeader getMessageHeader()
@@ -289,7 +291,7 @@
 
     public boolean isRedelivered()
     {
-        return _redelivered;
+        return (_deliveryState & REDELIVERED) != 0;
     }
 
     public Subscription getDeliveredSubscription()
@@ -329,7 +331,7 @@
     }
 
     public boolean isRejectedBy(Subscription subscription)
-    {        
+    {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
@@ -410,9 +412,9 @@
                 final ServerMessage message = getMessage();
                 if(rerouteQueues != null && rerouteQueues.size() != 0)
                 {
-                    Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+                    ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
 
-                    txn.enqueue(rerouteQueues, message, new Transaction.Action() {
+                    txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
                         public void postCommit()
                         {
                             try
@@ -434,7 +436,7 @@
                         }
                     });
                     txn.dequeue(currentQueue,message,
-                                new Transaction.Action()
+                                new ServerTransaction.Action()
                                 {
                                     public void postCommit()
                                     {
@@ -523,7 +525,7 @@
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.advanceHead();            
+            _queueEntryList.advanceHead();
             return true;
         }
         else

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,9 @@
 {
     VirtualHost getVirtualHost();
 
-    void registerQueue(AMQQueue queue) throws AMQException;
+    void registerQueue(AMQQueue queue);
 
-    void unregisterQueue(AMQShortString name) throws AMQException;
+    void unregisterQueue(AMQShortString name);
 
     AMQQueue getQueue(AMQShortString name);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org