You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [6/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Thu Aug 14 20:40:49 2008
@@ -1,285 +1,284 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- *   8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-    
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-        final Long messageId = message.getMessageId();
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-
-        if(bodyCount == 0)
-        {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext,messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-            }
-
-
-        }
-
-
-    }
-
-
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-        final long messageId = message.getMessageId();
-
-        AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
-
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-        if(bodyCount == 0)
-        {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext, messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-            }
-
-
-        }
-
-
-    }
-
-
-    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-        BasicDeliverBody deliverBody =
-                methodRegistry.createBasicDeliverBody(consumerTag,
-                                                      deliveryTag,
-                                                      messageHandle.isRedelivered(),
-                                                      pb.getExchange(),
-                                                      pb.getRoutingKey());
-        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
-        return deliverFrame;
-    }
-
-    private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-        BasicGetOkBody getOkBody =
-                methodRegistry.createBasicGetOkBody(deliveryTag,
-                                                    messageHandle.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
-                                                    queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
-        return getOkFrame;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-    {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-        BasicReturnBody basicReturnBody =
-                methodRegistry.createBasicReturnBody(replyCode,
-                                                     replyText,
-                                                     message.getMessagePublishInfo().getExchange(),
-                                                     message.getMessagePublishInfo().getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
-        return returnFrame;
-    }
-
-    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
-
-        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
-            writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ *   8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+    
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+    private final AMQProtocolSession _protocolSession;
+
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+
+
+        final int bodyCount = messageHandle.getBodyCount(storeContext);
+
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can write out the deliver, header and body with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext, i);
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+            }
+
+
+        }
+
+
+    }
+
+
+    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+
+        AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+
+        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can write out the get-ok, header and body with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext, i);
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+            }
+
+
+        }
+
+
+    }
+
+
+    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicDeliverBody deliverBody =
+                methodRegistry.createBasicDeliverBody(consumerTag,
+                                                      deliveryTag,
+                                                      messageHandle.isRedelivered(),
+                                                      pb.getExchange(),
+                                                      pb.getRoutingKey());
+        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+        return deliverFrame;
+    }
+
+    private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicGetOkBody getOkBody =
+                methodRegistry.createBasicGetOkBody(deliveryTag,
+                                                    messageHandle.isRedelivered(),
+                                                    pb.getExchange(),
+                                                    pb.getRoutingKey(),
+                                                    queueSize);
+        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
+
+        return getOkFrame;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicReturnBody basicReturnBody =
+                methodRegistry.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     message.getMessagePublishInfo().getExchange(),
+                                                     message.getMessagePublishInfo().getRoutingKey());
+        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+
+        return returnFrame;
+    }
+
+    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      message.getContentHeaderBody());
+
+        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+        //
+        // Optimise the case where we have a single content body. In that case we create a composite block
+        // so that we can write out the return, header and body with a single network write.
+        //
+        if (bodyFrameIterator.hasNext())
+        {
+            AMQDataBlock firstContentBody = bodyFrameIterator.next();
+            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
+
+            writeFrame(compositeBlock);
+        }
+
+        //
+        // Now start writing out the other content bodies
+        // TODO: MINA needs to be fixed so that the pending writes buffer is not unbounded
+        //
+        while (bodyFrameIterator.hasNext())
+        {
+            writeFrame(bodyFrameIterator.next());
+        }
+    }
+
+
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Thu Aug 14 20:40:49 2008
@@ -1,397 +1,370 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.output.amqp0_9;
-
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-    private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
-        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
-
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-        final Long messageId = message.getMessageId();
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-
-        if(bodyCount == 0)
-        {
-            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
-                                                                             contentHeaderBody);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
-            AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
-
-            CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext,messageId, i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
-
-        }
-
-
-    }
-
-    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
-    {
-        
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
-    }
-
-
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-        final long messageId = message.getMessageId();
-
-        AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
-
-
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext,messageId);
-        if(bodyCount == 0)
-        {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext, messageId, i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
-
-        }
-
-
-    }
-
-
-    private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
-            throws AMQException
-    {
-
-
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-
-
-            private final boolean _isRedelivered = messageHandle.isRedelivered();
-            private final AMQShortString _exchangeName = pb.getExchange();
-            private final AMQShortString _routingKey = pb.getRoutingKey();
-
-
-            public AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              _isRedelivered,
-                                                              _exchangeName,
-                                                              _routingKey);
-
-
-
-
-
-            }
-
-            public byte getFrameType()
-            {
-                return AMQMethodBody.TYPE;
-            }
-
-            public int getSize()
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
-            }
-
-            public void writePayload(ByteBuffer buffer)
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
-
-            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be dispatched!");
-            }
-        };
-        return returnBlock;
-    }
-
-    private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
-        BasicGetOkBody getOkBody =
-                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    messageHandle.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
-                                                    queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
-        return getOkFrame;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-    {
-
-        BasicReturnBody basicReturnBody =
-                METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                                                     replyText,
-                                                     message.getMessagePublishInfo().getExchange(),
-                                                     message.getMessagePublishInfo().getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
-        return returnFrame;
-    }
-
-    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
-
-        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
-            writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-
-        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-
-
-    public static final class CompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final AMQBody _contentBody;
-        private final int _channel;
-
-
-        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-            _contentBody = contentBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
-        }
-
-        public void writePayload(ByteBuffer buffer)
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-    }
-
-    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final int _channel;
-
-
-        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
-        }
-
-        public void writePayload(ByteBuffer buffer)
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-    }
-
-}
+package org.apache.qpid.server.output.amqp0_9;
+
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Iterator;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+    private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+
+
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+    private final AMQProtocolSession _protocolSession;
+
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+
+
+        final int bodyCount = messageHandle.getBodyCount(storeContext);
+
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can write out the deliver, header and body with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+            AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
+
+            CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext, i);
+                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
+            }
+
+
+        }
+        
+    }
+
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+        
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
+
+    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final StoreContext storeContext = message.getStoreContext();
+
+        AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+
+
+        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+
+        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        if(bodyCount == 0)
+        {
+            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                             contentHeader);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+
+
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can write out the get-ok, header and body with a single network write.
+            //
+            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
+            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            writeFrame(compositeBlock);
+
+            //
+            // Now start writing out the other content bodies
+            //
+            for(int i = 1; i < bodyCount; i++)
+            {
+                cb = messageHandle.getContentChunk(storeContext, i);
+                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
+            }
+
+
+        }
+
+
+    }
+
+
+    private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+
+        final boolean isRedelivered = messageHandle.isRedelivered();
+        final AMQShortString exchangeName = pb.getExchange();
+        final AMQShortString routingKey = pb.getRoutingKey();
+
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+
+
+
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(ByteBuffer buffer)
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
+    }
+
+    private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final MessagePublishInfo pb = message.getMessagePublishInfo();
+        final AMQMessageHandle messageHandle = message.getMessageHandle();
+
+
+        BasicGetOkBody getOkBody =
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    messageHandle.isRedelivered(),
+                                                    pb.getExchange(),
+                                                    pb.getRoutingKey(),
+                                                    queueSize);
+        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
+
+        return getOkFrame;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    {
+
+        BasicReturnBody basicReturnBody =
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     message.getMessagePublishInfo().getExchange(),
+                                                     message.getMessagePublishInfo().getRoutingKey());
+        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+
+        return returnFrame;
+    }
+
+    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+
+        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+
+        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
+        //
+        // Optimise the case where we have a single content body. In that case we create a composite block
+        // so that we can write out the return, header and body with a single network write.
+        //
+        if (bodyFrameIterator.hasNext())
+        {
+            AMQDataBlock firstContentBody = bodyFrameIterator.next();
+            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
+
+            writeFrame(compositeBlock);
+        }
+
+        //
+        // Now start writing out the other content bodies
+        // TODO: MINA needs to be fixed so that the pending writes buffer is not unbounded
+        //
+        while (bodyFrameIterator.hasNext())
+        {
+            writeFrame(bodyFrameIterator.next());
+        }
+    }
+
+
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(ByteBuffer buffer)
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(ByteBuffer buffer)
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Aug 14 20:40:49 2008
@@ -25,6 +25,7 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.CloseFuture;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 
 import org.apache.qpid.AMQChannelException;
@@ -49,6 +50,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.Sender;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -99,7 +101,7 @@
 
     private Object _lastSent;
 
-    private boolean _closed;
+    protected boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
 
@@ -113,6 +115,10 @@
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
+    private ProtocolSessionIdentifier _sessionIdentifier;
+
+    private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
+    private org.apache.mina.common.WriteFuture _lastWriteFuture;
 
     public ManagedObject getManagedObject()
     {
@@ -120,7 +126,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
-        throws AMQException
+            throws AMQException
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
@@ -144,7 +150,7 @@
     }
 
     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
-        AMQStateManager stateManager) throws AMQException
+                                  AMQStateManager stateManager) throws AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
@@ -221,15 +227,14 @@
             {
                 if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+                    _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
                 }
 
+                closeProtocolSession();
                 return;
             }
         }
 
-
-
         try
         {
             body.handle(channelId, this);
@@ -257,7 +262,6 @@
 
             String locales = "en_US";
 
-
             AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
                                                                                        (short) getProtocolMinorVersion(),
                                                                                        null,
@@ -265,7 +269,6 @@
                                                                                        locales.getBytes());
             _minaProtocolSession.write(responseBody.generateFrame(0));
 
-
         }
         catch (AMQException e)
         {
@@ -331,27 +334,16 @@
                         _logger.info("Closing connection due to: " + e.getMessage());
                     }
 
-                    closeSession();
-
                     AMQConnectionException ce =
-                        evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                            AMQConstant.CHANNEL_ERROR.getName().toString());
+                            evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                                                                   AMQConstant.CHANNEL_ERROR.getName().toString());
 
-                    _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                    writeFrame(ce.getCloseFrame(channelId));
+                    closeConnection(channelId, ce, false);
                 }
             }
             catch (AMQConnectionException e)
             {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Closing connection due to: " + e.getMessage());
-                }
-
-                markChannelAwaitingCloseOk(channelId);
-                closeSession();
-                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                writeFrame(e.getCloseFrame(channelId));
+                closeConnection(channelId, e, false);
             }
         }
         catch (Exception e)
@@ -364,7 +356,7 @@
 
             _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
 
-            _minaProtocolSession.close();
+            closeProtocolSession();
         }
     }
 
@@ -373,7 +365,7 @@
 
         AMQChannel channel = getAndAssertChannel(channelId);
 
-        channel.publishContentHeader(body, this);
+        channel.publishContentHeader(body);
 
     }
 
@@ -381,7 +373,7 @@
     {
         AMQChannel channel = getAndAssertChannel(channelId);
 
-        channel.publishContentBody(body, this);
+        channel.publishContentBody(body);
     }
 
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
@@ -398,7 +390,8 @@
     public void writeFrame(AMQDataBlock frame)
     {
         _lastSent = frame;
-        _minaProtocolSession.write(frame);
+
+        _lastWriteFuture = _minaProtocolSession.write(frame);
     }
 
     public AMQShortString getContextKey()
@@ -430,7 +423,7 @@
     public AMQChannel getChannel(int channelId) throws AMQException
     {
         final AMQChannel channel =
-            ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+                ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
             return null;
@@ -443,7 +436,7 @@
 
     public boolean channelAwaitingClosure(int channelId)
     {
-        return _closingChannelsList.contains(channelId);
+        return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
@@ -463,8 +456,8 @@
         if (_channelMap.size() == _maxNoOfChannels)
         {
             String errorMessage =
-                toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
-                + "); can't create channel";
+                    toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+                    + "); can't create channel";
             _logger.error(errorMessage);
             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
         }
@@ -536,7 +529,7 @@
         {
             try
             {
-                channel.close(this);
+                channel.close();
                 markChannelAwaitingCloseOk(channelId);
             }
             finally
@@ -602,7 +595,7 @@
     {
         for (AMQChannel channel : _channelMap.values())
         {
-            channel.close(this);
+            channel.close();
         }
 
         _channelMap.clear();
@@ -618,6 +611,12 @@
         if (!_closed)
         {
             _closed = true;
+
+            if (_virtualHost != null)
+            {
+                _virtualHost.getConnectionRegistry().deregisterConnection(this);
+            }
+
             closeAllChannels();
             if (_managedObject != null)
             {
@@ -631,9 +630,79 @@
         }
     }
 
+    /**
+     * Closes the connection because of the given connection exception: marks the channel as awaiting
+     * close-ok, closes the session, moves the state manager to CONNECTION_CLOSING and writes the close
+     * frame obtained from the exception.
+     *
+     * @param channelId            the channel the close frame is generated for
+     * @param e                    the exception describing why the connection is being closed
+     * @param closeProtocolSession if true, the protocol session is closed once the close frame is written
+     * @throws AMQException if any of the close steps fails
+     */
+    public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException
+    {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing connection due to: " + e.getMessage());
+        }
+
+        markChannelAwaitingCloseOk(channelId);
+        closeSession();
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        writeFrame(e.getCloseFrame(channelId));
+
+        if (closeProtocolSession)
+        {
+            closeProtocolSession();
+        }
+    }
+
+    /**
+     * Closes the protocol session, first waiting for the last write to complete.
+     */
+    public void closeProtocolSession()
+    {
+        closeProtocolSession(true);
+    }
+
+    /**
+     * Closes the underlying MINA session and then moves the state manager to CONNECTION_CLOSED.
+     * Both the optional wait for the last write and the wait for the close itself are bounded by
+     * LAST_WRITE_FUTURE_JOIN_TIMEOUT.
+     *
+     * @param waitLast if true, and a write has been issued, wait for the last write future before closing
+     */
+    public void closeProtocolSession(boolean waitLast)
+    {
+        if (waitLast && (_lastWriteFuture != null))
+        {
+            // Only announce the wait when we are actually going to wait
+            _logger.debug("Waiting for last write to join.");
+            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        }
+
+        _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
+        final CloseFuture future = _minaProtocolSession.close();
+        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+
+        try
+        {
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+        }
+        catch (AMQException e)
+        {
+            // The session is already closed, so log the failure (with its cause) and carry on
+            _logger.info(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return the remote address followed by the authorized ID's name in parentheses, or "(?)" if there is none
+     */
     public String toString()
     {
-        return "AMQProtocolSession(" + _minaProtocolSession.getRemoteAddress() + ")";
+        return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName()) + ")";
     }
 
     public String dump()
@@ -702,6 +746,7 @@
                 _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
             }
         }
+        _sessionIdentifier = new ProtocolSessionIdentifier(this);
     }
 
     private void setProtocolVersion(ProtocolVersion pv)
@@ -739,7 +784,7 @@
 
     public Object getClientIdentifier()
     {
-        return _minaProtocolSession.getRemoteAddress();
+        return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
     }
 
     public VirtualHost getVirtualHost()
@@ -750,6 +795,9 @@
     public void setVirtualHost(VirtualHost virtualHost) throws AMQException
     {
         _virtualHost = virtualHost;
+
+        _virtualHost.getConnectionRegistry().registerConnection(this);
+
         _managedObject = createMBean();
         _managedObject.register();
     }
@@ -789,8 +837,23 @@
         return _dispatcher;
     }
 
+    /** @return the current value of the {@code _sessionIdentifier} field */
+    public ProtocolSessionIdentifier getSessionIdentifier()
+    {
+        return _sessionIdentifier;
+    }
+
     public String getClientVersion()
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
+
+    /**
+     * Intentionally does nothing; the supplied sender is ignored.
+     *
+     * @param sender unused
+     */
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
+    {
+       // No-op, interface munging between this and AMQProtocolSession
+    }
+
+    /** Intentionally does nothing. */
+    public void init()
+    {
+       // No-op, interface munging between this and AMQProtocolSession
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java Thu Aug 14 20:40:49 2008
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-
-/**
- * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AQMP method.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to handle an AMQP method.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Missing method handler. Unlikely to ever happen, and if it does its a coding error. Consider replacing with a
- *       Runtime.
- */
-public class AMQNoMethodHandlerException extends AMQException
-{
-    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
-    {
-        super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+/**
+ * AMQNoMethodHandlerException represents the case where no method handler exists to handle an AMQP method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to handle an AMQP method.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Missing method handler. Unlikely to ever happen, and if it does it's a coding error. Consider replacing with a
+ *       RuntimeException.
+ */
+public class AMQNoMethodHandlerException extends AMQException
+{
+    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+    {
+        super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Thu Aug 14 20:40:49 2008
@@ -265,6 +265,10 @@
      */
     public void messageSent(IoSession protocolSession, Object object) throws Exception
     {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Message sent: " + object);
+        }
     }
 
     protected boolean isSSLClient(ConnectorConfiguration connectionConfig,

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Thu Aug 14 20:40:49 2008
@@ -23,7 +23,9 @@
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -35,7 +37,27 @@
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
 
+    /**
+     * Holder for two values read from a session when the holder is constructed: the session's
+     * client identifier and the client-properties entry keyed by {@code ClientProperties.instance}.
+     * Both fields are final, so later changes to the session are not reflected here.
+     */
+    public static final class ProtocolSessionIdentifier
+    {
+        private final Object _sessionIdentifier;
+        private final Object _sessionInstance;
 
+        /**
+         * @param session the session to read from; if its client properties are null the
+         *                session instance is recorded as null
+         */
+        ProtocolSessionIdentifier(AMQProtocolSession session)
+        {
+            _sessionIdentifier = session.getClientIdentifier();
+            _sessionInstance = session.getClientProperties() == null ? null : session.getClientProperties().getObject(ClientProperties.instance.toAMQShortString());
+        }
+
+        /** @return the value of {@code session.getClientIdentifier()} captured at construction */
+        public Object getSessionIdentifier()
+        {
+            return _sessionIdentifier;
+        }
+
+        /** @return the captured client-properties "instance" entry, or null if there were no client properties */
+        public Object getSessionInstance()
+        {
+            return _sessionInstance;
+        }
+    }
 
     public static interface Task
     {
@@ -129,6 +151,10 @@
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     void closeSession() throws AMQException;
 
+    /** This must be called to close the session in order to free up any resources managed by the session. */
+    void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException;
+
+
     /** @return a key that uniquely identifies this session */
     Object getKey();
 
@@ -175,5 +201,7 @@
     public MethodRegistry getMethodRegistry();
 
     public MethodDispatcher getMethodDispatcher();
+
+    public ProtocolSessionIdentifier getSessionIdentifier();
     
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java Thu Aug 14 20:40:49 2008
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-
-/**
- * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to cast a frame to its expected type.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
- *       be better just to leave that as a ClassCastException. However, check the framing layer catches this error
- *       first.
- */
-public class UnknnownMessageTypeException extends AMQException
-{
-    public UnknnownMessageTypeException(AMQDataBlock message)
-    {
-        super("Unknown message type: " + message.getClass().getName() + ": " + message);
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+
+/**
+ * UnknnownMessageTypeException represents a failure when Mina passes an unexpected frame type.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to cast a frame to its expected type.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
+ *       be better just to leave that as a ClassCastException. However, check the framing layer catches this error
+ *       first.
+ */
+public class UnknnownMessageTypeException extends AMQException
+{
+    /**
+     * @param message the unexpected data block; its class name and string form are placed in the exception message
+     */
+    public UnknnownMessageTypeException(AMQDataBlock message)
+    {
+        super("Unknown message type: " + message.getClass().getName() + ": " + message);
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
            ('svn:eol-style' removed)