You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/08 14:38:38 UTC

svn commit: r504887 - in /incubator/qpid/branches/perftesting/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/queue/ client/src/main/java/org/apach...

Author: ritchiem
Date: Thu Feb  8 05:38:37 2007
New Revision: 504887

URL: http://svn.apache.org/viewvc?view=rev&rev=504887
Log:
QPID-
Broker

AMQChannel - Resend modified to add messages to Subscription resendQueue.
BasicRecoverMethodHandler - Now makes use of the Requeue boolean (needs test case, but is same logic as TxRollback)
TxRollbackHandler - Removed protocol Session from AMQChannel.resend()
AMQMessage - Changed comments, updated taken() to record the subscription that took the message
AMQQueue - Added DeliveryManager to Subscription constructors.
ConcurrentSelectorDeliveryManager - updated to get queue from Subscription and to know when the Subscriptions have content that needs Async delivery.

DeliveryManager - added update method to allow a subscription to tell DM it has content to send.
Subscription - new methods to handle resendQueue
SubscriptionFactory - changes to pass in the DeliveryManager
SubscriptionImpl - Comment changes, Constructor changes, implementations of interface

Client

Recover and TxRollback now perform their broker methods while suspended.

RecoverTest - Added additional asserts to prevent NPEs
CommitRollbackTest - word change
RemoteSubscriptionImpl/SubscriptionTestHelper - Subscription implementation
AckTest - Update for new SubscriptionImpl constructor


Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb  8 05:38:37 2007
@@ -36,6 +36,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.Subscription;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.TxnBuffer;
 import org.apache.qpid.server.txn.TxnOp;
@@ -388,7 +389,7 @@
     }
 
     /** Called to resend all outstanding unacknowledged messages to this same channel. */
-    public void resend(AMQProtocolSession session) throws AMQException
+    public void resend() throws AMQException
     {
         //messages go to this channel
         synchronized (_unacknowledgedMessageMapLock)
@@ -400,17 +401,35 @@
             {
                 Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next();
 
-                long deliveryTag = entry.getKey();
+                //long deliveryTag = entry.getKey();
                 String consumerTag = entry.getValue().consumerTag;
 
                 if (_consumerTag2QueueMap.containsKey(consumerTag))
                 {
                     AMQMessage msg = entry.getValue().message;
                     msg.setRedelivered(true);
-                    session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+                    Subscription sub = msg.getDeliveredSubscription();
+
+                    if (sub != null)
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Requeuing " + msg + " for resend");
+                        }
+
+                        sub.addToResendQueue(msg);
+                    }
+                    else
+                    {
+                        _log.error("DeliveredSubscription not recorded");
+                    }
+
+                    // Don't write the frame as the DeliveryManager can now deal with it
+                    //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
                 }
                 else
-                {
+                { // The current consumer has gone so we need to requeue
+
                     UnacknowledgedMessage unacked = entry.getValue();
 
                     if (unacked.queue != null)
@@ -426,6 +445,8 @@
                 }
             }
         }
+
+        //fixme need to start the async delivery here.
     }
 
     /**

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Thu Feb  8 05:38:37 2007
@@ -46,12 +46,22 @@
                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
                                AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
     {
-        _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());        
+        _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
         AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
         if (channel == null)
         {
             throw new AMQException("Unknown channel " + evt.getChannelId());
         }
-        channel.resend(protocolSession);
+
+        if (evt.getMethod().getRequeue())
+        {
+            //fixme need tests to exercise
+            channel.requeue();
+        }
+        else
+        {
+            channel.resend();
+        }
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Thu Feb  8 05:38:37 2007
@@ -48,17 +48,23 @@
                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
                                AMQMethodEvent<TxRollbackBody> evt) throws AMQException
     {
-        try{
+        try
+        {
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+
             channel.rollback();
+
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
+            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
-            channel.resend(protocolSession);
-        }catch(AMQException e){
+            channel.resend();           
+        }
+        catch (AMQException e)
+        {
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Feb  8 05:38:37 2007
@@ -39,9 +39,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentHashMap;
 
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
 public class AMQMessage
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -66,36 +64,29 @@
 
     private long _arrivalTime;
 
-    /**
-     * Keeps a track of how many bytes we have received in body frames
-     */
+    /** Keeps a track of how many bytes we have received in body frames */
     private long _bodyLengthReceived = 0;
 
-    /**
-     * The message store in which this message is contained.
-     */
+    /** The message store in which this message is contained. */
     private transient final MessageStore _store;
 
     /**
-     * For non transactional publishes, a message can be stored as
-     * soon as it is complete. For transactional messages it doesnt
-     * need to be stored until the transaction is committed.
+     * For non transactional publishes, a message can be stored as soon as it is complete. For transactional messages it
+     * doesnt need to be stored until the transaction is committed.
      */
     private boolean _storeWhenComplete;
 
-    /**
-     * TxnBuffer for transactionally published messages
-     */
+    /** TxnBuffer for transactionally published messages */
     private TxnBuffer _txnBuffer;
 
     /**
-     * Flag to indicate whether message has been delivered to a
-     * consumer. Used in implementing return functionality for
+     * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
     private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
     private AtomicBoolean _taken;
+    private Subscription _takenBySubcription;
 
 
     public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -282,9 +273,7 @@
         return _messageId;
     }
 
-    /**
-     * Threadsafe. Increment the reference count on the message.
-     */
+    /** Threadsafe. Increment the reference count on the message. */
     public void incrementReference()
     {
         _referenceCount.incrementAndGet();
@@ -390,9 +379,8 @@
     /**
      * Called to enforce the 'immediate' flag.
      *
-     * @throws NoConsumersException if the message is marked for
-     *                              immediate delivery but has not been marked as delivered to a
-     *                              consumer
+     * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+     *                              to a consumer
      */
     public void checkDeliveredToConsumer() throws NoConsumersException
     {
@@ -403,9 +391,8 @@
     }
 
     /**
-     * Called when this message is delivered to a consumer. (used to
-     * implement the 'immediate' flag functionality).
-     * And by selectors to determin if the message has already been sent
+     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). And
+     * by selectors to determin if the message has already been sent
      */
     public void setDeliveredToConsumer()
     {
@@ -457,13 +444,31 @@
         return msgdec;
     }
 
-    public boolean taken()
+    public boolean taken(Subscription sub)
     {
-        return _taken.getAndSet(true);
+        if (_taken.getAndSet(true))
+        {
+            if (sub == _takenBySubcription)
+            {
+                return false;
+            }
+            return true;
+        }
+        else
+        {
+            _takenBySubcription = sub;
+            return false;
+        }
     }
 
     public void release()
     {
+        _takenBySubcription = null;
         _taken.set(false);
+    }
+
+    public Subscription getDeliveredSubscription()
+    {
+        return _takenBySubcription;
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb  8 05:38:37 2007
@@ -37,8 +37,8 @@
 import java.util.concurrent.Executor;
 
 /**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
  */
 public class AMQQueue implements Managable, Comparable
 {
@@ -46,62 +46,41 @@
 
     private final String _name;
 
-    /**
-     * null means shared
-     */
+    /** null means shared */
     private final String _owner;
 
     private final boolean _durable;
 
-    /**
-     * If true, this queue is deleted when the last subscriber is removed
-     */
+    /** If true, this queue is deleted when the last subscriber is removed */
     private final boolean _autoDelete;
 
-    /**
-     * Holds subscribers to the queue.
-     */
+    /** Holds subscribers to the queue. */
     private final SubscriptionSet _subscribers;
 
     private final SubscriptionFactory _subscriptionFactory;
 
-    /**
-     * Manages message delivery.
-     */
+    /** Manages message delivery. */
     private final DeliveryManager _deliveryMgr;
 
-    /**
-     * The queue registry with which this queue is registered.
-     */
+    /** The queue registry with which this queue is registered. */
     private final QueueRegistry _queueRegistry;
 
-    /**
-     * Used to track bindings to exchanges so that on deletion they can easily
-     * be cancelled.
-     */
+    /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     private final ExchangeBindings _bindings = new ExchangeBindings(this);
 
-    /**
-     * Executor on which asynchronous delivery will be carriedout where required
-     */
+    /** Executor on which asynchronous delivery will be carriedout where required */
     private final Executor _asyncDelivery;
 
     private final AMQQueueMBean _managedObject;
 
-    /**
-     * max allowed size(KB) of a single message
-     */
+    /** max allowed size(KB) of a single message */
     private long _maximumMessageSize = 10000;
 
-    /**
-     * max allowed number of messages on a queue.
-     */
+    /** max allowed number of messages on a queue. */
     @Configured(path = "maximumMessageCount", defaultValue = "0")
     public int _maximumMessageCount;
 
-    /**
-     * max queue depth for the queue
-     */
+    /** max queue depth for the queue */
     @Configured(path = "maximumQueueDepth", defaultValue = "0")
     public long _maximumQueueDepth = 10000000;
 
@@ -117,9 +96,7 @@
     @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
     public long _minimumAlertRepeatGap = 30000;
 
-    /**
-     * total messages received by the queue since startup.
-     */
+    /** total messages received by the queue since startup. */
     public long _totalMessagesReceived = 0;
 
     public int compareTo(Object o)
@@ -198,7 +175,7 @@
         _autoDelete = autoDelete;
         _queueRegistry = queueRegistry;
         _asyncDelivery = asyncDelivery;
-        
+
         _managedObject = createMBean();
         _managedObject.register();
 
@@ -244,17 +221,13 @@
         return _autoDelete;
     }
 
-    /**
-     * @return no of messages(undelivered) on the queue.
-     */
+    /** @return no of messages(undelivered) on the queue. */
     public int getMessageCount()
     {
         return _deliveryMgr.getQueueMessageCount();
     }
 
-    /**
-     * @return List of messages(undelivered) on the queue.
-     */
+    /** @return List of messages(undelivered) on the queue. */
     public List<AMQMessage> getMessagesOnTheQueue()
     {
         return _deliveryMgr.getMessages();
@@ -263,10 +236,11 @@
     public long getQueueDepth()
     {
         return _deliveryMgr.getTotalMessageSize();
-    }    
+    }
 
     /**
      * @param messageId
+     *
      * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
      */
     public AMQMessage getMessageOnTheQueue(long messageId)
@@ -285,9 +259,7 @@
         return msg;
     }
 
-    /**
-     * @return MBean object associated with this Queue
-     */
+    /** @return MBean object associated with this Queue */
     public ManagedObject getManagedObject()
     {
         return _managedObject;
@@ -344,17 +316,13 @@
         return _deliveryMgr.getOldestMessageArrival();
     }
 
-    /**
-     * Removes the AMQMessage from the top of the queue.
-     */
+    /** Removes the AMQMessage from the top of the queue. */
     public void deleteMessageFromTop() throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop();
     }
 
-    /**
-     * removes all the messages from the queue.
-     */
+    /** removes all the messages from the queue. */
     public void clearQueue() throws AMQException
     {
         _deliveryMgr.clearAllMessages();
@@ -375,7 +343,7 @@
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr);
 
         if (subscription.hasFilters())
         {
@@ -396,7 +364,8 @@
         Subscription removedSubscription;
         if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
                                                                                                          ps,
-                                                                                                         consumerTag)))
+                                                                                                         consumerTag,
+                                                                                                         _deliveryMgr)))
             == null)
         {
             throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Feb  8 05:38:37 2007
@@ -31,10 +31,17 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /** Manages delivery of messages on behalf of a queue */
@@ -68,6 +75,7 @@
      */
     private ReentrantLock _lock = new ReentrantLock();
     private AtomicLong _totalMessageSize = new AtomicLong();
+    private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
 
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
@@ -111,7 +119,7 @@
         _lock.lock();
         try
         {
-            return !_messages.isEmpty();
+            return !_messages.isEmpty() || !_hasContent.isEmpty();
         }
         finally
         {
@@ -146,6 +154,20 @@
         return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
     }
 
+    public void setQueueHasContent(Subscription subscription)
+    {
+        _lock.lock();
+        try
+        {
+
+            _log.debug("Queue has content Set");
+            _hasContent.add(subscription);
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
 
     public synchronized List<AMQMessage> getMessages()
     {
@@ -197,7 +219,7 @@
     {
         AMQMessage message = messages.peek();
 
-        while (message != null && (sub.isBrowser() || message.taken()))
+        while (message != null && (sub.isBrowser() || message.taken(sub)))
         {
             //remove the already taken message
             messages.poll();
@@ -207,8 +229,17 @@
         return message;
     }
 
-    public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+    public void sendNextMessage(Subscription sub, AMQQueue queue)
     {
+
+        Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+        if (messageQueue == null)
+        {
+            // There is no queue with messages currently
+            _log.warn(sub + ": asked to send messages but has none on given queue:" + queue);
+            return;
+        }
         AMQMessage message = null;
         try
         {
@@ -221,14 +252,30 @@
             }
             if (_log.isDebugEnabled())
             {
-                _log.debug("Async Delivery Message:" + message + " to :" + sub);
+                _log.debug("Async Delivery Message:" + message + " to :" + this);
             }
 
-            sub.send(message, _queue);
+            sub.send(message, queue);
 
             //remove sent message from our queue.
             messageQueue.poll();
-            _totalMessageSize.addAndGet(-message.getSize());
+
+            //If we don't remove the message from _messages
+            // Otherwise the Async send will never end            
+            if (messageQueue.isEmpty())
+            {
+                if (messageQueue == sub.getResendQueue())
+                {
+                    _hasContent.remove(sub);
+                }
+                else if (messageQueue == sub.getPreDeliveryQueue())
+                {
+                    //fixme
+                    _log.error("MEMORY LEAK: message from PreDeliveryQueue not removed from _messages");
+                    //_messages.remove(message);
+                }
+            }
+
         }
         catch (FailedDequeueException e)
         {
@@ -254,7 +301,7 @@
             {
                 if (!sub.isSuspended())
                 {
-                    sendNextMessage(sub);
+                    sendNextMessage(sub, _queue);
 
                     hasSubscribers = true;
                 }
@@ -262,25 +309,6 @@
         }
     }
 
-    private void sendNextMessage(Subscription sub)
-    {
-        if (sub.hasFilters())
-        {
-            sendNextMessage(sub, sub.getPreDeliveryQueue());
-            if (sub.isAutoClose())
-            {
-                if (sub.getPreDeliveryQueue().isEmpty())
-                {
-                    sub.close();
-                }
-            }
-        }
-        else
-        {
-            sendNextMessage(sub, _messages);
-        }
-    }
-
     private AMQMessage poll()
     {
         return _messages.poll();
@@ -355,6 +383,8 @@
                     _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
                                System.identityHashCode(s) + ") :" + s);
                 }
+                //Mark message as taken
+                msg.taken(s);
                 //Deliver the message
                 s.send(msg, _queue);
             }
@@ -405,8 +435,8 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
-                       " Active:" + _subscriptions.hasActiveSubscribers() +
+            _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:"
+                       + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
                        " Processing:" + _processing.get());
         }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Thu Feb  8 05:38:37 2007
@@ -80,4 +80,6 @@
     long getTotalMessageSize();
 
     long getOldestMessageArrival();
+
+    void setQueueHasContent(Subscription subscription);
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Thu Feb  8 05:38:37 2007
@@ -44,5 +44,11 @@
 
     void close();
 
-    boolean isBrowser();   
+    boolean isBrowser();
+
+    Queue<AMQMessage> getResendQueue();
+
+    Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+
+    void addToResendQueue(AMQMessage msg);
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Thu Feb  8 05:38:37 2007
@@ -25,18 +25,18 @@
 import org.apache.qpid.framing.FieldTable;
 
 /**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
- * factory primarily assists testing although in future more sophisticated subscribers may need a different
- * subscription implementation.
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
+ * primarily assists testing although in future more sophisticated subscribers may need a different subscription
+ * implementation.
  *
  * @see org.apache.qpid.server.queue.AMQQueue
  */
 public interface SubscriptionFactory
 {
     Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
-                                    FieldTable filters, boolean noLocal) throws AMQException;
+                                    FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException;
 
 
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
-            throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+                                    DeliveryManager deliveryManager) throws AMQException;
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Feb  8 05:38:37 2007
@@ -39,11 +39,8 @@
 import java.util.Queue;
 
 /**
- * Encapsulation of a supscription to a queue.
- * <p/>
- * Ties together the protocol session of a subscriber, the consumer tag that
- * was given out by the broker and the channel id.
- * <p/>
+ * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
  */
 public class SubscriptionImpl implements Subscription
 {
@@ -59,40 +56,46 @@
 
     private Queue<AMQMessage> _messages;
 
+    private Queue<AMQMessage> _resendQueue;
+
     private final boolean _noLocal;
 
-    /**
-     * True if messages need to be acknowledged
-     */
+    /** True if messages need to be acknowledged */
     private final boolean _acks;
     private FilterManager _filters;
     private final boolean _isBrowser;
     private final Boolean _autoClose;
     private boolean _closed = false;
 
+    private DeliveryManager _deliveryManager;
+
     public static class Factory implements SubscriptionFactory
     {
-        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+        public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+                                               boolean acks, FieldTable filters, boolean noLocal,
+                                               DeliveryManager deliveryManager) throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager);
         }
 
-        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
+                                                   DeliveryManager deliveryManager)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager);
         }
     }
 
     public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
-                            String consumerTag, boolean acks)
+                            String consumerTag, boolean acks, DeliveryManager deliveryManager)
             throws AMQException
     {
-        this(channelId, protocolSession, consumerTag, acks, null, false);
+        this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager);
     }
 
     public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
-                            String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+                            String consumerTag, boolean acks, FieldTable filters, boolean noLocal,
+                            DeliveryManager deliveryManager)
             throws AMQException
     {
         AMQChannel channel = protocolSession.getChannel(channelId);
@@ -107,6 +110,7 @@
         sessionKey = protocolSession.getKey();
         _acks = acks;
         _noLocal = noLocal;
+        _deliveryManager = deliveryManager;
 
         _filters = FilterManagerFactory.createManager(filters);
 
@@ -165,7 +169,7 @@
                             String consumerTag)
             throws AMQException
     {
-        this(channel, protocolSession, consumerTag, false);
+        this(channel, protocolSession, consumerTag, false, null);
     }
 
     public boolean equals(Object o)
@@ -173,9 +177,7 @@
         return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
     }
 
-    /**
-     * Equality holds if the session matches and the channel and consumer tag are the same.
-     */
+    /** Equality holds if the session matches and the channel and consumer tag are the same. */
     private boolean equals(SubscriptionImpl psc)
     {
         return sessionKey.equals(psc.sessionKey)
@@ -194,11 +196,12 @@
     }
 
     /**
-     * This method can be called by each of the publisher threads.
-     * As a result all changes to the channel object must be thread safe.
+     * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+     * thread safe.
      *
      * @param msg
      * @param queue
+     *
      * @throws AMQException
      */
     public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
@@ -225,7 +228,7 @@
         // We don't decrement the reference here as we don't want to consume the message
         // but we do want to send it to the client.
 
-        synchronized(channel)
+        synchronized (channel)
         {
             long deliveryTag = channel.getNextDeliveryTag();
 
@@ -259,7 +262,7 @@
             {
                 queue.dequeue(msg);
             }
-            synchronized(channel)
+            synchronized (channel)
             {
                 long deliveryTag = channel.getNextDeliveryTag();
 
@@ -376,6 +379,11 @@
 
     public void close()
     {
+        if (_resendQueue != null && !_resendQueue.isEmpty())
+        {
+            requeue();
+        }
+
         if (!_closed)
         {
             _logger.info("Closing autoclose subscription:" + this);
@@ -383,18 +391,74 @@
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
             protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
-        		(byte)8, (byte)0,	// AMQP version (major, minor)
-            	consumerTag	// consumerTag
-                ));
+                                                                        (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                        consumerTag    // consumerTag
+            ));
             _closed = true;
         }
     }
 
+    private void requeue()
+    {
+        //fixme
+        _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages");
+    }
+
     public boolean isBrowser()
     {
         return _isBrowser;
     }
 
+    public Queue<AMQMessage> getResendQueue()
+    {
+        if (_resendQueue == null)
+        {
+            _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+        }
+        return _resendQueue;
+    }
+
+
+    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    {
+        if (_resendQueue != null && !_resendQueue.isEmpty())
+        {
+            return _resendQueue;
+        }
+
+        if (_filters != null)
+        {
+            if (isAutoClose())
+            {
+                if (_messages.isEmpty())
+                {
+                    close();
+                    return null;
+                }
+            }
+            return _messages;
+        }
+        else // we want the DM queue
+        {
+            return messages;
+        }
+    }
+
+    public void addToResendQueue(AMQMessage msg)
+    {
+        // add to our resend queue
+        getResendQueue().add(msg);
+
+        // Mark Queue as having content.
+        if (_deliveryManager == null)
+        {
+            _logger.error("Delivery Manager is null won't be able to resend messages");
+        }
+        else
+        {
+            _deliveryManager.setQueueHasContent(this);
+        }
+    }
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {
@@ -402,13 +466,13 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
-        	(byte)8, (byte)0,	// AMQP version (major, minor)
-            consumerTag,	// consumerTag
-        	deliveryTag,	// deliveryTag
-            exchange,	// exchange
-            false,	// redelivered
-            routingKey	// routingKey
-            );
+                                                                (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                consumerTag,    // consumerTag
+                                                                deliveryTag,    // deliveryTag
+                                                                exchange,    // exchange
+                                                                false,    // redelivered
+                                                                routingKey    // routingKey
+        );
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
         buf.flip();

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Feb  8 05:38:37 2007
@@ -86,7 +86,7 @@
     private int _nextTag = 1;
 
     /** This queue is bounded and is used to store messages before being dispatched to the consumer */
-    private final FlowControllingBlockingQueue _queue;
+    public final FlowControllingBlockingQueue _queue;
 
     private Dispatcher _dispatcher;
 
@@ -804,16 +804,44 @@
         checkNotTransacted(); // throws IllegalStateException if a transacted session
         // this is set only here, and the before the consumer's onMessage is called it is set to false
         _inRecovery = true;
+
+        boolean isSuspended = isSuspended();
+
+        if (!isSuspended)
+        {
+            try
+            {
+                suspendChannel(true);
+            }
+            catch (AMQException e)
+            {
+                throw new JMSAMQException(e);
+            }
+        }
         for (BasicMessageConsumer consumer : _consumers.values())
         {
             consumer.clearUnackedMessages();
         }
+        
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
                                                                                     (byte) 8, (byte) 0,    // AMQP version (major, minor)
                                                                                     false));    // requeue
+
+        if (!isSuspended)
+        {
+            try
+            {
+                suspendChannel(false);
+            }
+            catch (AMQException e)
+            {
+                throw new JMSAMQException(e);
+            }
+        }
+
     }
 
     boolean isInRecovery()
@@ -836,8 +864,6 @@
         {
             consumer.acknowledge();
         }
-
-
     }
 
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Thu Feb  8 05:38:37 2007
@@ -100,7 +100,7 @@
         {
             _logger.trace("Object added to queue:" + o);
         }
-        
+
         if (_listener != null)
         {
             synchronized (_listener)
@@ -111,6 +111,11 @@
                 }
             }
         }
+    }
+
+    public int size()
+    {
+        return _count;
     }
 }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Feb  8 05:38:37 2007
@@ -79,12 +79,15 @@
         // no ack for last three messages so when I call recover I expect to get three messages back
         consumerSession.recover();
         tm = (TextMessage) consumer.receive(3000);
+        assertNotNull("Message was null", tm);
         assertEquals("msg2", tm.getText());
 
         tm = (TextMessage) consumer.receive(3000);
+        assertNotNull("Message was null", tm);
         assertEquals("msg3", tm.getText());
 
         tm = (TextMessage) consumer.receive(3000);
+        assertNotNull("Message was null", tm);
         assertEquals("msg4", tm.getText());
 
         _logger.info("Received redelivery of three messages. Acknowledging last message");

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Thu Feb  8 05:38:37 2007
@@ -269,7 +269,7 @@
 
         _session.commit();
         assertNotNull("test message was consumed and rolled back, but is gone", result);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
+        assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
     }
 
 
@@ -297,4 +297,5 @@
 
         assertNull("test message should be null", result);
     }
+
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Thu Feb  8 05:38:37 2007
@@ -134,11 +134,21 @@
 
     public boolean isBrowser()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
-    public void sendNextMessage(AMQQueue queue)
+    public Queue<AMQMessage> getResendQueue()
     {
+        return null;
+    }
 
+    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    {
+        return null;
+    }
+
+    public void addToResendQueue(AMQMessage msg)
+    {
+        //no-op
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java Thu Feb  8 05:38:37 2007
@@ -36,9 +36,7 @@
 
 import junit.framework.TestCase;
 
-/**
- * Tests that acknowledgements are handled correctly.
- */
+/** Tests that acknowledgements are handled correctly. */
 public class AckTest extends TestCase
 {
     private static final Logger _log = Logger.getLogger(AckTest.class);
@@ -82,7 +80,7 @@
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Establish some way to determine the version for the test.
-            BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0);
+            BasicPublishBody publishBody = new BasicPublishBody((byte) 8, (byte) 0);
             publishBody.routingKey = "rk";
             publishBody.exchange = "someExchange";
             AMQMessage msg = new AMQMessage(_messageStore, publishBody);
@@ -104,12 +102,12 @@
     }
 
     /**
-     * Tests that the acknowledgements are correctly associated with a channel and
-     * order is preserved when acks are enabled
+     * Tests that the acknowledgements are correctly associated with a channel and order is preserved when acks are
+     * enabled
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
@@ -130,13 +128,11 @@
         assertTrue(_messageStore.getMessageMap().size() == msgCount);
     }
 
-    /**
-     * Tests that in no-ack mode no messages are retained
-     */
+    /** Tests that in no-ack mode no messages are retained */
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false, null);
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -145,13 +141,10 @@
         assertTrue(_messageStore.getMessageMap().size() == 0);
     }
 
-    /**
-     * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
-     * set case)
-     */
+    /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */
     public void testSingleAckReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -175,13 +168,10 @@
         }
     }
 
-    /**
-     * Tests that a single acknowledgement is handled correctly (i.e multiple flag not
-     * set case)
-     */
+    /** Tests that a single acknowledgement is handled correctly (i.e multiple flag not set case) */
     public void testMultiAckReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -201,12 +191,10 @@
         }
     }
 
-    /**
-     * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
-     */
+    /** Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. */
     public void testMultiAckAllReceivedTest() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -231,7 +219,7 @@
         int lowMark = 5;
         int highMark = 10;
 
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         _channel.setPrefetchLowMarkCount(lowMark);
         _channel.setPrefetchHighMarkCount(highMark);
 
@@ -282,7 +270,7 @@
 
     public void testPrefetch() throws AMQException
     {
-        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+        _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true, null);
         _channel.setPrefetchCount(5);
 
         assertTrue(_channel.getPrefetchCount() == 5);

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=504887&r1=504886&r2=504887
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Feb  8 05:38:37 2007
@@ -103,7 +103,22 @@
 
     public boolean isBrowser()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
+    }
+
+    public Queue<AMQMessage> getResendQueue()
+    {
+        return null;
+    }
+
+    public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+    {
+        return null;
+    }
+
+    public void addToResendQueue(AMQMessage msg)
+    {
+        //no-op
     }
 
     public int hashCode()