You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [3/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Tue Oct 20 16:23:01 2009
@@ -27,27 +27,34 @@
package org.apache.qpid.server.output.amqp0_8;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
-import java.util.Iterator;
+import java.nio.ByteBuffer;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
- private final MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+ private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
+ METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
{
return new Factory()
{
-
+
public ProtocolOutputConverter newInstance(AMQProtocolSession session)
{
return new ProtocolOutputConverterImpl(session);
@@ -71,67 +78,44 @@
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQMessage message = (AMQMessage) entry.getMessage();
AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
-
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
- final int bodyCount = messageHandle.getBodyCount();
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ }
- if(bodyCount == 0)
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
-
- writeFrame(compositeBlock);
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
}
else
{
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
}
-
-
}
public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+ }
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+ throws AMQException
+ {
- AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
- final int bodyCount = messageHandle.getBodyCount();
- if(bodyCount == 0)
+ final int bodySize = (int) message.getSize();
+ if(bodySize == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
@@ -139,48 +123,63 @@
}
else
{
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ ByteBuffer buf = ByteBuffer.allocate(capacity);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(0);
+ int writtenSize = 0;
- AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
+ while(writtenSize < bodySize)
{
- cb = messageHandle.getContentChunk(i);
- writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ buf = java.nio.ByteBuffer.allocate(capacity);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
}
-
}
-
-
}
private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
BasicDeliverBody deliverBody =
- _methodRegistry.createBasicDeliverBody(consumerTag,
+ METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
deliveryTag,
- entry.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
@@ -190,14 +189,31 @@
private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
BasicGetOkBody getOkBody =
- _methodRegistry.createBasicGetOkBody(deliveryTag,
- entry.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
@@ -217,7 +233,7 @@
private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
BasicReturnBody basicReturnBody =
- _methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
messagePublishInfo.getExchange(),
messagePublishInfo.getRoutingKey());
@@ -228,7 +244,7 @@
public void writeReturn(MessagePublishInfo messagePublishInfo,
ContentHeaderBody header,
- Iterator<AMQDataBlock> bodyFrameIterator,
+ MessageContentSource content,
int channelId,
int replyCode,
AMQShortString replyText)
@@ -237,36 +253,8 @@
AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- header);
+ writeMessageDelivery(content, header, channelId, returnFrame);
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
- writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
}
@@ -278,7 +266,7 @@
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- BasicCancelOkBody basicCancelOkBody = _methodRegistry.createBasicCancelOkBody(consumerTag);
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.server.output.amqp0_9;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,39 +8,41 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import org.apache.mina.common.ByteBuffer;
-import java.util.Iterator;
-
import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+ private static final ProtocolVersionMethodConverter
+ PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
public static Factory getInstanceFactory()
@@ -71,16 +73,43 @@
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQMessage message = (AMQMessage) entry.getMessage();
- AMQBody deliverBody = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
- final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
- final AMQMessageHandle messageHandle = message.getMessageHandle();
- final int bodyCount = messageHandle.getBodyCount();
+ int bodySize = (int) message.getSize();
- if(bodyCount == 0)
+ if(bodySize == 0)
{
SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
contentHeaderBody);
@@ -89,36 +118,37 @@
}
else
{
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(0);
+ int writtenSize = 0;
- AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
- CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
+ while(writtenSize < bodySize)
{
- cb = messageHandle.getContentChunk(i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
+ buf = java.nio.ByteBuffer.allocate(capacity);
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ }
}
-
}
private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
{
-
+
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
contentHeaderBody);
return contentHeader;
@@ -127,63 +157,36 @@
public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
- AMQFrame deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
- final int bodyCount = messageHandle.getBodyCount();
- if(bodyCount == 0)
+ if(entry.getMessage() instanceof AMQMessage)
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- writeFrame(compositeBlock);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
}
else
{
-
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = messageHandle.getContentChunk(0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
- AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for(int i = 1; i < bodyCount; i++)
- {
- cb = messageHandle.getContentChunk(i);
- writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
- }
-
-
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
}
-
- }
-
-
- private AMQBody createEncodedDeliverFrame(QueueEntry entry, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
- throws AMQException
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
final boolean isRedelivered = entry.isRedelivered();
- final AMQShortString exchangeName = pb.getExchange();
- final AMQShortString routingKey = pb.getRoutingKey();
final AMQBody returnBlock = new AMQBody()
{
@@ -236,21 +239,37 @@
return returnBlock;
}
- private AMQFrame createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
throws AMQException
{
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
BasicGetOkBody getOkBody =
METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
- entry.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey(),
+ isRedelivered,
+ exchangeName,
+ routingKey,
queueSize);
- AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame;
+ return getOkBody;
}
public byte getProtocolMinorVersion()
@@ -263,7 +282,9 @@
return getProtocolSession().getProtocolMajorVersion();
}
- private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
{
BasicReturnBody basicReturnBody =
@@ -271,47 +292,18 @@
replyText,
messagePublishInfo.getExchange(),
messagePublishInfo.getRoutingKey());
- AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame;
+
+ return basicReturnBody;
}
- public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, Iterator<AMQDataBlock> bodyFrameIterator, int channelId, int replyCode, AMQShortString replyText)
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
-
- AMQDataBlock contentHeader = createContentHeaderBlock(channelId, header);
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
- writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
- writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- writeFrame(bodyFrameIterator.next());
- }
+ writeMessageDelivery(message, header, channelId, returnFrame);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Oct 20 16:23:01 2009
@@ -83,8 +83,8 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
@@ -139,7 +139,7 @@
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
@@ -152,11 +152,12 @@
private long _writtenBytes;
private long _readBytes;
-
+
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private long _maxFrameSize;
public ManagedObject getManagedObject()
{
@@ -167,7 +168,7 @@
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_networkDriver = driver;
-
+
_codecFactory = new AMQCodecFactory(true, this);
_poolReference.acquireExecutorService();
_readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
@@ -178,17 +179,9 @@
}
- private AMQProtocolSessionMBean createMBean() throws AMQException
+ private AMQProtocolSessionMBean createMBean() throws JMException
{
- try
- {
- return new AMQProtocolSessionMBean(this);
- }
- catch (JMException ex)
- {
- _logger.error("AMQProtocolSession MBean creation has failed ", ex);
- throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
- }
+ return new AMQProtocolSessionMBean(this);
}
public long getSessionID()
@@ -200,7 +193,17 @@
{
return _actor;
}
-
+
+ public void setMaxFrameSize(long frameMax)
+ {
+ _maxFrameSize = frameMax;
+ }
+
+ public long getMaxFrameSize()
+ {
+ return _maxFrameSize;
+ }
+
public void received(final ByteBuffer msg)
{
_lastIoTime = System.currentTimeMillis();
@@ -290,7 +293,7 @@
else
{
// The channel has been told to close, we don't process any more frames until
- // it's closed.
+ // it's closed.
return;
}
}
@@ -545,7 +548,7 @@
private void checkForNotification()
{
int channelsCount = _channelMap.size();
- if (channelsCount >= _maxNoOfChannels)
+ if (_managedObject != null && channelsCount >= _maxNoOfChannels)
{
_managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
}
@@ -560,7 +563,7 @@
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -576,7 +579,7 @@
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -697,7 +700,7 @@
{
task.doTask(this);
}
-
+
_closed = true;
_poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
@@ -775,7 +778,7 @@
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -865,8 +868,15 @@
_virtualHost.getConnectionRegistry().registerConnection(this);
- _managedObject = createMBean();
- _managedObject.register();
+ try
+ {
+ _managedObject = createMBean();
+ _managedObject.register();
+ }
+ catch (JMException e)
+ {
+ _logger.error(e);
+ }
}
public void addSessionCloseTask(Task task)
@@ -878,7 +888,7 @@
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -905,7 +915,7 @@
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
- }
+ }
public SocketAddress getLocalAddress()
{
@@ -941,7 +951,7 @@
public void setNetworkDriver(NetworkDriver driver)
{
- _networkDriver = driver;
+ _networkDriver = driver;
}
public void writerIdle()
@@ -970,7 +980,7 @@
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-
+
writeFrame(closeBody.generateFrame(0));
_networkDriver.close();
@@ -986,7 +996,7 @@
{
// Do nothing
}
-
+
public long getReadBytes()
{
return _readBytes;
@@ -1006,7 +1016,7 @@
{
return _sessionIdentifier;
}
-
+
public String getClientVersion()
{
return (_clientVersion == null) ? null : _clientVersion.toString();
@@ -1024,5 +1034,5 @@
}
}
}
-
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,8 +28,7 @@
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.PrincipalHolder;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -44,6 +43,10 @@
LogActor getLogActor();
+ void setMaxFrameSize(long frameMax);
+
+ long getMaxFrameSize();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
@@ -227,5 +230,5 @@
List<AMQChannel> getChannels();
void closeIfLingeringClosedChannels();
-
+
}
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java Tue Oct 20 16:23:01 2009
@@ -18,22 +18,20 @@
* under the License.
*
*/
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.protocol.ProtocolEngine_0_10;
public class ProtocolEngineFactory_0_10 implements ProtocolEngineFactory
{
private ConnectionDelegate _delegate;
- public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
-
public ProtocolEngineFactory_0_10(ConnectionDelegate delegate)
{
_delegate = delegate;
@@ -41,10 +39,8 @@
public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
{
- Connection conn = new Connection();
+ Connection conn = new ServerConnection();
conn.setConnectionDelegate(_delegate);
- Disassembler dis = new Disassembler(networkDriver, MAX_FRAME_SIZE);
- conn.setSender(dis);
- return new ProtocolEngine_0_10(conn, networkDriver); //To change body of implemented methods use File | Settings | File Templates.
+ return new ProtocolEngine_0_10(conn, networkDriver);
}
}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Tue Oct 20 16:23:01 2009
@@ -18,31 +18,38 @@
* under the License.
*
*/
-package org.apache.qpid.server;
+package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
import java.net.SocketAddress;
public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
{
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
private NetworkDriver _networkDriver;
private long _readBytes;
private long _writtenBytes;
+ private Connection _connection;
public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver)
{
super(new Assembler(conn));
+ _connection = conn;
_networkDriver = networkDriver;
}
public void setNetworkDriver(NetworkDriver driver)
{
_networkDriver = driver;
+ Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
+ _connection.setSender(dis);
}
public SocketAddress getRemoteAddress()
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Tue Oct 20 16:23:01 2009
@@ -24,7 +24,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
public class AMQPriorityQueue extends SimpleAMQQueue
{
@@ -34,7 +33,6 @@
final boolean autoDelete,
final VirtualHost virtualHost,
int priorities)
- throws AMQException
{
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
@@ -43,7 +41,7 @@
boolean durable,
String owner,
boolean autoDelete,
- VirtualHost virtualHost, int priorities) throws AMQException
+ VirtualHost virtualHost, int priorities)
{
this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
}
@@ -70,7 +68,7 @@
QueueEntry released = context._releasedEntry;
while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
{
- if(_releasedUpdater.compareAndSet(context,released,entry))
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
{
break;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Oct 20 16:23:01 2009
@@ -22,16 +22,16 @@
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.PrincipalHolder;
-import org.apache.qpid.server.ExchangeReferrer;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
@@ -40,7 +40,7 @@
import java.util.Set;
import java.util.Map;
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
{
@@ -51,6 +51,8 @@
AMQShortString getName();
+ void setNoLocal(boolean b);
+
boolean isDurable();
boolean isAutoDelete();
@@ -114,7 +116,7 @@
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
+
void addQueueDeleteTask(final Task task);
@@ -128,11 +130,11 @@
List<Long> getMessagesOnTheQueue(int num, int offest);
QueueEntry getMessageOnTheQueue(long messageId);
-
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
- *
- * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
* @param fromPosition
* @param toPosition
@@ -142,9 +144,9 @@
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext);
+ ServerTransaction transaction);
- void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
+ void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
void removeMessagesFromQueue(long fromMessageId, long toMessageId);
@@ -265,6 +267,6 @@
}
void configure(QueueConfiguration config);
-
+
ManagedObject getManagedObject();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Tue Oct 20 16:23:01 2009
@@ -27,7 +27,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
-import java.util.HashMap;
public class AMQQueueFactory
@@ -130,7 +129,6 @@
AMQShortString owner,
boolean autoDelete,
VirtualHost virtualHost, final FieldTable arguments)
- throws AMQException
{
final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Oct 20 16:23:01 2009
@@ -22,21 +22,19 @@
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import javax.management.JMException;
import javax.management.MBeanNotificationInfo;
@@ -125,7 +123,7 @@
_msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS,
_msgContentAttributeTypes);
@@ -135,9 +133,9 @@
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
- _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
+ _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
VIEW_MSGS_TABULAR_UNIQUE_INDEX);
}
@@ -299,7 +297,7 @@
/**
* Clears the queue of non-acquired messages
- *
+ *
* @return the number of messages deleted
* @see AMQQueue#clearQueue
*/
@@ -321,60 +319,40 @@
}
ServerMessage serverMsg = entry.getMessage();
-
- if(serverMsg instanceof AMQMessage)
- {
- AMQMessage msg = (AMQMessage) serverMsg;
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
- {
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
- }
+ final int bodySize = (int) serverMsg.getSize();
- try
- {
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
- {
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ List<Byte> msgContent = new ArrayList<Byte>();
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
+ int position = 0;
- return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
- }
- catch (AMQException e)
+ while(position < bodySize)
+ {
+ position += serverMsg.getContent(buf, position);
+ buf.flip();
+ for(int i = 0; i < buf.limit(); i++)
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ msgContent.add(buf.get(i));
}
-
+ buf.clear();
}
- else
+
+ AMQMessageHeader header = serverMsg.getMessageHeader();
+
+ String mimeType = null, encoding = null;
+ if (header != null)
{
- // TODO 0-10 Messages for MBean
- return null;
+ mimeType = header.getMimeType();
+
+ encoding = header.getEncoding();
}
+
+
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+
}
/**
@@ -386,8 +364,8 @@
{
return viewMessages((long)beginIndex,(long)endIndex);
}
-
-
+
+
/**
* Returns the header contents of the messages stored in this queue in tabular form.
* @param startPosition The queue position of the first message to be viewed
@@ -400,7 +378,7 @@
throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
-
+
if ((endPosition - startPosition) > Integer.MAX_VALUE)
{
throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
@@ -489,7 +467,9 @@
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, new StoreContext());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+ txn.commit();
}
/**
@@ -507,7 +487,7 @@
_queue.removeMessagesFromQueue(fromMessageId, toMessageId);
}
-
+
/**
* @see ManagedQueue#copyMessages
* @param fromMessageId
@@ -522,9 +502,15 @@
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, new StoreContext());
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+ _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+
+ txn.commit();
+
+
}
-
+
/**
* returns Notifications sent by this MBean.
*/
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,12 +44,12 @@
return _virtualHost;
}
- public void registerQueue(AMQQueue queue) throws AMQException
+ public void registerQueue(AMQQueue queue)
{
_queueMap.put(queue.getName(), queue);
}
- public void unregisterQueue(AMQShortString name) throws AMQException
+ public void unregisterQueue(AMQShortString name)
{
_queueMap.remove(name);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Tue Oct 20 16:23:01 2009
@@ -25,21 +25,22 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.nio.ByteBuffer;
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, BodyContentHolder
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
{
/** Used for debugging purposes. */
@@ -51,9 +52,6 @@
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
- private final Long _messageId;
-
/**
* Keeps a track of how many bytes we have received in body frames
@@ -70,25 +68,29 @@
private long _expiration;
private Exchange _exchange;
- private AMQMessageHeader _messageHeader;
private int _receivedChunkCount = 0;
private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
+ // we keep both the original meta data object and the store reference to it just in case the
+ // store would otherwise flow it to disk
+
+ private MessageMetaData _messageMetaData;
+
+ private StoredMessage<MessageMetaData> _storedMessageHandle;
+
- public IncomingMessage(final Long messageId,
- final MessagePublishInfo info,
- final AMQProtocolSession publisher)
+ public IncomingMessage(
+ final MessagePublishInfo info
+ )
{
- _messageId = messageId;
_messagePublishInfo = info;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
- _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
}
public void setExpiration()
@@ -122,11 +124,10 @@
}
- public MessageMetaData routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ public MessageMetaData headersReceived()
{
- _messageHandle = factory.createMessageHandle(_messageId, store, isPersistent());
- return _messageHandle.setPublishAndContentHeaderBody(_messagePublishInfo, _contentHeaderBody);
+ _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+ return _messageMetaData;
}
@@ -135,20 +136,15 @@
return _destinationQueues;
}
-
- public AMQMessageHandle getMessageHandle()
- {
- return _messageHandle;
- }
-
-
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
-
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
- _messageHandle.addContentBodyFrame(contentChunk, allContentReceived());
+
+
+
return _receivedChunkCount++;
}
@@ -192,7 +188,7 @@
public AMQMessageHeader getMessageHeader()
{
- return _messageHeader;
+ return _messageMetaData.getMessageHeader();
}
public boolean isPersistent()
@@ -207,6 +203,7 @@
return false;
}
+
public long getSize()
{
return getContentHeader().bodySize;
@@ -214,7 +211,7 @@
public Long getMessageNumber()
{
- return _messageId;
+ return _storedMessageHandle.getMessageNumber();
}
public void setExchange(final Exchange e)
@@ -258,4 +255,46 @@
{
return _contentChunks.get(index);
}
+
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ int pos = 0;
+ int written = 0;
+ for(ContentChunk cb : _contentChunks)
+ {
+ ByteBuffer data = cb.getData().buf();
+ if(offset+written >= pos && offset < pos + data.limit())
+ {
+ ByteBuffer src = data.duplicate();
+ src.position(offset+written - pos);
+ src = src.slice();
+
+ if(buf.remaining() < src.limit())
+ {
+ src.limit(buf.remaining());
+ }
+ int count = src.limit();
+ buf.put(src);
+ written += count;
+ if(buf.remaining() == 0)
+ {
+ break;
+ }
+ }
+ pos+=data.limit();
+ }
+ return written;
+
+ }
+
+ public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
+ {
+ _storedMessageHandle = storedMessageHandle;
+ }
+
+ public StoredMessage<MessageMetaData> getStoredMessage()
+ {
+ return _storedMessageHandle;
+ }
}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+final class QueueContext implements AMQQueue.Context
+{
+ volatile QueueEntry _lastSeenEntry;
+ volatile QueueEntry _releasedEntry;
+
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _lastSeenUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _releasedUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_releasedEntry");
+
+ public QueueContext(QueueEntry head)
+ {
+ _lastSeenEntry = head;
+ }
+
+ public QueueEntry getLastSeenEntry()
+ {
+ return _lastSeenEntry;
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Oct 20 16:23:01 2009
@@ -180,7 +180,7 @@
boolean immediateAndNotDelivered();
- void setRedelivered(boolean b);
+ void setRedelivered();
boolean isRedelivered();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Oct 20 16:23:01 2009
@@ -27,12 +27,10 @@
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.log4j.Logger;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -80,8 +78,11 @@
private volatile long _entryId;
volatile QueueEntryImpl _next;
- private boolean _deliveredToConsumer;
- private boolean _redelivered;
+
+ private static final int DELIVERED_TO_CONSUMER = 1;
+ private static final int REDELIVERED = 2;
+
+ private volatile int _deliveryState;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -94,6 +95,7 @@
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
+
_message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
@@ -132,7 +134,7 @@
public boolean getDeliveredToConsumer()
{
- return _deliveredToConsumer;
+ return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
}
public boolean expired() throws AMQException
@@ -194,9 +196,9 @@
public boolean acquire(Subscription sub)
{
final boolean acquired = acquire(sub.getOwningState());
- if(acquired && !_deliveredToConsumer)
+ if(acquired)
{
- _deliveredToConsumer = true;
+ _deliveryState |= DELIVERED_TO_CONSUMER;
}
return acquired;
}
@@ -259,9 +261,9 @@
}
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
- return !_deliveredToConsumer && isImmediate();
+ return !getDeliveredToConsumer() && isImmediate();
}
private boolean isImmediate()
@@ -270,9 +272,9 @@
return message != null && message.isImmediate();
}
- public void setRedelivered(boolean b)
+ public void setRedelivered()
{
- _redelivered = b;
+ _deliveryState |= REDELIVERED;
}
public AMQMessageHeader getMessageHeader()
@@ -289,7 +291,7 @@
public boolean isRedelivered()
{
- return _redelivered;
+ return (_deliveryState & REDELIVERED) != 0;
}
public Subscription getDeliveredSubscription()
@@ -329,7 +331,7 @@
}
public boolean isRejectedBy(Subscription subscription)
- {
+ {
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
@@ -410,9 +412,9 @@
final ServerMessage message = getMessage();
if(rerouteQueues != null && rerouteQueues.size() != 0)
{
- Transaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.enqueue(rerouteQueues, message, new Transaction.Action() {
+ txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
public void postCommit()
{
try
@@ -434,7 +436,7 @@
}
});
txn.dequeue(currentQueue,message,
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
{
@@ -523,7 +525,7 @@
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
return true;
}
else
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,9 @@
{
VirtualHost getVirtualHost();
- void registerQueue(AMQQueue queue) throws AMQException;
+ void registerQueue(AMQQueue queue);
- void unregisterQueue(AMQShortString name) throws AMQException;
+ void unregisterQueue(AMQShortString name);
AMQQueue getQueue(AMQShortString name);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org