You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [3/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,419 +1,418 @@
-package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
-        writeMessageDelivery(entry, channelId, deliverBody);
-    }
-
-
-    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
-            throws AMQException
-    {
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
-        }
-        else
-        {
-            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
-            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
-            chb.bodySize = message.getSize();
-            return chb;
-        }
-    }
-
-
-    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
-    }
-
-    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-
-
-        int bodySize = (int) message.getSize();
-
-        if(bodySize == 0)
-        {
-            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
-                                                                             contentHeaderBody);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-
-            CompositeAMQBodyBlock
-                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while(writtenSize < bodySize)
-            {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
-                writtenSize += capacity;
-
-                writeFrame(new AMQFrame(channelId, body));
-            }
-        }
-    }
-
-    private class MessageContentSourceBody implements AMQBody
-    {
-        public static final byte TYPE = 3;
-        private int _length;
-        private MessageContentSource _message;
-        private int _offset;
-
-        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
-        {
-            _message = message;
-            _offset = offset;
-            _length = length;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            byte[] data = new byte[_length];
-
-            _message.getContent(ByteBuffer.wrap(data), _offset);
-
-            buffer.write(data);
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-
-    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
-    {
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
-    }
-
-
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
-        writeMessageDelivery(entry, channelId, deliver);
-    }
-
-
-    private AMQBody createEncodedDeliverBody(QueueEntry entry,
-                                              final long deliveryTag,
-                                              final AMQShortString consumerTag)
-            throws AMQException
-    {
-
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-            public AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
-
-
-
-
-
-            }
-
-            public byte getFrameType()
-            {
-                return AMQMethodBody.TYPE;
-            }
-
-            public int getSize()
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
-            }
-
-            public void writePayload(DataOutputStream buffer) throws IOException
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
-
-            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be dispatched!");
-            }
-        };
-        return returnBlock;
-    }
-
-    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        BasicGetOkBody getOkBody =
-                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    isRedelivered,
-                                                    exchangeName,
-                                                    routingKey,
-                                                    queueSize);
-
-        return getOkBody;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
-                                             int replyCode,
-                                             AMQShortString replyText) throws AMQException
-    {
-
-        BasicReturnBody basicReturnBody =
-                METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                                                     replyText,
-                                                     messagePublishInfo.getExchange(),
-                                                     messagePublishInfo.getRoutingKey());
-
-
-        return basicReturnBody;
-    }
-
-    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-
-        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
-        writeMessageDelivery(message, header, channelId, returnFrame);
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-
-        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-
-
-    public static final class CompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final AMQBody _contentBody;
-        private final int _channel;
-
-
-        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-            _contentBody = contentBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-    }
-
-    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final int _channel;
-
-
-        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-    }
-
-}
+package org.apache.qpid.server.output.amqp0_9;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
+
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+    private final AMQProtocolSession _protocolSession;
+
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
+
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
+
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+
+
+        int bodySize = (int) message.getSize();
+
+        if(bodySize == 0)
+        {
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+            int writtenSize = capacity;
+
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writeFrame(compositeBlock);
+
+            while(writtenSize < bodySize)
+            {
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
+
+                writeFrame(new AMQFrame(channelId, body));
+            }
+        }
+    }
+
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            _message.getContent(ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
+
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
+
+
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
+
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+
+
+
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(DataOutput buffer) throws IOException
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
+    }
+
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        BasicGetOkBody getOkBody =
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
+                                                    queueSize);
+
+        return getOkBody;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
+    {
+
+        BasicReturnBody basicReturnBody =
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     messagePublishInfo.getExchange(),
+                                                     messagePublishInfo.getRoutingKey());
+
+
+        return basicReturnBody;
+    }
+
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+        writeMessageDelivery(message, header, channelId, returnFrame);
+    }
+
+
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+
+}

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,414 +1,425 @@
-package org.apache.qpid.server.output.amqp0_9_1;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
-        writeMessageDelivery(entry, channelId, deliverBody);
-    }
-
-
-    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
-            throws AMQException
-    {
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
-        }
-        else
-        {
-            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
-            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
-            chb.bodySize = message.getSize();
-            return chb;
-        }
-    }
-
-
-    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
-    }
-
-    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-
-
-        int bodySize = (int) message.getSize();
-
-        if(bodySize == 0)
-        {
-            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
-                                                                             contentHeaderBody);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while(writtenSize < bodySize)
-            {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
-                writtenSize += capacity;
-
-                writeFrame(new AMQFrame(channelId, body));
-            }
-        }
-    }
-
-    private class MessageContentSourceBody implements AMQBody
-    {
-        public static final byte TYPE = 3;
-        private int _length;
-        private MessageContentSource _message;
-        private int _offset;
-
-        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
-        {
-            _message = message;
-            _offset = offset;
-            _length = length;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            byte[] data = new byte[_length];
-
-            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
-            buffer.write(data);
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
-    {
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
-    }
-
-
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
-        writeMessageDelivery(entry, channelId, deliver);
-    }
-
-
-    private AMQBody createEncodedDeliverBody(QueueEntry entry,
-                                              final long deliveryTag,
-                                              final AMQShortString consumerTag)
-            throws AMQException
-    {
-
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-            public AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
-
-
-
-
-
-            }
-
-            public byte getFrameType()
-            {
-                return AMQMethodBody.TYPE;
-            }
-
-            public int getSize()
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
-            }
-
-            public void writePayload(DataOutputStream buffer) throws IOException
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
-
-            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be dispatched!");
-            }
-        };
-        return returnBlock;
-    }
-
-    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        BasicGetOkBody getOkBody =
-                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    isRedelivered,
-                                                    exchangeName,
-                                                    routingKey,
-                                                    queueSize);
-
-        return getOkBody;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
-                                             int replyCode,
-                                             AMQShortString replyText) throws AMQException
-    {
-
-        BasicReturnBody basicReturnBody =
-                METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                                                     replyText,
-                                                     messagePublishInfo.getExchange(),
-                                                     messagePublishInfo.getRoutingKey());
-
-
-        return basicReturnBody;
-    }
-
-    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-
-        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
-        writeMessageDelivery(message, header, channelId, returnFrame);
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-
-        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-
-
-    public static final class CompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final AMQBody _contentBody;
-        private final int _channel;
-
-
-        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-            _contentBody = contentBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-    }
-
-    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final int _channel;
-
-
-        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-    }
-
+package org.apache.qpid.server.output.amqp0_9_1;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+    private final AMQProtocolSession _protocolSession;
+
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
+
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
+
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+
+
+        int bodySize = (int) message.getSize();
+
+        if(bodySize == 0)
+        {
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+            int writtenSize = capacity;
+
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writeFrame(compositeBlock);
+
+            while(writtenSize < bodySize)
+            {
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
+
+                writeFrame(new AMQFrame(channelId, body));
+            }
+        }
+    }
+
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            ByteBuffer buf = _message.getContent(_offset, _length);
+
+            if(buf.hasArray())
+            {
+                buffer.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining());
+            }
+            else
+            {
+
+                byte[] data = new byte[_length];
+
+                buf.get(data);
+
+                buffer.write(data);
+            }
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
+
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
+
+
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
+
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+
+
+
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(DataOutput buffer) throws IOException
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
+    }
+
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        BasicGetOkBody getOkBody =
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
+                                                    queueSize);
+
+        return getOkBody;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
+    {
+
+        BasicReturnBody basicReturnBody =
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     messagePublishInfo.getExchange(),
+                                                     messagePublishInfo.getRoutingKey());
+
+
+        return basicReturnBody;
+    }
+
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+        writeMessageDelivery(message, header, channelId, returnFrame);
+    }
+
+
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+
 }
\ No newline at end of file

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed Dec 28 13:02:41 2011
@@ -111,6 +111,7 @@ public class AMQProtocolEngine implement
     // to save boxing the channelId and looking up in a map... cache in an array the low numbered
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
+    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
 
     private AMQShortString _contextKey;
 
@@ -280,6 +281,7 @@ public class AMQProtocolEngine implement
                     closeProtocolSession();
                 }
             }
+            receiveComplete();
         }
         catch (Exception e)
         {
@@ -288,6 +290,15 @@ public class AMQProtocolEngine implement
         }
     }
 
+    private void receiveComplete()
+    {
+        for (AMQChannel channel : _channelMap.values())
+        {
+            channel.receivedComplete();
+        }
+
+    }
+
     public void dataBlockReceived(AMQDataBlock message) throws Exception
     {
         _lastReceived = message;
@@ -405,35 +416,51 @@ public class AMQProtocolEngine implement
         }
     }
 
+
+    private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
     private ByteBuffer asByteBuffer(AMQDataBlock block)
     {
-        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+        final int size = (int) block.getSize();
 
-        try
-        {
-            block.writePayload(new DataOutputStream(new OutputStream()
-            {
+        final byte[] data;
 
 
-                @Override
-                public void write(int b) throws IOException
-                {
-                    buf.put((byte) b);
-                }
+        if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+        {
+            data= new byte[size];
+        }
+        else
+        {
 
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException
-                {
-                    buf.put(b, off, len);
-                }
-            }));
+            data = _reusableBytes;
+        }
+        _reusableDataOutput.setBuffer(data);
+
+        try
+        {
+            block.writePayload(_reusableDataOutput);
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
 
-        buf.flip();
+        final ByteBuffer buf;
+
+        if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
+        {
+            buf = _reusableByteBuffer;
+            buf.position(0);
+        }
+        else
+        {
+            buf = ByteBuffer.wrap(data);
+        }
+        buf.limit(_reusableDataOutput.length());
+
         return buf;
     }
 
@@ -1425,7 +1452,6 @@ public class AMQProtocolEngine implement
         _statisticsEnabled = enabled;
     }
 
-    @Override
     public boolean isSessionNameUnique(byte[] name)
     {
         // 0-8/0-9/0-9-1 sessions don't have names
@@ -1437,9 +1463,6 @@ public class AMQProtocolEngine implement
         _deferFlush = deferFlush;
     }
 
-
-
-    @Override
     public String getUserName()
     {
         return getAuthorizedPrincipal().getName();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Dec 28 13:02:41 2011
@@ -557,7 +557,7 @@ public class AMQQueueMBean extends AMQMa
         List<String> list = new ArrayList<String>();
 
         AMQMessageHeader header = msg.getMessageHeader();
-        MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
+        MessageProperties msgProps = msg.getHeader().getMessageProperties();
 
         String appID = null;
         String userID = null;
@@ -619,7 +619,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
         _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
         txn.commit();
     }
@@ -654,7 +654,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
 
         _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Dec 28 13:02:41 2011
@@ -35,6 +35,7 @@ public interface BaseQueue extends Trans
 
     void enqueue(ServerMessage message) throws AMQException;
     void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
+    void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
 
     boolean isDurable();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Dec 28 13:02:41 2011
@@ -100,7 +100,7 @@ public class ConflationQueueList extends
     {
         if(entry.acquire())
         {
-            ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+            ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
             txn.dequeue(entry.getQueue(),entry.getMessage(),
                                     new ServerTransaction.Action()
                                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.framing.AMQShortString;
 
 public class InboundMessageAdapter implements InboundMessage
 {
@@ -44,6 +45,11 @@ public class InboundMessageAdapter imple
     }
 
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(_entry.getMessage());
+    }
+
     public String getRoutingKey()
     {
         return _entry.getMessage().getRoutingKey();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Wed Dec 28 13:02:41 2011
@@ -63,7 +63,7 @@ public class IncomingMessage implements 
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private ArrayList<? extends BaseQueue> _destinationQueues;
+    private List<? extends BaseQueue> _destinationQueues;
 
     private long _expiration;
 
@@ -126,12 +126,18 @@ public class IncomingMessage implements 
 
     public MessageMetaData headersReceived()
     {
-        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+
+        return headersReceived(System.currentTimeMillis());
+    }
+
+    public MessageMetaData headersReceived(long currentTime)
+    {
+        _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
         return _messageMetaData;
     }
 
 
-    public ArrayList<? extends BaseQueue> getDestinationQueues()
+    public List<? extends BaseQueue> getDestinationQueues()
     {
         return _destinationQueues;
     }
@@ -158,6 +164,11 @@ public class IncomingMessage implements 
         return _messagePublishInfo.getExchange();
     }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return _messagePublishInfo.getRoutingKey();
+    }
+
     public String getRoutingKey()
     {
         return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
@@ -209,7 +220,7 @@ public class IncomingMessage implements 
         return getContentHeader().bodySize;
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
         return _storedMessageHandle.getMessageNumber();
     }
@@ -225,7 +236,7 @@ public class IncomingMessage implements 
 
     }
 
-    public void enqueue(final ArrayList<? extends BaseQueue> queues)
+    public void enqueue(final List<? extends BaseQueue> queues)
     {
         _destinationQueues = queues;
     }
@@ -288,6 +299,15 @@ public class IncomingMessage implements 
 
     }
 
+
+    public ByteBuffer getContent(int offset, int size)
+    {
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        getContent(buf,offset);
+        buf.flip();
+        return buf;
+    }
+
     public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
     {
         _storedMessageHandle = storedMessageHandle;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Dec 28 13:02:41 2011
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 import java.util.Collection;
@@ -424,7 +425,7 @@ public abstract class QueueEntryImpl imp
             if (rerouteQueues != null && rerouteQueues.size() != 0)
             {
 
-                ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+                ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
 
                 txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
                 {
@@ -447,7 +448,8 @@ public abstract class QueueEntryImpl imp
                     {
 
                     }
-                });
+                }, 0L);
+
                 txn.dequeue(currentQueue, message, new ServerTransaction.Action()
                 {
                     public void postCommit()
@@ -460,8 +462,10 @@ public abstract class QueueEntryImpl imp
 
                     }
                 });
-                }
+
+                txn.commit();
             }
+        }
     }
 
     public boolean isQueueDeleted()
@@ -549,4 +553,11 @@ public abstract class QueueEntryImpl imp
         _deliveryCountUpdater.decrementAndGet(this);
     }
 
+    public String toString()
+    {
+        return "QueueEntryImpl{" +
+                "_entryId=" + _entryId +
+                ", _state=" + _state +
+                '}';
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Dec 28 13:02:41 2011
@@ -51,7 +51,6 @@ public class QueueRunner implements Read
 
     private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
 
-    private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
     private final AtomicBoolean _stateChange = new AtomicBoolean();
 
     private final AtomicLong _lastRunAgain = new AtomicLong();
@@ -65,8 +64,6 @@ public class QueueRunner implements Read
         _queue = queue;
     }
 
-    private int trouble = 0;
-
     public void run()
     {
         if(_scheduled.compareAndSet(SCHEDULED,RUNNING))

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Dec 28 13:02:41 2011
@@ -584,8 +584,16 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
+        enqueue(message, false, action);
+    }
+
+    public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+    {
 
-        incrementTxnEnqueueStats(message);
+        if(transactional)
+        {
+            incrementTxnEnqueueStats(message);
+        }
         incrementQueueCount();
         incrementQueueSize(message);
 
@@ -733,13 +741,8 @@ public class SimpleAMQQueue implements A
     
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
-        SessionConfig session = message.getSessionConfig();
-        
-        if(session !=null && session.isTransactional())
-        {
-            _msgTxnEnqueues.incrementAndGet();
-            _byteTxnEnqueues.addAndGet(message.getSize());
-        }
+        _msgTxnEnqueues.incrementAndGet();
+        _byteTxnEnqueues.addAndGet(message.getSize());
     }
     
     private void incrementTxnDequeueStats(QueueEntry entry)
@@ -1447,7 +1450,7 @@ public class SimpleAMQQueue implements A
                                         {
 
                                         }
-                                    });
+                                    }, 0L);
                         txn.dequeue(this, entry.getMessage(),
                                     new ServerTransaction.Action()
                                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Wed Dec 28 13:02:41 2011
@@ -32,11 +32,9 @@ import static org.apache.qpid.server.sec
 
 import java.net.SocketAddress;
 import java.security.Principal;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.security.auth.Subject;
 
@@ -192,6 +190,15 @@ public class SecurityManager
         return _logger;
     }
 
+    private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck>
+    {
+        @Override
+        protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest)
+        {
+            return size() >= 200;
+        }
+    }
+
     private abstract class AccessCheck
     {
         abstract Result allowed(SecurityPlugin plugin);
@@ -204,56 +211,61 @@ public class SecurityManager
             return true;
         }
 
-        HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins);
+        Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty()
+                ? Collections.<String, SecurityPlugin>emptyMap()
+                : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins);
 		
-		for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+		if(!_hostPlugins.isEmpty())
         {
-		    // Create set of global only plugins
-			SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
-			if (globalPlugin != null)
-			{
-				remainingPlugins.remove(hostEntry.getKey());
-			}
-			
-            Result host = checker.allowed(hostEntry.getValue());
-			
-			if (host == Result.DENIED)
-			{
-				// Something vetoed the access, we're done
-				return false;
-			}
-            
-			// host allow overrides global allow, so only check global on abstain or defer
-			if (host != Result.ALLOWED)
-			{
-				if (globalPlugin == null)
-				{
-				    if (host == Result.DEFER)
-				    {
-				        host = hostEntry.getValue().getDefault();
-                    }
-                    if (host == Result.DENIED)
+            for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+            {
+                // Create set of global only plugins
+                SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
+                if (globalPlugin != null)
+                {
+                    remainingPlugins.remove(hostEntry.getKey());
+                }
+
+                Result host = checker.allowed(hostEntry.getValue());
+
+                if (host == Result.DENIED)
+                {
+                    // Something vetoed the access, we're done
+                    return false;
+                }
+
+                // host allow overrides global allow, so only check global on abstain or defer
+                if (host != Result.ALLOWED)
+                {
+                    if (globalPlugin == null)
                     {
-                        return false;
+                        if (host == Result.DEFER)
+                        {
+                            host = hostEntry.getValue().getDefault();
+                        }
+                        if (host == Result.DENIED)
+                        {
+                            return false;
+                        }
                     }
-				}
-				else
-				{
-				    Result global = checker.allowed(globalPlugin);
-					if (global == Result.DEFER)
-					{
-					    global = globalPlugin.getDefault();
-					}
-					if (global == Result.ABSTAIN && host == Result.DEFER)
-					{
-					    global = hostEntry.getValue().getDefault();
-					}
-					if (global == Result.DENIED)
+                    else
                     {
-                        return false;
+                        Result global = checker.allowed(globalPlugin);
+                        if (global == Result.DEFER)
+                        {
+                            global = globalPlugin.getDefault();
+                        }
+                        if (global == Result.ABSTAIN && host == Result.DEFER)
+                        {
+                            global = hostEntry.getValue().getDefault();
+                        }
+                        if (global == Result.DENIED)
+                        {
+                            return false;
+                        }
                     }
-				}
-			}
+                }
+            }
         }
 
         for (SecurityPlugin plugin : remainingPlugins.values())
@@ -371,15 +383,33 @@ public class SecurityManager
         });
     }
 
+
+    private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache
+            = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+    private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache
+            = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+
     public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName)
     {
-        return checkAllPlugins(new AccessCheck()
+        PublishAccessCheck check;
+        ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache =
+                immediate ? _immediatePublishPropsCache : _publishPropsCache;
+
+        ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName);
+        if(exchangeMap == null)
         {
-            Result allowed(SecurityPlugin plugin)
+            cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>());
+            exchangeMap = cache.get(exchangeName);
+        }
+
+            check = exchangeMap.get(routingKey);
+            if(check == null)
             {
-                return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate));
+                check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate));
+                exchangeMap.put(routingKey, check);
             }
-        });
+
+        return checkAllPlugins(check);
     }
 
     public boolean authorisePurge(final AMQQueue queue)
@@ -413,4 +443,19 @@ public class SecurityManager
 
         return current;
     }
+
+    private class PublishAccessCheck extends AccessCheck
+    {
+        private final ObjectProperties _props;
+
+        public PublishAccessCheck(ObjectProperties props)
+        {
+            _props = props;
+        }
+
+        Result allowed(SecurityPlugin plugin)
+        {
+            return plugin.authorise(PUBLISH, EXCHANGE, _props);
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org