You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [3/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,419 +1,418 @@
-package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
-}
+package org.apache.qpid.server.output.amqp0_9;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ byte[] data = new byte[_length];
+
+ _message.getContent(ByteBuffer.wrap(data), _offset);
+
+ buffer.write(data);
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
+}
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,414 +1,425 @@
-package org.apache.qpid.server.output.amqp0_9_1;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
- private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
-
- public static Factory getInstanceFactory()
- {
- return new Factory()
- {
-
- public ProtocolOutputConverter newInstance(AMQProtocolSession session)
- {
- return new ProtocolOutputConverterImpl(session);
- }
- };
- }
-
- private final AMQProtocolSession _protocolSession;
-
- private ProtocolOutputConverterImpl(AMQProtocolSession session)
- {
- _protocolSession = session;
- }
-
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
- }
-
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
- {
- if(entry.getMessage() instanceof AMQMessage)
- {
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
- }
- else
- {
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
- ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
- chb.bodySize = message.getSize();
- return chb;
- }
- }
-
-
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
- throws AMQException
- {
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
- }
-
- private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
- throws AMQException
- {
-
-
- int bodySize = (int) message.getSize();
-
- if(bodySize == 0)
- {
- SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
- contentHeaderBody);
-
- writeFrame(compositeBlock);
- }
- else
- {
- int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
- int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
- int writtenSize = capacity;
-
- AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
- CompositeAMQBodyBlock
- compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
- writeFrame(compositeBlock);
-
- while(writtenSize < bodySize)
- {
- capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
- writtenSize += capacity;
-
- writeFrame(new AMQFrame(channelId, body));
- }
- }
- }
-
- private class MessageContentSourceBody implements AMQBody
- {
- public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
-
- public MessageContentSourceBody(MessageContentSource message, int offset, int length)
- {
- _message = message;
- _offset = offset;
- _length = length;
- }
-
- public byte getFrameType()
- {
- return TYPE;
- }
-
- public int getSize()
- {
- return _length;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- byte[] data = new byte[_length];
-
- _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
- buffer.write(data);
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
- {
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- contentHeaderBody);
- return contentHeader;
- }
-
-
- public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
- }
-
-
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
- throws AMQException
- {
-
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- final AMQBody returnBlock = new AMQBody()
- {
-
- public AMQBody _underlyingBody;
-
- public AMQBody createAMQBody()
- {
- return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
- deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey);
-
-
-
-
-
- }
-
- public byte getFrameType()
- {
- return AMQMethodBody.TYPE;
- }
-
- public int getSize()
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- return _underlyingBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- if(_underlyingBody == null)
- {
- _underlyingBody = createAMQBody();
- }
- _underlyingBody.writePayload(buffer);
- }
-
- public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
- throw new AMQException("This block should never be dispatched!");
- }
- };
- return returnBlock;
- }
-
- private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
- throws AMQException
- {
- final AMQShortString exchangeName;
- final AMQShortString routingKey;
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
-
- BasicGetOkBody getOkBody =
- METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- isRedelivered,
- exchangeName,
- routingKey,
- queueSize);
-
- return getOkBody;
- }
-
- public byte getProtocolMinorVersion()
- {
- return getProtocolSession().getProtocolMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return getProtocolSession().getProtocolMajorVersion();
- }
-
- private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- BasicReturnBody basicReturnBody =
- METHOD_REGISTRY.createBasicReturnBody(replyCode,
- replyText,
- messagePublishInfo.getExchange(),
- messagePublishInfo.getRoutingKey());
-
-
- return basicReturnBody;
- }
-
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
-
- AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- writeMessageDelivery(message, header, channelId, returnFrame);
- }
-
-
- public void writeFrame(AMQDataBlock block)
- {
- getProtocolSession().writeFrame(block);
- }
-
-
- public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
- {
-
- BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
- writeFrame(basicCancelOkBody.generateFrame(channelId));
-
- }
-
-
- public static final class CompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final AMQBody _contentBody;
- private final int _channel;
-
-
- public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
- _contentBody = contentBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
- }
- }
-
- public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
- {
- public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
- private final AMQBody _methodBody;
- private final AMQBody _headerBody;
- private final int _channel;
-
-
- public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
- {
- _channel = channel;
- _methodBody = methodBody;
- _headerBody = headerBody;
-
- }
-
- public long getSize()
- {
- return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
- }
-
- public void writePayload(DataOutputStream buffer) throws IOException
- {
- AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
- }
- }
-
+package org.apache.qpid.server.output.amqp0_9_1;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+ int writtenSize = capacity;
+
+ AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+ MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+ writtenSize += capacity;
+
+ writeFrame(new AMQFrame(channelId, body));
+ }
+ }
+ }
+
+ private class MessageContentSourceBody implements AMQBody
+ {
+ public static final byte TYPE = 3;
+ private int _length;
+ private MessageContentSource _message;
+ private int _offset;
+
+ public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+ {
+ _message = message;
+ _offset = offset;
+ _length = length;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+ public int getSize()
+ {
+ return _length;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ ByteBuffer buf = _message.getContent(_offset, _length);
+
+ if(buf.hasArray())
+ {
+ buffer.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining());
+ }
+ else
+ {
+
+ byte[] data = new byte[_length];
+
+ buf.get(data);
+
+ buffer.write(data);
+ }
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
}
\ No newline at end of file
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed Dec 28 13:02:41 2011
@@ -111,6 +111,7 @@ public class AMQProtocolEngine implement
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
private AMQShortString _contextKey;
@@ -280,6 +281,7 @@ public class AMQProtocolEngine implement
closeProtocolSession();
}
}
+ receiveComplete();
}
catch (Exception e)
{
@@ -288,6 +290,15 @@ public class AMQProtocolEngine implement
}
}
+ private void receiveComplete()
+ {
+ for (AMQChannel channel : _channelMap.values())
+ {
+ channel.receivedComplete();
+ }
+
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -405,35 +416,51 @@ public class AMQProtocolEngine implement
}
}
+
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+ final int size = (int) block.getSize();
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
+ final byte[] data;
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
+ if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ data= new byte[size];
+ }
+ else
+ {
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
+ data = _reusableBytes;
+ }
+ _reusableDataOutput.setBuffer(data);
+
+ try
+ {
+ block.writePayload(_reusableDataOutput);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- buf.flip();
+ final ByteBuffer buf;
+
+ if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ buf = _reusableByteBuffer;
+ buf.position(0);
+ }
+ else
+ {
+ buf = ByteBuffer.wrap(data);
+ }
+ buf.limit(_reusableDataOutput.length());
+
return buf;
}
@@ -1425,7 +1452,6 @@ public class AMQProtocolEngine implement
_statisticsEnabled = enabled;
}
- @Override
public boolean isSessionNameUnique(byte[] name)
{
// 0-8/0-9/0-9-1 sessions don't have names
@@ -1437,9 +1463,6 @@ public class AMQProtocolEngine implement
_deferFlush = deferFlush;
}
-
-
- @Override
public String getUserName()
{
return getAuthorizedPrincipal().getName();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Dec 28 13:02:41 2011
@@ -557,7 +557,7 @@ public class AMQQueueMBean extends AMQMa
List<String> list = new ArrayList<String>();
AMQMessageHeader header = msg.getMessageHeader();
- MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
+ MessageProperties msgProps = msg.getHeader().getMessageProperties();
String appID = null;
String userID = null;
@@ -619,7 +619,7 @@ public class AMQQueueMBean extends AMQMa
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
_queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
txn.commit();
}
@@ -654,7 +654,7 @@ public class AMQQueueMBean extends AMQMa
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
_queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Dec 28 13:02:41 2011
@@ -35,6 +35,7 @@ public interface BaseQueue extends Trans
void enqueue(ServerMessage message) throws AMQException;
void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
boolean isDurable();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Dec 28 13:02:41 2011
@@ -100,7 +100,7 @@ public class ConflationQueueList extends
{
if(entry.acquire())
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Wed Dec 28 13:02:41 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.framing.AMQShortString;
public class InboundMessageAdapter implements InboundMessage
{
@@ -44,6 +45,11 @@ public class InboundMessageAdapter imple
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return AMQShortString.valueOf(_entry.getMessage());
+ }
+
public String getRoutingKey()
{
return _entry.getMessage().getRoutingKey();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Wed Dec 28 13:02:41 2011
@@ -63,7 +63,7 @@ public class IncomingMessage implements
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private ArrayList<? extends BaseQueue> _destinationQueues;
+ private List<? extends BaseQueue> _destinationQueues;
private long _expiration;
@@ -126,12 +126,18 @@ public class IncomingMessage implements
public MessageMetaData headersReceived()
{
- _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+
+ return headersReceived(System.currentTimeMillis());
+ }
+
+ public MessageMetaData headersReceived(long currentTime)
+ {
+ _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
return _messageMetaData;
}
- public ArrayList<? extends BaseQueue> getDestinationQueues()
+ public List<? extends BaseQueue> getDestinationQueues()
{
return _destinationQueues;
}
@@ -158,6 +164,11 @@ public class IncomingMessage implements
return _messagePublishInfo.getExchange();
}
+ public AMQShortString getRoutingKeyShortString()
+ {
+ return _messagePublishInfo.getRoutingKey();
+ }
+
public String getRoutingKey()
{
return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
@@ -209,7 +220,7 @@ public class IncomingMessage implements
return getContentHeader().bodySize;
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
return _storedMessageHandle.getMessageNumber();
}
@@ -225,7 +236,7 @@ public class IncomingMessage implements
}
- public void enqueue(final ArrayList<? extends BaseQueue> queues)
+ public void enqueue(final List<? extends BaseQueue> queues)
{
_destinationQueues = queues;
}
@@ -288,6 +299,15 @@ public class IncomingMessage implements
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(buf,offset);
+ buf.flip();
+ return buf;
+ }
+
public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
{
_storedMessageHandle = storedMessageHandle;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Dec 28 13:02:41 2011
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import java.util.Collection;
@@ -424,7 +425,7 @@ public abstract class QueueEntryImpl imp
if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
{
@@ -447,7 +448,8 @@ public abstract class QueueEntryImpl imp
{
}
- });
+ }, 0L);
+
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
public void postCommit()
@@ -460,8 +462,10 @@ public abstract class QueueEntryImpl imp
}
});
- }
+
+ txn.commit();
}
+ }
}
public boolean isQueueDeleted()
@@ -549,4 +553,11 @@ public abstract class QueueEntryImpl imp
_deliveryCountUpdater.decrementAndGet(this);
}
+ public String toString()
+ {
+ return "QueueEntryImpl{" +
+ "_entryId=" + _entryId +
+ ", _state=" + _state +
+ '}';
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Dec 28 13:02:41 2011
@@ -51,7 +51,6 @@ public class QueueRunner implements Read
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
- private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
private final AtomicLong _lastRunAgain = new AtomicLong();
@@ -65,8 +64,6 @@ public class QueueRunner implements Read
_queue = queue;
}
- private int trouble = 0;
-
public void run()
{
if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Dec 28 13:02:41 2011
@@ -584,8 +584,16 @@ public class SimpleAMQQueue implements A
public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
+ enqueue(message, false, action);
+ }
+
+ public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ {
- incrementTxnEnqueueStats(message);
+ if(transactional)
+ {
+ incrementTxnEnqueueStats(message);
+ }
incrementQueueCount();
incrementQueueSize(message);
@@ -733,13 +741,8 @@ public class SimpleAMQQueue implements A
private void incrementTxnEnqueueStats(final ServerMessage message)
{
- SessionConfig session = message.getSessionConfig();
-
- if(session !=null && session.isTransactional())
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
+ _msgTxnEnqueues.incrementAndGet();
+ _byteTxnEnqueues.addAndGet(message.getSize());
}
private void incrementTxnDequeueStats(QueueEntry entry)
@@ -1447,7 +1450,7 @@ public class SimpleAMQQueue implements A
{
}
- });
+ }, 0L);
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Wed Dec 28 13:02:41 2011
@@ -32,11 +32,9 @@ import static org.apache.qpid.server.sec
import java.net.SocketAddress;
import java.security.Principal;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.Subject;
@@ -192,6 +190,15 @@ public class SecurityManager
return _logger;
}
+ private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck>
+ {
+ @Override
+ protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest)
+ {
+ return size() >= 200;
+ }
+ }
+
private abstract class AccessCheck
{
abstract Result allowed(SecurityPlugin plugin);
@@ -204,56 +211,61 @@ public class SecurityManager
return true;
}
- HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins);
+ Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty()
+ ? Collections.<String, SecurityPlugin>emptyMap()
+ : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins);
- for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ if(!_hostPlugins.isEmpty())
{
- // Create set of global only plugins
- SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
- if (globalPlugin != null)
- {
- remainingPlugins.remove(hostEntry.getKey());
- }
-
- Result host = checker.allowed(hostEntry.getValue());
-
- if (host == Result.DENIED)
- {
- // Something vetoed the access, we're done
- return false;
- }
-
- // host allow overrides global allow, so only check global on abstain or defer
- if (host != Result.ALLOWED)
- {
- if (globalPlugin == null)
- {
- if (host == Result.DEFER)
- {
- host = hostEntry.getValue().getDefault();
- }
- if (host == Result.DENIED)
+ for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+ {
+ // Create set of global only plugins
+ SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
+ if (globalPlugin != null)
+ {
+ remainingPlugins.remove(hostEntry.getKey());
+ }
+
+ Result host = checker.allowed(hostEntry.getValue());
+
+ if (host == Result.DENIED)
+ {
+ // Something vetoed the access, we're done
+ return false;
+ }
+
+ // host allow overrides global allow, so only check global on abstain or defer
+ if (host != Result.ALLOWED)
+ {
+ if (globalPlugin == null)
{
- return false;
+ if (host == Result.DEFER)
+ {
+ host = hostEntry.getValue().getDefault();
+ }
+ if (host == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- else
- {
- Result global = checker.allowed(globalPlugin);
- if (global == Result.DEFER)
- {
- global = globalPlugin.getDefault();
- }
- if (global == Result.ABSTAIN && host == Result.DEFER)
- {
- global = hostEntry.getValue().getDefault();
- }
- if (global == Result.DENIED)
+ else
{
- return false;
+ Result global = checker.allowed(globalPlugin);
+ if (global == Result.DEFER)
+ {
+ global = globalPlugin.getDefault();
+ }
+ if (global == Result.ABSTAIN && host == Result.DEFER)
+ {
+ global = hostEntry.getValue().getDefault();
+ }
+ if (global == Result.DENIED)
+ {
+ return false;
+ }
}
- }
- }
+ }
+ }
}
for (SecurityPlugin plugin : remainingPlugins.values())
@@ -371,15 +383,33 @@ public class SecurityManager
});
}
+
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+ private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache
+ = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+
public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName)
{
- return checkAllPlugins(new AccessCheck()
+ PublishAccessCheck check;
+ ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache =
+ immediate ? _immediatePublishPropsCache : _publishPropsCache;
+
+ ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName);
+ if(exchangeMap == null)
{
- Result allowed(SecurityPlugin plugin)
+ cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>());
+ exchangeMap = cache.get(exchangeName);
+ }
+
+ check = exchangeMap.get(routingKey);
+ if(check == null)
{
- return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate));
+ check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate));
+ exchangeMap.put(routingKey, check);
}
- });
+
+ return checkAllPlugins(check);
}
public boolean authorisePurge(final AMQQueue queue)
@@ -413,4 +443,19 @@ public class SecurityManager
return current;
}
+
+ private class PublishAccessCheck extends AccessCheck
+ {
+ private final ObjectProperties _props;
+
+ public PublishAccessCheck(ObjectProperties props)
+ {
+ _props = props;
+ }
+
+ Result allowed(SecurityPlugin plugin)
+ {
+ return plugin.authorise(PUBLISH, EXCHANGE, _props);
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org