You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/02/13 12:24:45 UTC

svn commit: r744079 [1/2] - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/output/amqp0_8/ main/java/org/apache/qpid/server/output/amqp0_9/ main/java/org/apache/qpid/server/queue/ main/java/org/a...

Author: ritchiem
Date: Fri Feb 13 11:24:44 2009
New Revision: 744079

URL: http://svn.apache.org/viewvc?rev=744079&view=rev
Log:
QPID-1629 : Converted AMQMessage to an interface and created concrete Transient/PersistentAMQMessage implementations

Removed the use of WeakReferences from PersistentAMQMessage and therefore the need to have a StoreContext on get requests.

NOTE: this check-in will break persistent recovery.

Converted all uses of *MessageHandle to AMQMessage. A number of tests (SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message) still use a custom constructor on Transient/PersistentAMQMessage. This is because they have their own Message implementations that are used for testing. However, I'm sure they could be modified to override the required functionality rather than attempt to use the existing Factory and wrap the resulting Message. A new JIRA has been raised to address this: QPID-1659.

QPID-1628 : The update to MessageFactory removes the commented out code

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
      - copied, changed from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
      - copied, changed from r744076, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryTest.java
      - copied, changed from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
      - copied, changed from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
      - copied, changed from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/InMemoryMessageHandleTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/WeakMessageHandleTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockContentChunk.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 13 11:24:44 2009
@@ -35,12 +35,12 @@
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -108,7 +108,7 @@
 
     private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
 
-    private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+    private MessageFactory _messageHandleFactory = new MessageFactory();
 
     // Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Fri Feb 13 11:24:44 2009
@@ -28,7 +28,6 @@
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,8 +36,6 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.AMQException;
 
-import org.apache.mina.common.ByteBuffer;
-
 import java.util.Iterator;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -79,11 +76,7 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       message.getContentHeaderBody());
 
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        final int bodyCount = message.getBodyCount();
 
         if(bodyCount == 0)
         {
@@ -100,7 +93,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            ContentChunk cb = message.getContentChunk(0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -112,7 +105,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
+                cb = message.getContentChunk(i);
                 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
@@ -126,8 +119,6 @@
     public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
         final AMQMessage message = queueEntry.getMessage();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
 
         AMQDataBlock deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
 
@@ -135,7 +126,7 @@
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       message.getContentHeaderBody());
 
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        final int bodyCount = message.getBodyCount();
         if(bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -150,7 +141,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            ContentChunk cb = message.getContentChunk(0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -162,7 +153,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
+                cb = message.getContentChunk(i);
                 writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
@@ -179,7 +170,6 @@
         final AMQMessage message = queueEntry.getMessage();
 
         final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
 
         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicDeliverBody deliverBody =
@@ -188,18 +178,14 @@
                                                       queueEntry.isRedelivered(),
                                                       pb.getExchange(),
                                                       pb.getRoutingKey());
-        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
-
-
-        return deliverFrame;
+        return deliverBody.generateFrame(channelId);
     }
 
     private AMQDataBlock createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
         final AMQMessage message = queueEntry.getMessage();
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final MessagePublishInfo pb = message.getMessagePublishInfo();        
 
         MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicGetOkBody getOkBody =
@@ -208,9 +194,7 @@
                                                     pb.getExchange(),
                                                     pb.getRoutingKey(),
                                                     queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
-        return getOkFrame;
+        return getOkBody.generateFrame(channelId);
     }
 
     public byte getProtocolMinorVersion()
@@ -231,9 +215,8 @@
                                                      replyText,
                                                      message.getMessagePublishInfo().getExchange(),
                                                      message.getMessagePublishInfo().getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
-        return returnFrame;
+        return basicReturnBody.generateFrame(channelId);
+        
     }
 
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Fri Feb 13 11:24:44 2009
@@ -28,9 +28,7 @@
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -77,12 +75,7 @@
         AMQBody deliverBody = createEncodedDeliverFrame(queueEntry, channelId, deliveryTag, consumerTag);
         final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
 
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        final int bodyCount = message.getBodyCount();
 
         if(bodyCount == 0)
         {
@@ -99,7 +92,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            ContentChunk cb = message.getContentChunk(0);
 
             AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
 
@@ -111,7 +104,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
+                cb = message.getContentChunk(i);
                 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
             }
 
@@ -123,9 +116,7 @@
     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
     {
         
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
+        return ContentHeaderBody.createAMQFrame(channelId, contentHeaderBody);
     }
 
 
@@ -133,15 +124,13 @@
     {
 
         final AMQMessage message = queueEntry.getMessage();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
 
         AMQFrame deliver = createEncodedGetOkFrame(queueEntry, channelId, deliveryTag, queueSize);
 
 
         AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
 
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        final int bodyCount = message.getBodyCount();
         if(bodyCount == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -156,7 +145,7 @@
             // Optimise the case where we have a single content body. In that case we create a composite block
             // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
             //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            ContentChunk cb = message.getContentChunk(0);
 
             AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
@@ -168,7 +157,7 @@
             //
             for(int i = 1; i < bodyCount; i++)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
+                cb = message.getContentChunk(i);
                 writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
             }
 
@@ -190,7 +179,7 @@
         final AMQShortString exchangeName = pb.getExchange();
         final AMQShortString routingKey = pb.getRoutingKey();
 
-        final AMQBody returnBlock = new AMQBody()
+        return new AMQBody()
         {
 
             public AMQBody _underlyingBody;
@@ -238,7 +227,6 @@
                 throw new AMQException("This block should never be dispatched!");
             }
         };
-        return returnBlock;
     }
 
     private AMQFrame createEncodedGetOkFrame(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize)
@@ -253,9 +241,7 @@
                                                     pb.getExchange(),
                                                     pb.getRoutingKey(),
                                                     queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
-
-        return getOkFrame;
+        return getOkBody.generateFrame(channelId);
     }
 
     public byte getProtocolMinorVersion()
@@ -276,9 +262,7 @@
                                                      replyText,
                                                      message.getMessagePublishInfo().getExchange(),
                                                      message.getMessagePublishInfo().getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
-
-        return returnFrame;
+        return basicReturnBody.generateFrame(channelId);       
     }
 
     public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 13 11:24:44 2009
@@ -20,363 +20,52 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
+import org.apache.qpid.AMQException;
 
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * A deliverable message.
- */
-public class AMQMessage
+public interface AMQMessage
 {
-    /** Used for debugging purposes. */
-    private static final Logger _log = Logger.getLogger(AMQMessage.class);
+    //Get Content relating to this message
 
-    private final AtomicInteger _referenceCount = new AtomicInteger(1);
+    Long getMessageId();
 
-    private final AMQMessageHandle _messageHandle;
+    Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel);
 
-    /** Holds the transactional context in which this message is being processed. */
-    private StoreContext _storeContext;
+    Iterator<ContentChunk> getContentBodyIterator();
 
-    /** Flag to indicate that this message requires 'immediate' delivery. */
+    ContentHeaderBody getContentHeaderBody();
 
-    private static final byte IMMEDIATE = 0x01;
+    ContentChunk getContentChunk(int index);
 
-    /**
-     * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
-     * for messages published with the 'immediate' flag.
-     */
-
-    private static final byte DELIVERED_TO_CONSUMER = 0x02;
+    Object getPublisherClientInstance();
 
-    private byte _flags = 0;
+    Object getPublisherIdentifier();
 
-    private long _expiration;
+    MessagePublishInfo getMessagePublishInfo();
 
-    private final long _size;
+    int getBodyCount();
 
-    private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
-    private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+    long getSize();
 
+    long getArrivalTime();
 
 
-    /**
-     * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
-     * therefore is memory-efficient.
-     */
-    private class BodyFrameIterator implements Iterator<AMQDataBlock>
-    {
-        private int _channel;
-
-        private int _index = -1;
-        private AMQProtocolSession _protocolSession;
-
-        private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
-        {
-            _channel = channel;
-            _protocolSession = protocolSession;
-        }
-
-        public boolean hasNext()
-        {
-            try
-            {
-                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
-            }
-            catch (AMQException e)
-            {
-                _log.error("Unable to get body count: " + e, e);
-
-                return false;
-            }
-        }
-
-        public AMQDataBlock next()
-        {
-            try
-            {
-
-                AMQBody cb =
-                        getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
-                                                                                                         ++_index));
-
-                return new AMQFrame(_channel, cb);
-            }
-            catch (AMQException e)
-            {
-                // have no choice but to throw a runtime exception
-                throw new RuntimeException("Error getting content body: " + e, e);
-            }
-
-        }
-
-        private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
-        {
-            return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    public void clearStoreContext()
-    {
-        _storeContext = new StoreContext();
-    }
-
-    public StoreContext getStoreContext()
-    {
-        return _storeContext;
-    }
-
-    private class BodyContentIterator implements Iterator<ContentChunk>
-    {
-
-        private int _index = -1;
-
-        public boolean hasNext()
-        {
-            try
-            {
-                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
-            }
-            catch (AMQException e)
-            {
-                _log.error("Error getting body count: " + e, e);
-
-                return false;
-            }
-        }
-
-        public ContentChunk next()
-        {
-            try
-            {
-                return _messageHandle.getContentChunk(getStoreContext(), ++_index);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Error getting content body: " + e, e);
-            }
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-
-
-    /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
-     *
-     * @param messageId
-     * @param store
-     * @param factory
-     *
-     * @throws AMQException
-     */
-    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
-            throws AMQException
-    {
-        _messageHandle = factory.createMessageHandle(messageId, store, true);
-        _storeContext = txnConext.getStoreContext();
-        _size = _messageHandle.getBodySize(txnConext.getStoreContext());
-    }
-
-        /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
-     *
-     * @param messageHandle
-     *
-     * @throws AMQException
-     */
-    public AMQMessage(
-                AMQMessageHandle messageHandle,
-                StoreContext storeConext,
-                MessagePublishInfo info)
-            throws AMQException
-    {
-        _messageHandle = messageHandle;
-        _storeContext = storeConext;
-
-        if(info.isImmediate())
-        {
-            _flags |= IMMEDIATE;
-        }
-        _size = messageHandle.getBodySize(storeConext);
-
-    }
-
-
-    protected AMQMessage(AMQMessage msg) throws AMQException
-    {
-        _messageHandle = msg._messageHandle;
-        _storeContext = msg._storeContext;
-        _flags = msg._flags;
-        _size = msg._size;
-
-    }
-
-
-    public String debugIdentity()
-    {
-        return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
-    }
-
-    public void setExpiration(final long expiration)
-    {
-
-        _expiration = expiration;
-
-    }
-
-    public boolean isReferenced()
-    {
-        return _referenceCount.get() > 0;
-    }
-
-    public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
-    {
-        return new BodyFrameIterator(protocolSession, channel);
-    }
-
-    public Iterator<ContentChunk> getContentBodyIterator()
-    {
-        return new BodyContentIterator();
-    }
-
-    public ContentHeaderBody getContentHeaderBody() throws AMQException
-    {
-        return _messageHandle.getContentHeaderBody(getStoreContext());
-    }
-
-
-
-    public Long getMessageId()
-    {
-        return _messageHandle.getMessageId();
-    }
-
-    /**
-     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
-     * operation.
-     */
-    public AMQMessage takeReference()
-    {
-        incrementReference(); // _referenceCount.incrementAndGet();
-
-        return this;
-    }
-
-    public boolean incrementReference()
-    {
-        return incrementReference(1);
-    }
-
-    /* Threadsafe. Increment the reference count on the message. */
-    public boolean incrementReference(int count)
-    {
-        if(_referenceCount.addAndGet(count) <= 1)
-        {
-            _referenceCount.addAndGet(-count);
-            return false;
-        }
-        else
-        {
-            return true;
-        }
-
-    }
-
-    /**
-     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
-     * message store.
-     *
-     * @param storeContext
-     *
-     * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
-     *                                 failed
-     */
-    public void decrementReference(StoreContext storeContext) throws MessageCleanupException
-    {
-
-        int count = _referenceCount.decrementAndGet();
-
-        // note that the operation of decrementing the reference count and then removing the message does not
-        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
-        // the message has been passed to all queues. i.e. we are
-        // not relying on the all the increments having taken place before the delivery manager decrements.
-        if (count == 0)
-        {
-            // set the reference count way below 0 so that we can detect that the message has been deleted
-            // this is to guard against the message being spontaneously recreated (from the mgmt console)
-            // by copying from other queues at the same time as it is being removed.
-            _referenceCount.set(Integer.MIN_VALUE/2);
-
-            try
-            {
-                // must check if the handle is null since there may be cases where we decide to throw away a message
-                // and the handle has not yet been constructed
-                if (_messageHandle != null)
-                {
-                    _messageHandle.removeMessage(storeContext);
-                }
-            }
-            catch (AMQException e)
-            {
-                // to maintain consistency, we revert the count
-                incrementReference();
-                throw new MessageCleanupException(getMessageId(), e);
-            }
-        }
-        else
-        {
-            if (count < 0)
-            {
-                throw new MessageCleanupException("Reference count for message id " + debugIdentity()
-                                                  + " has gone below 0.");
-            }
-        }
-    }
 
+    //Check the status of this message
 
     /**
      * Called selectors to determin if the message has already been sent
      *
      * @return _deliveredToConsumer
      */
-    public boolean getDeliveredToConsumer()
-    {
-        return (_flags & DELIVERED_TO_CONSUMER) != 0;
-    }
-
-    public boolean isPersistent() throws AMQException
-    {
-        return _messageHandle.isPersistent();
-    }
+    boolean getDeliveredToConsumer();
 
     /**
      * Called to enforce the 'immediate' flag.
@@ -384,89 +73,62 @@
      * @returns  true if the message is marked for immediate delivery but has not been marked as delivered
      *                              to a consumer
      */
-    public boolean immediateAndNotDelivered() 
-    {
-
-        return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
-    }
-
-    public MessagePublishInfo getMessagePublishInfo() throws AMQException
-    {
-        return _messageHandle.getMessagePublishInfo(getStoreContext());
-    }
-
-    public long getArrivalTime()
-    {
-        return _messageHandle.getArrivalTime();
-    }
+    boolean immediateAndNotDelivered();
 
     /**
      * Checks to see if the message has expired. If it has the message is dequeued.
      *
-     * @param queue The queue to check the expiration against. (Currently not used)
-     *
      * @return true if the message has expire
      *
-     * @throws AMQException
+     * @throws org.apache.qpid.AMQException
      */
-    public boolean expired(AMQQueue queue) throws AMQException
-    {
-
-        if (_expiration != 0L)
-        {
-            long now = System.currentTimeMillis();
+    boolean expired() throws AMQException;
 
-            return (now > _expiration);
-        }
+    /** Indicates whether this is a persistent message.
+     *
+     * @return true if the message is persistent
+     */
+    boolean isPersistent();
 
-        return false;
-    }
 
     /**
      * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
      * And for selector efficiency.
      */
-    public void setDeliveredToConsumer()
-    {
-        _flags |= DELIVERED_TO_CONSUMER;
-    }
+    void setDeliveredToConsumer();
+
+    void setExpiration(long expiration);
+
+    void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
+
+    /** This is called when all the content has been received.
+     * @param storeContext
+     * @param messagePublishInfo
+     * @param contentHeaderBody
+     * @throws org.apache.qpid.AMQException
+     */
+    void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody)
+            throws AMQException;
+
+    void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+            throws AMQException;
+
+
+    void removeMessage(StoreContext storeContext) throws AMQException;
+
+    String toString();
+
+    String debugIdentity();
+
+    // Reference counting methods
 
+    void decrementReference(StoreContext storeContext) throws MessageCleanupException;
 
+    boolean incrementReference(int queueCount);
 
-    public AMQMessageHandle getMessageHandle()
-    {
-        return _messageHandle;
-    }
-
-    public long getSize()
-    {
-        return _size;
-
-    }
-
-    public Object getPublisherClientInstance()
-    {
-        return _sessionIdentifier.getSessionInstance();
-    }
-                                                                                          
-    public Object getPublisherIdentifier()
-    {
-        return _sessionIdentifier.getSessionIdentifier();
-    }
-
-    public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
-    {
-        _sessionIdentifier = sessionIdentifier;
-    }
-
-
-    public String toString()
-    {
-        // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-        // _taken + " by :" + _takenBySubcription;
+    boolean incrementReference();
 
-        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
-    }
+    AMQMessage takeReference();
 
+    boolean isReferenced();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri Feb 13 11:24:44 2009
@@ -353,29 +353,20 @@
             }
         }
 
-        try
+        // Create header attributes list
+        CommonContentHeaderProperties headerProperties =
+            (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+        String mimeType = null, encoding = null;
+        if (headerProperties != null)
         {
-            // Create header attributes list
-            CommonContentHeaderProperties headerProperties =
-                (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
-            String mimeType = null, encoding = null;
-            if (headerProperties != null)
-            {
-                AMQShortString mimeTypeShortSting = headerProperties.getContentType();
-                mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
-                encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
-            }
+            AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+            mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+            encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+        }
 
-            Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+        Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
 
-            return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
-        }
-        catch (AMQException e)
-        {
-            JMException jme = new JMException("Error creating header attributes list: " + e);
-            jme.initCause(e);
-            throw jme;
-        }
+        return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
     }
 
     /**
@@ -392,27 +383,18 @@
         List<QueueEntry> list = _queue.getMessagesOnTheQueue();
         TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
 
-        try
+        // Create the tabular list of message header contents
+        for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
         {
-            // Create the tabular list of message header contents
-            for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
-            {
-                QueueEntry queueEntry = list.get(i - 1);
-                AMQMessage msg = queueEntry.getMessage();
-                ContentHeaderBody headerBody = msg.getContentHeaderBody();
-                // Create header attributes list
-                String[] headerAttributes = getMessageHeaderProperties(headerBody);
-                Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
-                                        queueEntry.isRedelivered() };
-                CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
-                _messageList.put(messageData);
-            }
-        }
-        catch (AMQException e)
-        {
-            JMException jme = new JMException("Error creating message contents: " + e);
-            jme.initCause(e);
-            throw jme;
+            QueueEntry queueEntry = list.get(i - 1);
+            AMQMessage msg = queueEntry.getMessage();
+            ContentHeaderBody headerBody = msg.getContentHeaderBody();
+            // Create header attributes list
+            String[] headerAttributes = getMessageHeaderProperties(headerBody);
+            Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize,
+                                    queueEntry.isRedelivered() };
+            CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
+            _messageList.put(messageData);
         }
 
         return _messageList;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Feb 13 11:24:44 2009
@@ -35,7 +35,6 @@
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
 
 public class IncomingMessage implements Filterable<RuntimeException>
 {
@@ -48,7 +47,7 @@
 
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-    private AMQMessageHandle _messageHandle;
+    private AMQMessage _message;
     private final Long _messageId;
     private final TransactionalContext _txnContext;
 
@@ -74,7 +73,6 @@
     
     private Exchange _exchange;
 
-
     public IncomingMessage(final Long messageId,
                            final MessagePublishInfo info,
                            final TransactionalContext txnContext,
@@ -124,11 +122,11 @@
     }
 
     public void routingComplete(final MessageStore store,
-                                final MessageHandleFactory factory) throws AMQException
+                                final MessageFactory factory) throws AMQException
     {
 
         final boolean persistent = isPersistent();
-        _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
+        _message = factory.createMessage(_messageId, store, persistent);
         if (persistent)
         {
             _txnContext.beginTranIfNecessary();
@@ -157,21 +155,16 @@
             _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
         }
 
-        AMQMessage message = null;
-
         try
         {
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
-                                                          _messagePublishInfo, getContentHeaderBody());
+            _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
+
 
-            
-            
-            message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
 
-            message.setExpiration(_expiration);
-            message.setClientIdentifier(_publisher.getSessionIdentifier());
+            _message.setExpiration(_expiration);
+            _message.setClientIdentifier(_publisher.getSessionIdentifier());
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
@@ -182,7 +175,7 @@
             
             if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
             {
-                throw new UnauthorizedAccessException("Acccess Refused",message);
+                throw new UnauthorizedAccessException("Acccess Refused", _message);
             }
             
             if ((_destinationQueues == null) || _destinationQueues.size() == 0)
@@ -190,26 +183,26 @@
 
                 if (isMandatory() || isImmediate())
                 {
-                    throw new NoRouteException("No Route for message", message);
+                    throw new NoRouteException("No Route for message", _message);
 
                 }
                 else
                 {
-                    _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
+                    _logger.warn("MESSAGE DISCARDED: No routes for message - " + _message);
                 }
             }
             else
             {
                 int offset;
                 final int queueCount = _destinationQueues.size();
-                message.incrementReference(queueCount);
+                _message.incrementReference(queueCount);
                 if(queueCount == 1)
                 {
                     offset = 0;
                 }
                 else
                 {
-                    offset = ((int)(message.getMessageId().longValue())) % queueCount;
+                    offset = ((int)(_message.getMessageId().longValue())) % queueCount;
                     if(offset < 0)
                     {
                         offset = -offset;
@@ -218,22 +211,21 @@
                 for (int i = offset; i < queueCount; i++)
                 {
                     // normal deliver so add this message at the end.
-                    _txnContext.deliver(_destinationQueues.get(i), message);
+                    _txnContext.deliver(_destinationQueues.get(i), _message);
                 }
                 for (int i = 0; i < offset; i++)
                 {
                     // normal deliver so add this message at the end.
-                    _txnContext.deliver(_destinationQueues.get(i), message);
+                    _txnContext.deliver(_destinationQueues.get(i), _message);
                 }
             }
 
-            message.clearStoreContext();
-            return message;
+            return _message;
         }
         finally
         {
             // Remove refence for routing process . Reference count should now == delivered queue count
-            if(message != null) message.decrementReference(_txnContext.getStoreContext());
+            if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
         }
 
     }
@@ -244,7 +236,7 @@
 
         _bodyLengthReceived += contentChunk.getSize();
 
-        _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
+        _message.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
 
     }
 

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (from r744076, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java&r1=744076&r2=744079&rev=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java Fri Feb 13 11:24:44 2009
@@ -20,18 +20,22 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
-public class MockAMQMessageHandle extends InMemoryMessageHandle
+public class MessageFactory
 {
-    public MockAMQMessageHandle(final Long messageId)
-    {
-        super(messageId);
-    }
 
-    @Override
-    public long getBodySize(StoreContext store)
+    public AMQMessage createMessage(Long messageId, MessageStore store, boolean persistent)
     {
-      return 0l;
+        if (persistent)
+        {
+            return new PersistentAMQMessage(messageId, store);
+        }
+        else
+        {
+            return new TransientAMQMessage(messageId);
+        }
     }
+    
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Fri Feb 13 11:24:44 2009
@@ -47,20 +47,13 @@
             if(maximumMessageSize != 0)
             {
                 // Check for threshold message size
-                long messageSize;
-                try
-                {
-                    messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
-                }
-                catch (AMQException e)
-                {
-                    messageSize = 0;
-                }
-
+                long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
 
                 if (messageSize >= maximumMessageSize)
                 {
-                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
+                                                        maximumMessageSize + ") breached. [Message ID=" +
+                                                        (msg == null ? "null" : msg.getMessageId()) + "]");
                     return true;
                 }
             }
@@ -110,7 +103,7 @@
                 }
             }
             return false;
-                    
+
         }
 
     }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java?rev=744079&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java Fri Feb 13 11:24:44 2009
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+public class PersistentAMQMessage extends TransientAMQMessage
+{
+    protected MessageStore _messageStore;
+
+    public PersistentAMQMessage(Long messageId, MessageStore store)
+    {
+        super(messageId);
+        _messageStore = store;
+    }
+
+    @Override
+    public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+            throws AMQException
+    {
+        super.addContentBodyFrame(storeContext, contentChunk, isLastContentBody);
+        _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
+                                            contentChunk, isLastContentBody);
+    }
+
+    @Override
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+                                               ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+        super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
+        MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+
+        _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+    }
+
+    @Override
+    public void removeMessage(StoreContext storeContext) throws AMQException
+    {
+        _messageStore.removeMessage(storeContext, _messageId);
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return true;
+    }
+
+    public void recoverFromMessageMetaData(MessageMetaData mmd)
+    {
+        _arrivalTime = mmd.getArrivalTime();
+        _contentHeaderBody = mmd.getContentHeaderBody();
+        _messagePublishInfo = mmd.getMessagePublishInfo();
+    }
+
+    public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+    {
+        super.addContentBodyFrame(null, contentChunk, isLastContentBody);
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri Feb 13 11:24:44 2009
@@ -54,25 +54,16 @@
 
     public QueueEntry add(AMQMessage message)
     {
-        try
+        int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+        if(index >= _priorities)
         {
-            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
-            if(index >= _priorities)
-            {
-                index = _priorities-1;
-            }
-            else if(index < 0)
-            {
-                index = 0;
-            }
-            return _priorityLists[index].add(message);
+            index = _priorities-1;
         }
-        catch (AMQException e)
+        else if(index < 0)
         {
-            // TODO - fix AMQ Exception
-            throw new RuntimeException(e);
+            index = 0;
         }
-
+        return _priorityLists[index].add(message);
     }
 
     public QueueEntry next(QueueEntry node)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 13 11:24:44 2009
@@ -132,7 +132,7 @@
 
     public boolean expired() throws AMQException
     {
-        return getMessage().expired(getQueue());
+        return getMessage().expired();
     }
 
     public boolean isAcquired()

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java (from r744076, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java&r1=744076&r2=744079&rev=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java Fri Feb 13 11:24:44 2009
@@ -30,28 +30,35 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A deliverable message.
  */
-public class AMQMessage
+public class TransientAMQMessage implements AMQMessage
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
 
-    private final AMQMessageHandle _messageHandle;
+    protected ContentHeaderBody _contentHeaderBody;
+
+    protected MessagePublishInfo _messagePublishInfo;
+
+    protected List<ContentChunk> _contentBodies;
+
+    protected long _arrivalTime;
+
+    protected final Long _messageId;
+
 
-    /** Holds the transactional context in which this message is being processed. */
-    private StoreContext _storeContext;
 
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
@@ -68,13 +75,9 @@
 
     private long _expiration;
 
-    private final long _size;
-
     private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
 
-
-
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -94,35 +97,15 @@
 
         public boolean hasNext()
         {
-            try
-            {
-                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
-            }
-            catch (AMQException e)
-            {
-                _log.error("Unable to get body count: " + e, e);
-
-                return false;
-            }
+            return _index < (getBodyCount() - 1);
         }
 
         public AMQDataBlock next()
         {
-            try
-            {
-
-                AMQBody cb =
-                        getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
-                                                                                                         ++_index));
-
-                return new AMQFrame(_channel, cb);
-            }
-            catch (AMQException e)
-            {
-                // have no choice but to throw a runtime exception
-                throw new RuntimeException("Error getting content body: " + e, e);
-            }
+            AMQBody cb =
+                    getProtocolVersionMethodConverter().convertToBody(getContentChunk(++_index));
 
+            return new AMQFrame(_channel, cb);
         }
 
         private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
@@ -136,16 +119,6 @@
         }
     }
 
-    public void clearStoreContext()
-    {
-        _storeContext = new StoreContext();
-    }
-
-    public StoreContext getStoreContext()
-    {
-        return _storeContext;
-    }
-
     private class BodyContentIterator implements Iterator<ContentChunk>
     {
 
@@ -153,28 +126,12 @@
 
         public boolean hasNext()
         {
-            try
-            {
-                return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
-            }
-            catch (AMQException e)
-            {
-                _log.error("Error getting body count: " + e, e);
-
-                return false;
-            }
+            return _index < (getBodyCount() - 1);
         }
 
         public ContentChunk next()
         {
-            try
-            {
-                return _messageHandle.getContentChunk(getStoreContext(), ++_index);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Error getting content body: " + e, e);
-            }
+            return getContentChunk(++_index);
         }
 
         public void remove()
@@ -183,64 +140,32 @@
         }
     }
 
+    /**
+     * Used by SimpleAMQQueueTest, TxAckTest.TestMessage and AbstractHeaderExchangeTestBase.Message.
+     * These all need refactoring to use some sort of MockAMQMessageFactory.
+     */ 
+    @Deprecated
+    protected TransientAMQMessage(AMQMessage message) throws AMQException
+    {
+        _messageId = message.getMessageId();
+        _flags = ((TransientAMQMessage)message)._flags;
+        _contentHeaderBody = message.getContentHeaderBody();
+        _messagePublishInfo = message.getMessagePublishInfo();
+    }
 
 
     /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
+     * Normal message creation via the MessageFactory uses this constructor.
+     * Package scope is limited because MessageFactory should be used instead.
+     * @see MessageFactory
      *
      * @param messageId
-     * @param store
-     * @param factory
-     *
-     * @throws AMQException
      */
-    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
-            throws AMQException
+    TransientAMQMessage(Long messageId)
     {
-        _messageHandle = factory.createMessageHandle(messageId, store, true);
-        _storeContext = txnConext.getStoreContext();
-        _size = _messageHandle.getBodySize(txnConext.getStoreContext());
+        _messageId = messageId;
     }
 
-        /**
-     * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
-     * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
-     * queues.
-     *
-     * @param messageHandle
-     *
-     * @throws AMQException
-     */
-    public AMQMessage(
-                AMQMessageHandle messageHandle,
-                StoreContext storeConext,
-                MessagePublishInfo info)
-            throws AMQException
-    {
-        _messageHandle = messageHandle;
-        _storeContext = storeConext;
-
-        if(info.isImmediate())
-        {
-            _flags |= IMMEDIATE;
-        }
-        _size = messageHandle.getBodySize(storeConext);
-
-    }
-
-
-    protected AMQMessage(AMQMessage msg) throws AMQException
-    {
-        _messageHandle = msg._messageHandle;
-        _storeContext = msg._storeContext;
-        _flags = msg._flags;
-        _size = msg._size;
-
-    }
-
-
     public String debugIdentity()
     {
         return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
@@ -248,9 +173,7 @@
 
     public void setExpiration(final long expiration)
     {
-
         _expiration = expiration;
-
     }
 
     public boolean isReferenced()
@@ -268,16 +191,15 @@
         return new BodyContentIterator();
     }
 
-    public ContentHeaderBody getContentHeaderBody() throws AMQException
+
+    public ContentHeaderBody getContentHeaderBody()
     {
-        return _messageHandle.getContentHeaderBody(getStoreContext());
+        return _contentHeaderBody;
     }
 
-
-
     public Long getMessageId()
     {
-        return _messageHandle.getMessageId();
+        return _messageId;
     }
 
     /**
@@ -340,10 +262,8 @@
             {
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
-                if (_messageHandle != null)
-                {
-                    _messageHandle.removeMessage(storeContext);
-                }
+                // no need to perform a persistent check anymore as TransientAMQMessage.removeMessage() is a no-op
+                removeMessage(storeContext);
             }
             catch (AMQException e)
             {
@@ -373,11 +293,6 @@
         return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
-    public boolean isPersistent() throws AMQException
-    {
-        return _messageHandle.isPersistent();
-    }
-
     /**
      * Called to enforce the 'immediate' flag.
      *
@@ -391,26 +306,14 @@
 
     }
 
-    public MessagePublishInfo getMessagePublishInfo() throws AMQException
-    {
-        return _messageHandle.getMessagePublishInfo(getStoreContext());
-    }
-
-    public long getArrivalTime()
-    {
-        return _messageHandle.getArrivalTime();
-    }
-
     /**
      * Checks to see if the message has expired. If it has the message is dequeued.
      *
-     * @param queue The queue to check the expiration against. (Currently not used)
-     *
      * @return true if the message has expire
      *
      * @throws AMQException
      */
-    public boolean expired(AMQQueue queue) throws AMQException
+    public boolean expired() throws AMQException
     {
 
         if (_expiration != 0L)
@@ -433,16 +336,9 @@
     }
 
 
-
-    public AMQMessageHandle getMessageHandle()
-    {
-        return _messageHandle;
-    }
-
     public long getSize()
     {
-        return _size;
-
+        return _contentHeaderBody.bodySize;
     }
 
     public Object getPublisherClientInstance()
@@ -460,6 +356,107 @@
         _sessionIdentifier = sessionIdentifier;
     }
 
+    /** From AMQMessageHandle **/
+
+    public int getBodyCount()
+    {
+        return _contentBodies.size();
+    }
+
+    public ContentChunk getContentChunk(int index)
+    {
+        if(_contentBodies == null)
+        {
+            throw new RuntimeException("No ContentBody has been set");
+        }
+
+        if (index > _contentBodies.size() - 1 || index < 0)
+        {
+            throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+                                               (_contentBodies.size() - 1));
+        }
+        return _contentBodies.get(index);
+    }
+
+    public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
+            throws AMQException
+    {
+        if(_contentBodies == null)
+        {
+            if(isLastContentBody)
+            {
+                _contentBodies = Collections.singletonList(contentChunk);
+            }
+            else
+            {
+                _contentBodies = new ArrayList<ContentChunk>();
+                _contentBodies.add(contentChunk);
+            }
+        }
+        else
+        {
+            _contentBodies.add(contentChunk);
+        }
+    }
+
+    public MessagePublishInfo getMessagePublishInfo()
+    {
+        return _messagePublishInfo;
+    }
+
+    public boolean isPersistent()
+    {
+        return false;
+    }
+
+    /**
+     * This is called when all the content has been received.
+     * @param storeContext
+     * @param messagePublishInfo
+     * @param contentHeaderBody @throws AMQException
+     */
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
+                                               ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+
+        if (contentHeaderBody == null)
+        {
+            throw new NullPointerException("HeaderBody cannot be null");
+        }
+
+        if( messagePublishInfo == null)
+        {
+            throw new NullPointerException("PublishInfo cannot be null");
+        }
+
+        _messagePublishInfo = messagePublishInfo;
+        _contentHeaderBody = contentHeaderBody;
+
+
+        if( contentHeaderBody.bodySize == 0)
+        {
+            _contentBodies = Collections.EMPTY_LIST;
+        }       
+
+        _arrivalTime = System.currentTimeMillis();
+
+        if(messagePublishInfo.isImmediate())
+        {
+            _flags |= IMMEDIATE;
+        }
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
+    public void removeMessage(StoreContext storeContext) throws AMQException
+    {
+        //no-op
+    }
+
 
     public String toString()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Feb 13 11:24:44 2009
@@ -27,8 +27,8 @@
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
 
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -93,7 +93,7 @@
 
     private String _connectionURL;
 
-
+    MessageFactory _messageFactory;
 
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
@@ -167,6 +167,8 @@
 
         // this recovers durable queues and persistent messages
 
+        _messageFactory = new MessageFactory();
+
         recover();
 
         stateTransition(State.RECOVERING, State.STARTED);
@@ -1299,7 +1301,7 @@
     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
         throws SQLException, AMQException
     {
-        Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+        Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
         List<ProcessAction> actions = new ArrayList<ProcessAction>();
 
         Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1318,8 +1320,6 @@
                 conn = newConnection();
             }
 
-
-            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
             long maxId = 1;
 
             TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
@@ -1355,7 +1355,11 @@
                 }
                 else
                 {
-                    message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+                    message = _messageFactory.createMessage(messageId, this, true);                    
+
+                    _logger.error("todo must do message recovery now.");
+                    //todo must do message recovery now.
+
                     msgMap.put(messageId,message);
                 }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Fri Feb 13 11:24:44 2009
@@ -26,7 +26,6 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -349,32 +348,15 @@
 
             arrival.add("" + msg.getArrivalTime());
 
-            try
-            {
-                ispersitent.add(msg.isPersistent() ? "true" : "false");
-            }
-            catch (AMQException e)
-            {
-                ispersitent.add("n/a");
-            }
+            ispersitent.add(msg.isPersistent() ? "true" : "false");
 
             isredelivered.add(entry.isRedelivered() ? "true" : "false");
 
             isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
 
-//        msg.getMessageHandle();
-
             BasicContentHeaderProperties headers = null;
 
-            try
-            {
-                headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
-            }
-            catch (AMQException e)
-            {
-                //ignore
-//                commandError("Unable to read properties for message: " + e.getMessage(), null);
-            }
+            headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
 
             if (headers != null)
             {
@@ -414,15 +396,7 @@
                 AMQShortString useridSS = headers.getUserId();
                 userid.add(useridSS == null ? "null" : useridSS.toString());
 
-                MessagePublishInfo info = null;
-                try
-                {
-                    info = msg.getMessagePublishInfo();
-                }
-                catch (AMQException e)
-                {
-                    //ignore
-                }
+                MessagePublishInfo info = msg.getMessagePublishInfo();
 
                 if (info != null)
                 {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Fri Feb 13 11:24:44 2009
@@ -22,14 +22,13 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleQueueEntryList;
 import org.apache.qpid.server.queue.MockAMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryIterator;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.MockSubscription;
@@ -38,7 +37,6 @@
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.Iterator;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -62,7 +60,7 @@
 
     UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
-    private AMQQueue _queue = new MockAMQQueue();
+    private AMQQueue _queue = new MockAMQQueue("ExtractResendAndRequeueTest");
     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
 
     @Override

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Feb 13 11:24:44 2009
@@ -28,11 +28,11 @@
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessageHandle;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.TransientAMQMessage;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.server.store.StoreContext;
@@ -113,6 +113,8 @@
         private StoreContext _storeContext = new StoreContext();
 		private AMQQueue _queue;
 
+        private static final int MESSAGE_SIZE=100;
+
         Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception
         {
             TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(),
@@ -128,7 +130,12 @@
 
                 MessagePublishInfo info = new MessagePublishInfoImpl();
 
-                TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+                AMQMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
+
+                ContentHeaderBody header = new ContentHeaderBody();
+                header.bodySize = MESSAGE_SIZE;
+                message.setPublishAndContentHeaderBody(_storeContext, info, header);
+
                 _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
             }
             _acked = acked;
@@ -190,16 +197,15 @@
         }
     }
 
-    private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
+    private static AMQMessage createMessage(final long messageId, final MessagePublishInfo publishBody)
     {
-        final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
-                                                                                                   null,
-                                                                                                   false);
+        final AMQMessage amqMessage = (new MessageFactory()).createMessage(messageId,
+                                                                           null,
+                                                                           false);
         try
         {
-            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
-                                                              publishBody,
-                                                              new ContentHeaderBody()
+            // Safe to use null here as we just created a TransientMessage above
+            amqMessage.setPublishAndContentHeaderBody(null, publishBody, new ContentHeaderBody()
             {
                 public int getSize()
                 {
@@ -213,11 +219,11 @@
         }
 
 
-        return amqMessageHandle;
+        return amqMessage;
     }
 
 
-    private class TestMessage extends AMQMessage
+    private class TestMessage extends TransientAMQMessage
     {
         private final long _tag;
         private int _count;
@@ -225,7 +231,7 @@
         TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
                 throws AMQException
         {
-            super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+            super(createMessage(messageId, publishBody));
             _tag = tag;
         }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Fri Feb 13 11:24:44 2009
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
@@ -54,7 +55,7 @@
 
     private StoreContext _storeContext = new StoreContext();
 
-    private MessageHandleFactory _handleFactory = new MessageHandleFactory();
+    private MessageFactory _handleFactory = new MessageFactory();
 
     private int count;
 
@@ -370,7 +371,7 @@
     /**
      * Just add some extra utility methods to AMQMessage to aid testing.
      */
-    static class Message extends AMQMessage
+    static class Message extends PersistentAMQMessage
     {
         private class TestIncomingMessage extends IncomingMessage
         {
@@ -392,14 +393,7 @@
 
             public ContentHeaderBody getContentHeaderBody()
             {
-                try
-                {
-                    return Message.this.getContentHeaderBody();
-                }
-                catch (AMQException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                return Message.this.getContentHeaderBody();
             }
         }
 
@@ -407,10 +401,7 @@
 
         private static MessageStore _messageStore = new SkeletonMessageStore();
 
-        private static StoreContext _storeContext = new StoreContext();
-
-
-        private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+        private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, new StoreContext(),
                                                                                       null,
                                                                          new LinkedList<RequiredDeliveryException>()
         );
@@ -422,7 +413,7 @@
 
         Message(String id, FieldTable headers) throws AMQException
         {
-            this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+            this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers));
         }
 
         public IncomingMessage getIncomingMessage()
@@ -432,42 +423,35 @@
 
         private Message(long messageId,
                         MessagePublishInfo publish,
-                        ContentHeaderBody header,
-                        List<ContentBody> bodies) throws AMQException
-        {
-            super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
-
-
-            
-            _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
-            _incoming.setContentHeaderBody(header);
-
-
-        }
-
-        private static AMQMessageHandle createMessageHandle(final long messageId,
-                                                            final MessagePublishInfo publish,
-                                                            final ContentHeaderBody header)
+                        ContentHeaderBody header) throws AMQException
         {
-
-            final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
-                                                                                                       _messageStore,
-                                                                                                       true);
+            super(messageId, _messageStore);
 
             try
             {
-                amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
+                setPublishAndContentHeaderBody(_txnContext.getStoreContext(), publish,header);
             }
             catch (AMQException e)
             {
-                
+
             }
-            return amqMessageHandle;
+
+            _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore));
+            _incoming.setContentHeaderBody(header);
         }
 
         private Message(AMQMessage msg) throws AMQException
         {
-            super(msg);
+            super(msg.getMessageId(), _messageStore);
+
+            this.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), msg.getMessagePublishInfo(), msg.getContentHeaderBody());
+
+            Iterator<ContentChunk> iterator = msg.getContentBodyIterator();
+
+            while(iterator.hasNext())
+            {
+                this.addContentBodyFrame(_txnContext.getStoreContext(), iterator.next(),iterator.hasNext());
+            }
         }
 
 
@@ -500,15 +484,7 @@
 
         private Object getKey()
         {
-            try
-            {
-                return getMessagePublishInfo().getRoutingKey();
-            }
-            catch (AMQException e)
-            {
-                _log.error("Error getting routing key: " + e, e);
-                return null;
-            }
+            return getMessagePublishInfo().getRoutingKey();
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=744079&r1=744078&r2=744079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Fri Feb 13 11:24:44 2009
@@ -497,7 +497,7 @@
             throws AMQException
     {
         _exchange.route(message);
-        message.routingComplete(_store, new MessageHandleFactory());
+        message.routingComplete(_store, new MessageFactory());
         message.deliverToQueues();
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org