You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/02/13 19:10:58 UTC
svn commit: r627552 - in /incubator/qpid/branches/M2.1/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/se...
Author: rgodfrey
Date: Wed Feb 13 10:10:53 2008
New Revision: 627552
URL: http://svn.apache.org/viewvc?rev=627552&view=rev
Log:
QPID-790 : Performance Improvements
Added:
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Feb 13 10:10:53 2008
@@ -35,6 +35,7 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
@@ -199,11 +200,12 @@
_prefetch_HighWaterMark = prefetchCount;
}
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException
{
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext);
_currentMessage.setPublisher(publisher);
+ _currentMessage.setExchange(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
@@ -285,7 +287,7 @@
{
try
{
- _exchanges.routeContent(_currentMessage);
+ _currentMessage.route();
}
catch (NoRouteException e)
{
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Wed Feb 13 10:10:53 2008
@@ -239,7 +239,7 @@
{
MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = normalize(info.getRoutingKey());
+ final AMQShortString routingKey = info.getRoutingKey();
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Wed Feb 13 10:10:53 2008
@@ -91,7 +91,8 @@
}
MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
- channel.setPublishFrame(info, session);
+ info.setExchange(exchange);
+ channel.setPublishFrame(info, session, e);
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Wed Feb 13 10:10:53 2008
@@ -72,7 +72,7 @@
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -127,7 +127,7 @@
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -171,7 +171,7 @@
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -187,10 +187,10 @@
AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
- return deliverFrame.toByteBuffer();
+ return deliverFrame;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
@@ -205,7 +205,7 @@
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -218,7 +218,7 @@
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
BasicReturnBody basicReturnBody =
@@ -228,13 +228,13 @@
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Wed Feb 13 10:10:53 2008
@@ -16,6 +16,7 @@
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
public static Factory getInstanceFactory()
@@ -46,9 +47,10 @@
public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- ByteBuffer deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
+ final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- message.getContentHeaderBody());
+ contentHeaderBody);
final AMQMessageHandle messageHandle = message.getMessageHandle();
final StoreContext storeContext = message.getStoreContext();
@@ -101,7 +103,7 @@
final StoreContext storeContext = message.getStoreContext();
final long messageId = message.getMessageId();
- ByteBuffer deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
+ AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
@@ -145,41 +147,54 @@
}
- private ByteBuffer createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+ private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicDeliverBody deliverBody =
- methodRegistry.createBasicDeliverBody(consumerTag,
- deliveryTag,
- messageHandle.isRedelivered(),
- pb.getExchange(),
- pb.getRoutingKey());
- AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+ final boolean isRedelivered = messageHandle.isRedelivered();
+ final AMQShortString exchangeName = pb.getExchange();
+ final AMQShortString routingKey = pb.getRoutingKey();
+
+ final AMQDataBlock returnBlock = new DeferredDataBlock()
+ {
+
+ protected AMQDataBlock createAMQDataBlock()
+ {
+ BasicDeliverBody deliverBody =
+ METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+ AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
+
+
+ return deliverFrame;
- return deliverFrame.toByteBuffer();
+ }
+ };
+ return returnBlock;
}
- private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+ private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
final MessagePublishInfo pb = message.getMessagePublishInfo();
final AMQMessageHandle messageHandle = message.getMessageHandle();
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicGetOkBody getOkBody =
- methodRegistry.createBasicGetOkBody(deliveryTag,
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
messageHandle.isRedelivered(),
pb.getExchange(),
pb.getRoutingKey(),
queueSize);
AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
- return getOkFrame.toByteBuffer();
+ return getOkFrame;
}
public byte getProtocolMinorVersion()
@@ -192,23 +207,23 @@
return getProtocolSession().getProtocolMajorVersion();
}
- private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+
BasicReturnBody basicReturnBody =
- methodRegistry.createBasicReturnBody(replyCode,
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
replyText,
message.getMessagePublishInfo().getExchange(),
message.getMessagePublishInfo().getRoutingKey());
AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
- return returnFrame.toByteBuffer();
+ return returnFrame;
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
throws AMQException
{
- ByteBuffer returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
+ AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
message.getContentHeaderBody());
@@ -252,8 +267,8 @@
public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
{
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
- BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Feb 13 10:10:53 2008
@@ -35,6 +35,7 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,6 +85,10 @@
private final int hashcode = System.identityHashCode(this);
+ private Exchange _exchange;
+ private static final boolean SYNCED_CLOCKS =
+ ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
+
public String debugIdentity()
{
@@ -97,7 +102,7 @@
long timestamp =
((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
- if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ if (SYNCED_CLOCKS)
{
_expiration = expiration;
}
@@ -124,6 +129,16 @@
public boolean isReferenced()
{
return _referenceCount.get() > 0;
+ }
+
+ public void setExchange(final Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void route() throws AMQException
+ {
+ _exchange.route(this);
}
/**
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Wed Feb 13 10:10:53 2008
@@ -589,6 +589,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
public boolean isImmediate()
{
return false;
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Wed Feb 13 10:10:53 2008
@@ -242,6 +242,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Feb 13 10:10:53 2008
@@ -234,6 +234,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Wed Feb 13 10:10:53 2008
@@ -24,7 +24,7 @@
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private ByteBuffer _encodedBlock;
+ private AMQDataBlock _firstFrame;
private AMQDataBlock[] _blocks;
@@ -39,10 +39,10 @@
* @param encodedBlock already-encoded data
* @param blocks some blocks to be encoded.
*/
- public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks)
+ public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
{
this(blocks);
- _encodedBlock = encodedBlock;
+ _firstFrame = encodedBlock;
}
public AMQDataBlock[] getBlocks()
@@ -50,9 +50,9 @@
return _blocks;
}
- public ByteBuffer getEncodedBlock()
+ public AMQDataBlock getFirstFrame()
{
- return _encodedBlock;
+ return _firstFrame;
}
public long getSize()
@@ -62,19 +62,18 @@
{
frameSize += _blocks[i].getSize();
}
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- _encodedBlock.rewind();
- frameSize += _encodedBlock.remaining();
+ frameSize += _firstFrame.getSize();
}
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- buffer.put(_encodedBlock);
+ _firstFrame.writePayload(buffer);
}
for (int i = 0; i < _blocks.length; i++)
{
@@ -91,7 +90,7 @@
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_encodedBlock);
+ buf.append("{encodedBlock=").append(_firstFrame);
for (int i = 0 ; i < _blocks.length; i++)
{
buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
Added: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java?rev=627552&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java (added)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java Wed Feb 13 10:10:53 2008
@@ -0,0 +1,50 @@
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public abstract class DeferredDataBlock extends AMQDataBlock
+{
+ private AMQDataBlock _underlyingDataBlock;
+
+
+ public long getSize()
+ {
+ if(_underlyingDataBlock == null)
+ {
+ _underlyingDataBlock = createAMQDataBlock();
+ }
+ return _underlyingDataBlock.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingDataBlock == null)
+ {
+ _underlyingDataBlock = createAMQDataBlock();
+ }
+ _underlyingDataBlock.writePayload(buffer);
+ }
+
+ abstract protected AMQDataBlock createAMQDataBlock();
+
+}
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Wed Feb 13 10:10:53 2008
@@ -25,7 +25,7 @@
public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private ByteBuffer _encodedBlock;
+ private AMQDataBlock _firstFrame;
private AMQDataBlock _block;
@@ -40,10 +40,10 @@
* @param encodedBlock already-encoded data
* @param block a block to be encoded.
*/
- public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block)
+ public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block)
{
this(block);
- _encodedBlock = encodedBlock;
+ _firstFrame = encodedBlock;
}
public AMQDataBlock getBlock()
@@ -51,28 +51,28 @@
return _block;
}
- public ByteBuffer getEncodedBlock()
+ public AMQDataBlock getFirstFrame()
{
- return _encodedBlock;
+ return _firstFrame;
}
public long getSize()
{
long frameSize = _block.getSize();
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- _encodedBlock.rewind();
- frameSize += _encodedBlock.remaining();
+
+ frameSize += _firstFrame.getSize();
}
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_encodedBlock != null)
+ if (_firstFrame != null)
{
- buffer.put(_encodedBlock);
+ _firstFrame.writePayload(buffer);
}
_block.writePayload(buffer);
@@ -87,7 +87,7 @@
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_encodedBlock);
+ buf.append("{encodedBlock=").append(_firstFrame);
buf.append(" _block=[").append(_block.toString()).append("]");
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java Wed Feb 13 10:10:53 2008
@@ -27,6 +27,8 @@
public AMQShortString getExchange();
+ public void setExchange(AMQShortString exchange);
+
public boolean isImmediate();
public boolean isMandatory();
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Wed Feb 13 10:10:53 2008
@@ -67,7 +67,7 @@
final AMQShortString exchange = publishBody.getExchange();
final AMQShortString routingKey = publishBody.getRoutingKey();
- return new MethodConverter_0_9.MessagePublishInfoImpl(exchange == null ? null : exchange.intern(),
+ return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
routingKey == null ? null : routingKey.intern());
@@ -87,7 +87,7 @@
private static class MessagePublishInfoImpl implements MessagePublishInfo
{
- private final AMQShortString _exchange;
+ private AMQShortString _exchange;
private final boolean _immediate;
private final boolean _mandatory;
private final AMQShortString _routingKey;
@@ -106,6 +106,11 @@
public AMQShortString getExchange()
{
return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
}
public boolean isImmediate()
Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Wed Feb 13 10:10:53 2008
@@ -107,7 +107,7 @@
private static class MessagePublishInfoImpl implements MessagePublishInfo
{
- private final AMQShortString _exchange;
+ private AMQShortString _exchange;
private final boolean _immediate;
private final boolean _mandatory;
private final AMQShortString _routingKey;
@@ -126,6 +126,11 @@
public AMQShortString getExchange()
{
return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
}
public boolean isImmediate()
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Wed Feb 13 10:10:53 2008
@@ -117,6 +117,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return false;
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Wed Feb 13 10:10:53 2008
@@ -107,6 +107,11 @@
return new AMQShortString("someExchange");
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return false;
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Wed Feb 13 10:10:53 2008
@@ -70,6 +70,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return immediate;
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=627552&r1=627551&r2=627552&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Wed Feb 13 10:10:53 2008
@@ -61,6 +61,11 @@
return null;
}
+ public void setExchange(AMQShortString exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isImmediate()
{
return false;