You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/07/02 16:17:46 UTC
svn commit: r552499 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
perftests/pom.xml
Author: rupertlssmith
Date: Mon Jul 2 07:17:45 2007
New Revision: 552499
URL: http://svn.apache.org/viewvc?view=rev&rev=552499
Log:
Added some documentation.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
incubator/qpid/branches/M2/java/perftests/pom.xml
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul 2 07:17:45 2007
@@ -20,45 +20,44 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-import org.apache.log4j.Logger;
-
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/** Combines the information that make up a deliverable message into a more manageable form. */
+/**
+ * A deliverable message.
+ */
public class AMQMessage
{
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /** Used in clustering */
+ /** Used in clustering. @todo What for? */
private Set<Object> _tokens;
- /** Only use in clustering - should ideally be removed? */
+ /** Only use in clustering. @todo What for? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -67,33 +66,27 @@
private AMQMessageHandle _messageHandle;
- // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+ /** Holds the transactional context in which this message is being processed. */
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
- * messages published with the 'immediate' flag.
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
- /**
- * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
- * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
- * removed from the store.
- */
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
-
private Set<Subscription> _rejectedBy = null;
-
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
-
private final int hashcode = System.identityHashCode(this);
private long _expiration;
@@ -104,8 +97,10 @@
public void setExpiration()
{
- long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
- long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ long expiration =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp =
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
{
@@ -118,10 +113,10 @@
{
if (timestamp != 0L)
{
- //todo perhaps use arrival time
+ // todo perhaps use arrival time
long diff = (System.currentTimeMillis() - timestamp);
- if (diff > 1000L || diff < 1000L)
+ if ((diff > 1000L) || (diff < 1000L))
{
_expiration = expiration + diff;
}
@@ -152,11 +147,12 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Unable to get body count: " + e, e);
+
return false;
}
}
@@ -166,7 +162,10 @@
try
{
- AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ AMQBody cb =
+ getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
+
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -202,11 +201,12 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+ return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Error getting body count: " + e, e);
+
return false;
}
}
@@ -229,8 +229,7 @@
}
}
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext)
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
@@ -250,7 +249,8 @@
*
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -266,8 +266,8 @@
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -285,11 +285,9 @@
*
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
- MessageHandleFactory messageHandleFactory) throws AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -331,13 +329,13 @@
}
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
- public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException
+ public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(_messageId, store, persistent);
@@ -368,6 +366,7 @@
if (allContentReceived)
{
deliver(storeContext);
+
return true;
}
else
@@ -392,7 +391,8 @@
*/
public AMQMessage takeReference()
{
- incrementReference();// _referenceCount.incrementAndGet();
+ incrementReference(); // _referenceCount.incrementAndGet();
+
return this;
}
@@ -400,10 +400,10 @@
protected void incrementReference()
{
_referenceCount.incrementAndGet();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
}
/**
@@ -427,10 +427,10 @@
{
try
{
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
@@ -441,7 +441,7 @@
}
catch (AMQException e)
{
- //to maintain consistency, we revert the count
+ // to maintain consistency, we revert the count
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
@@ -450,7 +450,8 @@
{
if (count < 0)
{
- throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+ + " has gone below 0.");
}
}
}
@@ -477,7 +478,7 @@
public boolean isTaken(AMQQueue queue)
{
- //return _taken.get();
+ // return _taken.get();
synchronized (this)
{
@@ -494,15 +495,15 @@
public boolean taken(AMQQueue queue, Subscription sub)
{
-// if (_taken.getAndSet(true))
-// {
-// return true;
-// }
-// else
-// {
-// _takenBySubcription = sub;
-// return false;
-// }
+ // if (_taken.getAndSet(true))
+ // {
+ // return true;
+ // }
+ // else
+ // {
+ // _takenBySubcription = sub;
+ // return false;
+ // }
synchronized (this)
{
@@ -520,6 +521,7 @@
{
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, sub);
+
return false;
}
}
@@ -532,9 +534,8 @@
_log.trace("Releasing Message:" + debugIdentity());
}
-// _taken.set(false);
-// _takenBySubcription = null;
-
+ // _taken.set(false);
+ // _takenBySubcription = null;
synchronized (this)
{
@@ -568,6 +569,7 @@
else
{
_tokens.add(token);
+
return false;
}
}
@@ -629,6 +631,7 @@
{
pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
+
return pb;
}
@@ -659,7 +662,7 @@
*/
public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
{
- //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+ // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -668,6 +671,7 @@
if (now > _expiration)
{
dequeue(storecontext, queue);
+
return true;
}
}
@@ -690,12 +694,13 @@
{
_log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
+
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
- _transientMessageData.getContentHeaderBody());
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
+ _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -705,9 +710,9 @@
for (AMQQueue q : destinationQueues)
{
- //Increment the references to this message for each queue delivery.
+ // Increment the references to this message for each queue delivery.
incrementReference();
- //normal deliver so add this message at the end.
+ // normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
}
@@ -719,182 +724,181 @@
}
}
-/*
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ /*
+ public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
- protocolSession.writeFrame(compositeBlock);
}
- else
+
+ public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
{
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
- }
-
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+ deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- protocolSession.writeFrame(compositeBlock);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+ protocolSession.getProtocolMajorVersion(),
+ protocolSession.getProtocolMinorVersion(),
+ getMessagePublishInfo().getExchange(),
+ replyCode, replyText,
+ getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
- else
+
+ public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
{
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+ new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
//
// Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
//
- for (int i = 1; i < bodyCount; i++)
+ while (bodyFrameIterator.hasNext())
{
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ protocolSession.writeFrame(bodyFrameIterator.next());
}
-
-
- }
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- getMessagePublishInfo().getExchange(),
- replyCode, replyText,
- getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
}
- }
-*/
+ */
public AMQMessageHandle getMessageHandle()
{
return _messageHandle;
}
-
public long getSize()
{
try
@@ -906,12 +910,12 @@
catch (AMQException e)
{
_log.error(e.toString(), e);
+
return 0;
}
}
-
public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
@@ -921,25 +925,23 @@
_transientMessageData = transientMessageData;
}
-
public void clearTransientMessageData()
{
_transientMessageData = null;
}
-
public String toString()
{
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-// _taken + " by :" + _takenBySubcription;
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
+ + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
-// return _takenBySubcription;
+ // return _takenBySubcription;
synchronized (this)
{
return _takenBySubcriptionMap.get(queue);
@@ -967,7 +969,7 @@
{
boolean rejected = _rejectedBy != null;
- if (rejected) // We have subscriptions that rejected this message
+ if (rejected) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Mon Jul 2 07:17:45 2007
@@ -28,24 +28,144 @@
import org.apache.qpid.server.store.StoreContext;
/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed.
+ * Different levels of transactional support for the delivery of messages may be provided by different implementations
+ * of this interface.
+ *
+ * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'.
+ * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage}
+ * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Explicitly accept a transaction start notification.
+ * <tr><td> Commit all pending operations in a transaction.
+ * <tr><td> Rollback all pending operations in a transaction.
+ * <tr><td> Deliver a message to a queue as part of a transaction.
+ * <tr><td> Redeliver a message to a queue as part of a transaction.
+ * <tr><td> Mark a message as acknowledged as part of a transaction.
+ * <tr><td> Accept notification that a message has been completely received as part of a transaction.
+ * <tr><td> Accept notification that a message has been fully processed as part of a transaction.
+ * <tr><td> Associate a message store context with this transaction context.
+ * </table>
+ *
+ * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional
+ * context. They are non-transactional operations, used to trigger other side-effects. Consider moving them
+ * somewhere else, a seperate interface for example.
+ *
+ * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides
+ * transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any
+ * queue implementation could be made transactional by wrapping it as a transactional queue. This would mean
+ * that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be
+ * conceptually neater.
+ *
+ * For example:
+ * <pre>
+ * public interface Transactional
+ * {
+ * public void commit();
+ * public void rollback();
+ * }
+ *
+ * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E>
+ * {}
+ *
+ * public class Queues
+ * {
+ * ...
+ * // For transactional messaging, take a transactional view onto the queue.
+ * public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... }
+ *
+ * // For non-transactional messaging, take a non-transactional view onto the queue.
+ * public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
+ * }
+ * </pre>
*/
public interface TransactionalContext
{
+ /**
+ * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback}
+ * should automatically begin the next transaction in the chain.
+ *
+ * @throws AMQException If the transaction cannot be started for any reason.
+ */
void beginTranIfNecessary() throws AMQException;
+ /**
+ * Makes all pending operations on the transaction permanent and visible.
+ *
+ * @throws AMQException If the transaction cannot be committed for any reason.
+ */
void commit() throws AMQException;
+ /**
+ * Erases all pending operations on the transaction.
+ *
+ * @throws AMQException If the transaction cannot be committed for any reason.
+ */
void rollback() throws AMQException;
+ /**
+ * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
+ * redelivery, and should be placed on the front of the queue.
+ *
+ * <p/>This is an 'enqueue' operation.
+ *
+ * @param message The message to deliver.
+ * @param queue The queue to deliver the message to.
+ * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
+ * for normal FIFO message ordering.
+ *
+ * @throws AMQException If the message cannot be delivered for any reason.
+ */
void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+ /**
+ * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
+ * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple'
+ * flag is set, in which case an acknowledgement up to the latest delivered message should be done.
+ *
+ * <p/>This is a 'dequeue' operation.
+ *
+ * @param deliveryTag The id of the message to acknowledge, or zero, if using multiple acknowledgement
+ * up to the latest message.
+ * @param lastDeliveryTag The latest message delivered.
+ * @param multiple <tt>true</tt> if all message ids up the acknowledged one or latest delivered, are
+ * to be acknowledged, <tt>false</tt> otherwise.
+ * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message
+ * from.
+ *
+ * @throws AMQException If the message cannot be acknowledged for any reason.
+ */
void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
- UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been fully received. The actual message that was received
+ * is not specified. This event may be used to trigger a process related to the receipt of the message, for example,
+ * flushing its data to disk.
+ *
+ * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise.
+ *
+ * @throws AMQException If the fully received event cannot be processed for any reason.
+ */
void messageFullyReceived(boolean persistent) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been delivered, succesfully or otherwise. The actual
+ * message that was delivered is not specified. This event may be used to trigger a process related to the
+ * outcome of the delivery of the message, for example, cleaning up failed deliveries.
+ *
+ * @param protocolSession The protocol session of the deliverable message.
+ *
+ * @throws AMQException If the message processed event cannot be handled for any reason.
+ */
void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+ /**
+ * Gets the message store context associated with this transactional context.
+ *
+ * @return The message store context associated with this transactional context.
+ */
StoreContext getStoreContext();
}
Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Mon Jul 2 07:17:45 2007
@@ -201,8 +201,8 @@
<TQC-Qpid-02>-n TQC-Qpid-02 -d1M -s[1000] -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=1 rate=1000 maxPending=1000000 </TQC-Qpid-02>
<TQC-Qpid-03>-n TQC-Qpid-03 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-03>
<TQC-Qpid-04>-n TQC-Qpid-04 -d10M -s[1000] -c[10] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-04>
- <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05>
- <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06>
+ <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05>
+ <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000] -c[100] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000 messageSize=256 destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06>
<TQM-Qpid-01-512b>-n TQM-Qpid-01-512b -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true commitBatchSize=10 batchSize=1000 messageSize=512 destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-01-512b>
<TQM-Qpid-02-512b>-n TQM-Qpid-02-512b -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=10 batchSize=1000 messageSize=512 destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-02-512b>