You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/07/02 16:17:46 UTC

svn commit: r552499 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java perftests/pom.xml

Author: rupertlssmith
Date: Mon Jul  2 07:17:45 2007
New Revision: 552499

URL: http://svn.apache.org/viewvc?view=rev&rev=552499
Log:
Added some documentation.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
    incubator/qpid/branches/M2/java/perftests/pom.xml

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul  2 07:17:45 2007
@@ -20,45 +20,44 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
 
-import org.apache.log4j.Logger;
-
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/** Combines the information that make up a deliverable message into a more manageable form. */
+/**
+ * A deliverable message.
+ */
 public class AMQMessage
 {
+    /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    /** Used in clustering */
+    /** Used in clustering. @todo What for? */
     private Set<Object> _tokens;
 
-    /** Only use in clustering - should ideally be removed? */
+    /** Only use in clustering. @todo What for? */
     private AMQProtocolSession _publisher;
 
     private final Long _messageId;
@@ -67,33 +66,27 @@
 
     private AMQMessageHandle _messageHandle;
 
-    // TODO: ideally this should be able to go into the transient message date - check this! (RG)
+    /** Holds the transactional context in which this message is being processed. */
     private TransactionalContext _txnContext;
 
     /**
-     * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
-     * messages published with the 'immediate' flag.
+     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+     * for messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
-    /**
-     * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
-     * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
-     * removed from the store.
-     */
+
+    /** Flag to indicate that this message requires 'immediate' delivery. */
     private boolean _immediate;
 
-    //    private Subscription _takenBySubcription;
-    //    private AtomicBoolean _taken = new AtomicBoolean(false);
+    // private Subscription _takenBySubcription;
+    // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-
     private Set<Subscription> _rejectedBy = null;
 
-
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
-
     private final int hashcode = System.identityHashCode(this);
     private long _expiration;
 
@@ -104,8 +97,10 @@
 
     public void setExpiration()
     {
-        long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
-        long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+        long expiration =
+            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+        long timestamp =
+            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
 
         if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
         {
@@ -118,10 +113,10 @@
             {
                 if (timestamp != 0L)
                 {
-                    //todo perhaps use arrival time
+                    // todo perhaps use arrival time
                     long diff = (System.currentTimeMillis() - timestamp);
 
-                    if (diff > 1000L || diff < 1000L)
+                    if ((diff > 1000L) || (diff < 1000L))
                     {
                         _expiration = expiration + diff;
                     }
@@ -152,11 +147,12 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
             }
             catch (AMQException e)
             {
                 _log.error("Unable to get body count: " + e, e);
+
                 return false;
             }
         }
@@ -166,7 +162,10 @@
             try
             {
 
-                AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+                AMQBody cb =
+                    getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+                            _messageId, ++_index));
+
                 return new AMQFrame(_channel, cb);
             }
             catch (AMQException e)
@@ -202,11 +201,12 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
             }
             catch (AMQException e)
             {
                 _log.error("Error getting body count: " + e, e);
+
                 return false;
             }
         }
@@ -229,8 +229,7 @@
         }
     }
 
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext)
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
     {
         _messageId = messageId;
         _txnContext = txnContext;
@@ -250,7 +249,8 @@
      *
      * @throws AMQException
      */
-    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
+    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
+        throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -266,8 +266,8 @@
      * @param txnContext
      * @param contentHeader
      */
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+        ContentHeaderBody contentHeader) throws AMQException
     {
         this(messageId, info, txnContext);
         setContentHeaderBody(contentHeader);
@@ -285,11 +285,9 @@
      *
      * @throws AMQException
      */
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext,
-                      ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
-                      List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
-                      MessageHandleFactory messageHandleFactory) throws AMQException
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+        ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+        MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
     {
         this(messageId, info, txnContext, contentHeader);
         _transientMessageData.setDestinationQueues(destinationQueues);
@@ -331,13 +329,13 @@
         }
     }
 
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
-            throws AMQException
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
     {
         _transientMessageData.setContentHeaderBody(contentHeaderBody);
     }
 
-    public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException
+    public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
+        throws AMQException
     {
         final boolean persistent = isPersistent();
         _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
@@ -368,6 +366,7 @@
         if (allContentReceived)
         {
             deliver(storeContext);
+
             return true;
         }
         else
@@ -392,7 +391,8 @@
      */
     public AMQMessage takeReference()
     {
-        incrementReference();// _referenceCount.incrementAndGet();
+        incrementReference(); // _referenceCount.incrementAndGet();
+
         return this;
     }
 
@@ -400,10 +400,10 @@
     protected void incrementReference()
     {
         _referenceCount.incrementAndGet();
-//        if (_log.isDebugEnabled())
-//        {
-//            _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-//        }
+        // if (_log.isDebugEnabled())
+        // {
+        // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+        // }
     }
 
     /**
@@ -427,10 +427,10 @@
         {
             try
             {
-//                if (_log.isDebugEnabled())
-//                {
-//                    _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-//                }
+                // if (_log.isDebugEnabled())
+                // {
+                // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                // }
 
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
@@ -441,7 +441,7 @@
             }
             catch (AMQException e)
             {
-                //to maintain consistency, we revert the count
+                // to maintain consistency, we revert the count
                 incrementReference();
                 throw new MessageCleanupException(_messageId, e);
             }
@@ -450,7 +450,8 @@
         {
             if (count < 0)
             {
-                throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
+                throw new MessageCleanupException("Reference count for message id " + debugIdentity()
+                    + " has gone below 0.");
             }
         }
     }
@@ -477,7 +478,7 @@
 
     public boolean isTaken(AMQQueue queue)
     {
-        //return _taken.get();
+        // return _taken.get();
 
         synchronized (this)
         {
@@ -494,15 +495,15 @@
 
     public boolean taken(AMQQueue queue, Subscription sub)
     {
-//        if (_taken.getAndSet(true))
-//        {
-//            return true;
-//        }
-//        else
-//        {
-//            _takenBySubcription = sub;
-//            return false;
-//        }
+        // if (_taken.getAndSet(true))
+        // {
+        // return true;
+        // }
+        // else
+        // {
+        // _takenBySubcription = sub;
+        // return false;
+        // }
 
         synchronized (this)
         {
@@ -520,6 +521,7 @@
             {
                 _takenMap.put(queue, taken);
                 _takenBySubcriptionMap.put(queue, sub);
+
                 return false;
             }
         }
@@ -532,9 +534,8 @@
             _log.trace("Releasing Message:" + debugIdentity());
         }
 
-//        _taken.set(false);
-//        _takenBySubcription = null;
-
+        // _taken.set(false);
+        // _takenBySubcription = null;
 
         synchronized (this)
         {
@@ -568,6 +569,7 @@
         else
         {
             _tokens.add(token);
+
             return false;
         }
     }
@@ -629,6 +631,7 @@
         {
             pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
         }
+
         return pb;
     }
 
@@ -659,7 +662,7 @@
      */
     public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
     {
-        //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+        // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
 
         if (_expiration != 0L)
         {
@@ -668,6 +671,7 @@
             if (now > _expiration)
             {
                 dequeue(storecontext, queue);
+
                 return true;
             }
         }
@@ -690,12 +694,13 @@
         {
             _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
         }
+
         try
         {
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
-                                                          _transientMessageData.getContentHeaderBody());
+            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
+                _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
@@ -705,9 +710,9 @@
 
             for (AMQQueue q : destinationQueues)
             {
-                //Increment the references to this message for each queue delivery.
+                // Increment the references to this message for each queue delivery.
                 incrementReference();
-                //normal deliver so add this message at the end.
+                // normal deliver so add this message at the end.
                 _txnContext.deliver(this, q, false);
             }
         }
@@ -719,182 +724,181 @@
         }
     }
 
-/*
-    public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
-
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-        if (bodyCount == 0)
+    /*
+        public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+                throws AMQException
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                       contentHeader);
+            ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
+
+            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+            if (bodyCount == 0)
+            {
+                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                                           contentHeader);
+
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
+
+                //
+                // Optimise the case where we have a single content body. In that case we create a composite block
+                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+                //
+                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
+
+                //
+                // Now start writing out the other content bodies
+                //
+                for (int i = 1; i < bodyCount; i++)
+                {
+                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                }
+
+
+            }
+
 
-            protocolSession.writeFrame(compositeBlock);
         }
-        else
+
+        public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
         {
+            ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
+
+            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+            if (bodyCount == 0)
+            {
+                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                                           contentHeader);
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+                //
+                // Optimise the case where we have a single content body. In that case we create a composite block
+                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+                //
+                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
+
+                //
+                // Now start writing out the other content bodies
+                //
+                for (int i = 1; i < bodyCount; i++)
+                {
+                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                }
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for (int i = 1; i < bodyCount; i++)
-            {
-                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
         }
 
 
-    }
-
-    public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
+        private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+                throws AMQException
+        {
+            MessagePublishInfo pb = getMessagePublishInfo();
+            AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+                                                                    deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+                                                                    pb.getRoutingKey());
+            ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+            deliverFrame.writePayload(buf);
+            buf.flip();
+            return buf;
+        }
+
+        private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
+                throws AMQException
+        {
+            MessagePublishInfo pb = getMessagePublishInfo();
+            AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+                                                                protocolSession.getProtocolMajorVersion(),
+                                                                protocolSession.getProtocolMinorVersion(),
+                                                                deliveryTag, pb.getExchange(),
+                                                                queueSize,
+                                                                _messageHandle.isRedelivered(),
+                                                                pb.getRoutingKey());
+            ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+            getOkFrame.writePayload(buf);
+            buf.flip();
+            return buf;
+        }
 
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-        if (bodyCount == 0)
+        private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                       contentHeader);
-            protocolSession.writeFrame(compositeBlock);
+            AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+                                                                  protocolSession.getProtocolMajorVersion(),
+                                                                  protocolSession.getProtocolMinorVersion(),
+                                                                  getMessagePublishInfo().getExchange(),
+                                                                  replyCode, replyText,
+                                                                  getMessagePublishInfo().getRoutingKey());
+            ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+            returnFrame.writePayload(buf);
+            buf.flip();
+            return buf;
         }
-        else
+
+        public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
+                throws AMQException
         {
+            ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
 
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
+
+            Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
             //
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
+            if (bodyFrameIterator.hasNext())
+            {
+                AMQDataBlock firstContentBody = bodyFrameIterator.next();
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+                                                                                 new AMQDataBlock[]{contentHeader});
+                protocolSession.writeFrame(compositeBlock);
+            }
 
             //
             // Now start writing out the other content bodies
+            // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
             //
-            for (int i = 1; i < bodyCount; i++)
+            while (bodyFrameIterator.hasNext())
             {
-                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                protocolSession.writeFrame(bodyFrameIterator.next());
             }
-
-
-        }
-
-
-    }
-
-
-    private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        MessagePublishInfo pb = getMessagePublishInfo();
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
-                                                                deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
-        deliverFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        MessagePublishInfo pb = getMessagePublishInfo();
-        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                            protocolSession.getProtocolMajorVersion(),
-                                                            protocolSession.getProtocolMinorVersion(),
-                                                            deliveryTag, pb.getExchange(),
-                                                            queueSize,
-                                                            _messageHandle.isRedelivered(),
-                                                            pb.getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
-        getOkFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-    {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                              protocolSession.getProtocolMajorVersion(),
-                                                              protocolSession.getProtocolMinorVersion(),
-                                                              getMessagePublishInfo().getExchange(),
-                                                              replyCode, replyText,
-                                                              getMessagePublishInfo().getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
-        returnFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-        ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
-
-        Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                             new AMQDataBlock[]{contentHeader});
-            protocolSession.writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            protocolSession.writeFrame(bodyFrameIterator.next());
         }
-    }
-*/
+     */
 
     public AMQMessageHandle getMessageHandle()
     {
         return _messageHandle;
     }
 
-
     public long getSize()
     {
         try
@@ -906,12 +910,12 @@
         catch (AMQException e)
         {
             _log.error(e.toString(), e);
+
             return 0;
         }
 
     }
 
-
     public void restoreTransientMessageData() throws AMQException
     {
         TransientMessageData transientMessageData = new TransientMessageData();
@@ -921,25 +925,23 @@
         _transientMessageData = transientMessageData;
     }
 
-
     public void clearTransientMessageData()
     {
         _transientMessageData = null;
     }
 
-
     public String toString()
     {
-//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-//               _taken + " by :" + _takenBySubcription;
+        // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+        // _taken + " by :" + _takenBySubcription;
 
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
+            + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-//        return _takenBySubcription;
+        // return _takenBySubcription;
         synchronized (this)
         {
             return _takenBySubcriptionMap.get(queue);
@@ -967,7 +969,7 @@
     {
         boolean rejected = _rejectedBy != null;
 
-        if (rejected)  // We have subscriptions that rejected this message
+        if (rejected) // We have subscriptions that rejected this message
         {
             return _rejectedBy.contains(subscription);
         }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Mon Jul  2 07:17:45 2007
@@ -28,24 +28,144 @@
 import org.apache.qpid.server.store.StoreContext;
 
 /**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ * TransactionalContext provides a context in which transactional operations on {@link AMQMessage}s are performed.
+ * Different levels of transactional support for the delivery of messages may be provided by different implementations
+ * of this interface.
+ *
+ * <p/>The fundamental transactional operations that can be performed on a message queue are 'enqueue' and 'dequeue'.
+ * In this interface, these have been recast as the {@link #messageFullyReceived} and {@link #acknowledgeMessage}
+ * operations. This interface essentially provides a way to make enqueueing and dequeuing transactional.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Explicitly accept a transaction start notification.
+ * <tr><td> Commit all pending operations in a transaction.
+ * <tr><td> Rollback all pending operations in a transaction.
+ * <tr><td> Deliver a message to a queue as part of a transaction.
+ * <tr><td> Redeliver a message to a queue as part of a transaction.
+ * <tr><td> Mark a message as acknowledged as part of a transaction.
+ * <tr><td> Accept notification that a message has been completely received as part of a transaction.
+ * <tr><td> Accept notification that a message has been fully processed as part of a transaction.
+ * <tr><td> Associate a message store context with this transaction context.
+ * </table>
+ *
+ * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably in the responsibilities of a transactional
+ *       context. They are non-transactional operations, used to trigger other side-effects. Consider moving them
+ *       somewhere else, a separate interface for example.
+ *
+ * @todo This transactional context could be written as a wrapper extension to a Queue implementation, that provides
+ *       transactional management of the enqueue and dequeue operations, with added commit/rollback methods. Any
+ *       queue implementation could be made transactional by wrapping it as a transactional queue. This would mean
+ *       that the enqueue/dequeue operations do not need to be recast as deliver/acknowledge operations, which may be
+ *       conceptually neater.
+ *
+ * For example:
+ * <pre>
+ * public interface Transactional
+ * {
+ *    public void commit();
+ *    public void rollback();
+ * }
+ *
+ * public interface TransactionalQueue<E> extends Transactional, SizeableQueue<E>
+ * {}
+ *
+ * public class Queues
+ * {
+ *    ...
+ *    // For transactional messaging, take a transactional view onto the queue.
+ *    public static <E> TransactionalQueue<E> getTransactionalQueue(SizeableQueue<E> queue) { ... }
+ *
+ *    // For non-transactional messaging, take a non-transactional view onto the queue.
+ *    public static <E> TransactionalQueue<E> getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
+ * }
+ * </pre>
  */
 public interface TransactionalContext
 {
+    /**
+     * Explicitly begins the transaction, if it has not already been started. {@link #commit} or {@link #rollback}
+     * should automatically begin the next transaction in the chain.
+     *
+     * @throws AMQException If the transaction cannot be started for any reason.
+     */
     void beginTranIfNecessary() throws AMQException;
 
+    /**
+     * Makes all pending operations on the transaction permanent and visible.
+     *
+     * @throws AMQException If the transaction cannot be committed for any reason.
+     */
     void commit() throws AMQException;
 
+    /**
+     * Erases all pending operations on the transaction.
+     *
+     * @throws AMQException If the transaction cannot be rolled back for any reason.
+     */
     void rollback() throws AMQException;
 
+    /**
+     * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
+     * redelivery, and should be placed on the front of the queue.
+     *
+     * <p/>This is an 'enqueue' operation.
+     *
+     * @param message      The message to deliver.
+     * @param queue        The queue to deliver the message to.
+     * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
+     *                     for normal FIFO message ordering.
+     *
+     * @throws AMQException If the message cannot be delivered for any reason.
+     */
     void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
 
+    /**
+     * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
+     * setting the 'multiple' flag. It is also possible for the acknowledged message id to be zero, when the 'multiple'
+     * flag is set, in which case an acknowledgement up to the latest delivered message should be done.
+     *
+     * <p/>This is a 'dequeue' operation.
+     *
+     * @param deliveryTag              The id of the message to acknowledge, or zero, if using multiple acknowledgement
+     *                                 up to the latest message.
+     * @param lastDeliveryTag          The latest message delivered.
+     * @param multiple                 <tt>true</tt> if all message ids up to the acknowledged one or latest delivered, are
+     *                                 to be acknowledged, <tt>false</tt> otherwise.
+     * @param unacknowledgedMessageMap The unacknowledged messages in the transaction, to remove the acknowledged message
+     *                                 from.
+     *
+     * @throws AMQException If the message cannot be acknowledged for any reason.
+     */
     void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
-                            UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+        UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
 
+    /**
+     * Notifies the transactional context that a message has been fully received. The actual message that was received
+     * is not specified. This event may be used to trigger a process related to the receipt of the message, for example,
+     * flushing its data to disk.
+     *
+     * @param persistent <tt>true</tt> if the received message is persistent, <tt>false</tt> otherwise.
+     *
+     * @throws AMQException If the fully received event cannot be processed for any reason.
+     */
     void messageFullyReceived(boolean persistent) throws AMQException;
 
+    /**
+     * Notifies the transactional context that a message has been delivered, successfully or otherwise. The actual
+     * message that was delivered is not specified. This event may be used to trigger a process related to the
+     * outcome of the delivery of the message, for example, cleaning up failed deliveries.
+     *
+     * @param protocolSession The protocol session of the deliverable message.
+     *
+     * @throws AMQException If the message processed event cannot be handled for any reason.
+     */
     void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
 
+    /**
+     * Gets the message store context associated with this transactional context.
+     *
+     * @return The message store context associated with this transactional context.
+     */
     StoreContext getStoreContext();
 }

Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Mon Jul  2 07:17:45 2007
@@ -201,8 +201,8 @@
                         <TQC-Qpid-02>-n TQC-Qpid-02           -d1M  -s[1000]   -c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=1  rate=1000  maxPending=1000000 </TQC-Qpid-02>
                         <TQC-Qpid-03>-n TQC-Qpid-03           -d10M -s[1000]   -c[10]              -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true  commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=1000000 </TQC-Qpid-03>
                         <TQC-Qpid-04>-n TQC-Qpid-04           -d10M -s[1000]   -c[10]              -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=1000000 </TQC-Qpid-04>
-                        <TQC-Qpid-05>-n TQC-Qpid-05         -d10M -s[1000]   -c[100]             -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true  commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=100000  </TQC-Qpid-05>
-                        <TQC-Qpid-06>-n TQC-Qpid-06         -d10M -s[1000]   -c[100]             -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=100000  </TQC-Qpid-06>
+                        <TQC-Qpid-05>-n TQC-Qpid-05           -d10M -s[1000]   -c[100]             -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true  commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=100000  </TQC-Qpid-05>
+                        <TQC-Qpid-06>-n TQC-Qpid-06           -d10M -s[1000]   -c[100]             -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=100 batchSize=1000   messageSize=256     destinationsCount=10 rate=0     maxPending=100000  </TQC-Qpid-06>
 
                         <TQM-Qpid-01-512b>-n TQM-Qpid-01-512b -d10M -s[1000]   -c[1]               -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=true  commitBatchSize=10  batchSize=1000   messageSize=512     destinationsCount=1  rate=0     maxPending=20000000</TQM-Qpid-01-512b>
                         <TQM-Qpid-02-512b>-n TQM-Qpid-02-512b -d10M -s[1000]   -c[1]               -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false transacted=false commitBatchSize=10  batchSize=1000   messageSize=512     destinationsCount=1  rate=0     maxPending=20000000</TQM-Qpid-02-512b>