Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/09 18:22:55 UTC

svn commit: r505383 - in /incubator/qpid/branches/perftesting/qpid: java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/src/main/java/org/apache/qpid/server/protocol/ ...

Author: ritchiem
Date: Fri Feb  9 09:22:52 2007
New Revision: 505383

URL: http://svn.apache.org/viewvc?view=rev&rev=505383
Log:
QPID-346 Message loss after rollback
QPID-347 Connection closure results in undelivered messages remaining 'taken()' and so never re-delivered.
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure message delivery will stop for that subscription

Most changes are related to QPID-346, QPID-348 and QPID-355.

Broker
------

Adjusted broker log4j.xml to provide a more reasonable output.

AMQChannel - resend() modified to use the DeliveryManager to resend messages, so that ordering is preserved and higher-priority messages can be prioritised.
   - requeue/resend adjusted to release() messages (QPID-347)
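
As a rough illustration of the reworked resend() shown in the AMQChannel diff further down: the unacknowledged-message map is detached while holding the lock and then processed outside it. The sketch below is not the broker code. UnackedMapSketch, its String messages and the outstanding() helper are hypothetical stand-ins for AMQChannel, AMQMessage and the real bookkeeping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AMQChannel's unacknowledged-message bookkeeping.
class UnackedMapSketch
{
    private final Object lock = new Object();
    private Map<Long, String> unacked = new LinkedHashMap<Long, String>();

    void add(long deliveryTag, String message)
    {
        synchronized (lock)
        {
            unacked.put(deliveryTag, message);
        }
    }

    // Detach the current map under the lock, then walk it without holding the lock.
    List<String> resend()
    {
        Map<Long, String> current;
        synchronized (lock)
        {
            current = unacked;
            unacked = new LinkedHashMap<Long, String>();
        }

        List<String> resent = new ArrayList<String>();
        for (String message : current.values())
        {
            // The real code hands each message to its subscription's resend queue,
            // or back to the queue if the consumer has gone.
            resent.add(message);
        }
        return resent;
    }

    int outstanding()
    {
        synchronized (lock)
        {
            return unacked.size();
        }
    }
}
```

Because LinkedHashMap iterates in insertion order, the messages come back in the order they were originally recorded.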

BasicRecoverMethodHandler - updated to make use of the requeue flag. This needs testing, though it is the same code as Rollback. The method now sends back the synchronous Recover-Ok.

TxRollbackHandler - This now calls resend() to put all sent messages back on the subscription's resendQueue. This currently assumes that you are suspended, as otherwise you will start to receive the resent messages before the RollbackOk method.

AMQMinaProtocolSession - getChannel() in the interface says to return null if the channelId cannot be found. However, in most cases the result is not checked for null, and in the few places where it is checked an AMQException is thrown. Consistency here would be good, so I have changed this method to throw AMQException directly, but we should be clear why we were returning null. NPEs could occur in an error situation.
 - also white space changes
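
A minimal sketch of the lookup-or-throw behaviour described above, for illustration only. ChannelRegistrySketch and UnknownChannelException are hypothetical stand-ins for AMQMinaProtocolSession and AMQException, and channels are represented by plain Strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical checked exception standing in for AMQException.
class UnknownChannelException extends Exception
{
    UnknownChannelException(String message)
    {
        super(message);
    }
}

// Hypothetical stand-in for the protocol session's channel map.
class ChannelRegistrySketch
{
    private final Map<Integer, String> channels = new HashMap<Integer, String>();

    void addChannel(int channelId, String channel)
    {
        channels.put(channelId, channel);
    }

    // Callers never see null: an unknown id is reported in one place.
    String getChannel(int channelId) throws UnknownChannelException
    {
        String channel = channels.get(channelId);
        if (channel == null)
        {
            throw new UnknownChannelException("Unknown channel " + channelId);
        }
        return channel;
    }
}
```

With this shape, handlers such as BasicRecoverMethodHandler no longer need their own null check, which matches the check removed from that handler in this commit.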

AMQQueue - Changed the previous change to subscription creation to pass the AMQQueue to the subscription rather than the DeliveryManager. This allows the messages stored on the resendQueue to be delivered to the queue during closure.
Added a method to allow a subscription to say that it is holding data for the queue.

DeliveryManager - Added a method, called by the Subscription through the AMQQueue, to specify that it is holding content that should stop the queue being marked as empty.

ConcurrentSelectorDeliveryManager - Added locking with the subscription to ensure that if there is a resendQueue in use then only one thread is processing it at once. Without this, the AsyncDelivery thread and the io thread closing the subscription can both process _resendQueue. This results in a message being duplicated and the next message on the queue being lost: the AsyncDelivery thread peeks the queue, ensures the message is sent and then poll()s it, while the io-closing thread poll()s the message and delivers it to the queue. So the AsyncDelivery thread will remove the next message in the queue by mistake with its poll() call.
+ implemented new features in DeliveryManager
+ wrapped logging in is<Level>Enabled() checks and added some extra trace logs.
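
The peek/poll race described above can be sketched as follows. This is an illustrative model, not the broker code: ResendQueueRaceSketch, the String messages and the local sendLock object are assumptions. The first method replays the bad interleaving on a single thread so the outcome is deterministic. The second runs two real threads that each hold a shared lock around their queue operations, in the spirit of the synchronized (sub.sendlock()) block added in this commit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ResendQueueRaceSketch
{
    // Replays the problematic interleaving step by step on one thread.
    static List<String> unsafeInterleaving()
    {
        Queue<String> resendQueue = new ArrayDeque<String>();
        resendQueue.add("m1");
        resendQueue.add("m2");
        List<String> delivered = new ArrayList<String>();

        String peeked = resendQueue.peek();  // async delivery thread peeks m1
        delivered.add(resendQueue.poll());   // closing io thread polls m1 and delivers it
        delivered.add(peeked);               // async delivery thread sends m1 again (duplicate)
        resendQueue.poll();                  // async delivery thread's poll removes m2 (lost)

        return delivered;                    // [m1, m1]; m2 was never delivered
    }

    // Each thread performs its queue operations as one unit under the shared lock.
    static List<String> lockedDrain() throws InterruptedException
    {
        final Object sendLock = new Object(); // hypothetical; models the subscription's send lock
        final Queue<String> resendQueue = new ArrayDeque<String>();
        resendQueue.add("m1");
        resendQueue.add("m2");
        final List<String> delivered = new ArrayList<String>();

        Runnable asyncDelivery = () -> {
            while (true)
            {
                synchronized (sendLock)
                {
                    String msg = resendQueue.peek();
                    if (msg == null)
                    {
                        return;
                    }
                    delivered.add(msg);  // send the peeked message
                    resendQueue.poll();  // then remove that same message
                }
            }
        };
        Runnable closingThread = () -> {
            while (true)
            {
                synchronized (sendLock)
                {
                    String msg = resendQueue.poll();
                    if (msg == null)
                    {
                        return;
                    }
                    delivered.add(msg);  // hand the message back for delivery
                }
            }
        };

        Thread a = new Thread(asyncDelivery);
        Thread b = new Thread(closingThread);
        a.start();
        b.start();
        a.join();
        b.join();
        return delivered;                // each message exactly once, in queue order
    }
}
```

Whichever thread wins the lock takes the head of the queue and records it before releasing, so the locked version delivers each message once and in order regardless of scheduling.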

Subscription - added method to get sendLock.

SubscriptionFactory - changes as said above from AMQChannel, changed DeliveryManager to AMQQueue in subscription constructor.

SubscriptionImpl - Changes for new constructor.
+ Changed suspension notion to suspend the subscription whilst messages are being resent from the resendQueue to the main AMQQueue. This only occurs during closure, so this is a good thing, as the messages would just come back when the consumer finally closes. QPID-355
+ Implemented requeue of messages during message closure. QPID-355
+ Changed hard-coded value of false for msg.redelivered to use the actual value stored in the message.

SubscriptionSet - formatting

Client
------
AMQSession - Changed the implementation of recover() to be synchronous on Recover-Ok and to perform the correct clean-up whilst the session is suspended.

CommitRollbackTest - Added to confirm changes work. (Updated from previous commit)
    testPutThenRollback() - checks redelivered flag
    testRollbackWithConsumerConnectionClose() -
     Test that when closing a consumer and then the connection while messages are being resent from a rollback, the messages get correctly requeued, the session purges the dispatcher queue, and the messages arrive in the correct order

    testRollbackWithConsumerAndSessionClose() -
     Test that when closing a consumer and then the session while messages are being resent from a rollback, the messages get correctly requeued, the session purges the dispatcher queue, and the messages arrive in the correct order


Cluster
-------
SubscriptionTestHelper/RemoteSubscriptionImpl - null implementation of Subscription interface sendLock()

amqp.0-8.xml - spec change to add Recover-Ok

Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    incubator/qpid/branches/perftesting/qpid/specs/amqp.0-8.xml

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml Fri Feb  9 09:22:52 2007
@@ -46,6 +46,11 @@
         <priority value="info"/>
     </category>
 
+    <category name="org.apache.qpid.framing.AMQDataBlockEncoder">
+       <priority value="info"/>
+   </category>
+
+
      <category name="org.apache.qpid">
         <priority value="warn"/>
     </category>

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb  9 09:22:52 2007
@@ -308,6 +308,10 @@
 
     public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
     {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("Unsubscribed consumer:" + consumerTag);
+        }
         AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
         {
@@ -350,9 +354,17 @@
      * @param message
      * @param deliveryTag
      * @param queue
+     * @param consumerTag
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
     {
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("Adding unackedMessage (" + System.identityHashCode(message) + ") for channel " + _channelId +
+                       " with delivery tag " + deliveryTag + " and consumerTag " + consumerTag +
+                       " from queue:" + queue.getName());
+        }
+
         synchronized (_unacknowledgedMessageMapLock)
         {
             _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
@@ -364,6 +376,8 @@
     /**
      * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
      * this same channel or to other subscribers.
+     *
+     * @throws org.apache.qpid.AMQException if delivery failes
      */
     public void requeue() throws AMQException
     {
@@ -372,7 +386,12 @@
         synchronized (_unacknowledgedMessageMapLock)
         {
             currentList = _unacknowledgedMessageMap;
-            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+            _unacknowledgedMessageMap = newUnacknowledgedMap();
+        }
+
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Requeuing " + currentList.size() + " messages for channel:" + System.identityHashCode(this));
         }
 
         for (UnacknowledgedMessage unacked : currentList.values())
@@ -391,62 +410,61 @@
     /** Called to resend all outstanding unacknowledged messages to this same channel. */
     public void resend() throws AMQException
     {
-        //messages go to this channel
+        Map<Long, UnacknowledgedMessage> currentList;
+
         synchronized (_unacknowledgedMessageMapLock)
         {
-            Iterator<Map.Entry<Long, UnacknowledgedMessage>> messageSetIterator =
-                    _unacknowledgedMessageMap.entrySet().iterator();
-
-            while (messageSetIterator.hasNext())
-            {
-                Map.Entry<Long, UnacknowledgedMessage> entry = messageSetIterator.next();
+            currentList = _unacknowledgedMessageMap;
+            _unacknowledgedMessageMap = newUnacknowledgedMap();
+        }
 
-                //long deliveryTag = entry.getKey();
-                String consumerTag = entry.getValue().consumerTag;
+        for (Map.Entry<Long, UnacknowledgedMessage> entry : currentList.entrySet())
+        {
+            UnacknowledgedMessage unacked = entry.getValue();
 
-                if (_consumerTag2QueueMap.containsKey(consumerTag))
-                {
-                    AMQMessage msg = entry.getValue().message;
-                    msg.setRedelivered(true);
-                    Subscription sub = msg.getDeliveredSubscription();
+            String consumerTag = unacked.consumerTag;
 
-                    if (sub != null)
-                    {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Requeuing " + msg + " for resend");
-                        }
+            if (_consumerTag2QueueMap.containsKey(consumerTag))
+            {
+                AMQMessage msg = entry.getValue().message;
+                msg.setRedelivered(true);
+                Subscription sub = msg.getDeliveredSubscription();
 
-                        sub.addToResendQueue(msg);
-                    }
-                    else
+                if (sub != null)
+                {
+                    if (_log.isDebugEnabled())
                     {
-                        _log.error("DeliveredSubscription not recorded");
+                        _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
                     }
 
-                    // Don't write the frame as the DeliveryManager can now deal with it
-                    //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+                    sub.addToResendQueue(msg);
                 }
                 else
-                { // The current consumer has gone so we need to requeue
+                {
+                    _log.error("DeliveredSubscription not recorded");
+                }
 
-                    UnacknowledgedMessage unacked = entry.getValue();
+                // Don't write the frame as the DeliveryManager can now deal with it
+                //session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+            }
+            else
+            { // The current consumer has gone so we need to requeue
 
-                    if (unacked.queue != null)
-                    {
-                        unacked.message.setTxnBuffer(null);
+                if (unacked.queue != null)
+                {
+                    unacked.message.setTxnBuffer(null);
 
-                        unacked.message.release();
+                    unacked.message.release();
 
-                        unacked.queue.deliver(unacked.message);
-                    }
-                    // delete the requeued message.
-                    messageSetIterator.remove();
+                    unacked.queue.deliver(unacked.message);
                 }
             }
         }
+    }
 
-        //fixme need to start the async delivery here.
+    private Map<Long, UnacknowledgedMessage> newUnacknowledgedMap()
+    {
+        return new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
     }
 
     /**
@@ -541,9 +559,9 @@
 
     private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
     {
-        if (_log.isDebugEnabled())
+        if (_log.isTraceEnabled())
         {
-            _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
+            _log.trace("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
                        " and multiple " + multiple);
         }
         if (multiple)

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Fri Feb  9 09:22:52 2007
@@ -28,6 +28,7 @@
 import org.apache.qpid.server.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
@@ -49,11 +50,6 @@
         _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
         AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
 
-        if (channel == null)
-        {
-            throw new AMQException("Unknown channel " + evt.getChannelId());
-        }
-
         if (evt.getMethod().getRequeue())
         {
             //fixme need tests to exercise
@@ -63,5 +59,11 @@
         {
             channel.resend();
         }
+
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        protocolSession.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Feb  9 09:22:52 2007
@@ -54,14 +54,14 @@
 
             channel.rollback();
 
+            //The DeliveryManager will now be responsible for dispatching these messages.
+            // So call resend to put them on the correct resend queue.
+            channel.resend();
+
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
             protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
-
-            //Now resend all the unacknowledged messages back to the original subscribers.
-            //(Must be done after the TxnRollback-ok response).
-            channel.resend();           
         }
         catch (AMQException e)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Feb  9 09:22:52 2007
@@ -136,7 +136,7 @@
         catch (RuntimeException e)
         {
             e.printStackTrace();
-        //    throw e;
+            //    throw e;
 
         }
     }
@@ -183,12 +183,12 @@
                 String locales = "en_US";
                 // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
                 AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
-            		_major, _minor,	// AMQP version (major, minor)
-                    locales.getBytes(),	// locales
-                    mechanisms.getBytes(),	// mechanisms
-                    null,	// serverProperties
-                	(short)_major,	// versionMajor
-                    (short)_minor);	// versionMinor
+                                                                       _major, _minor,    // AMQP version (major, minor)
+                                                                       locales.getBytes(),    // locales
+                                                                       mechanisms.getBytes(),    // mechanisms
+                                                                       null,    // serverProperties
+                                                                       (short) _major,    // versionMajor
+                                                                       (short) _minor);    // versionMinor
                 _minaProtocolSession.write(response);
             }
             catch (AMQException e)
@@ -307,8 +307,8 @@
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session. Equivalent
-     * to calling getProtocolSession().write().
+     * Convenience method that writes a frame to the protocol session. Equivalent to calling
+     * getProtocolSession().write().
      *
      * @param frame the frame to write
      */
@@ -335,14 +335,23 @@
 
     public AMQChannel getChannel(int channelId) throws AMQException
     {
-        return _channelMap.get(channelId);
+        AMQChannel channel = _channelMap.get(channelId);
+
+        if (channel == null)
+        {
+            throw new AMQException("Unknown channel " + channelId);
+        }
+        else
+        {
+            return channel;
+        }
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
     {
         if (_closed)
         {
-            throw new AMQException("Session is closed");    
+            throw new AMQException("Session is closed");
         }
 
         _channelMap.put(channel.getChannelId(), channel);
@@ -385,12 +394,12 @@
     }
 
     /**
-     * Close a specific channel. This will remove any resources used by the channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
-     * </ul>
+     * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+     * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
      *
      * @param channelId id of the channel to close
-     * @throws AMQException if an error occurs closing the channel
+     *
+     * @throws AMQException             if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
      */
     public void closeChannel(int channelId) throws AMQException
@@ -438,8 +447,7 @@
     }
 
     /**
-     * Closes all channels that were opened by this protocol session. This frees up all resources
-     * used by the channel.
+     * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
      *
      * @throws AMQException if an error occurs while closing any channel
      */
@@ -452,10 +460,7 @@
         _channelMap.clear();
     }
 
-    /**
-     * This must be called when the session is _closed in order to free up any resources
-     * managed by the session.
-     */
+    /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     public void closeSession() throws AMQException
     {
         if (!_closed)
@@ -479,17 +484,15 @@
         return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
     }
 
-    /**
-     * @return an object that can be used to identity
-     */
+    /** @return an object that can be used to identity */
     public Object getKey()
     {
         return _minaProtocolSession.getRemoteAddress();
     }
 
     /**
-     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
-     * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+     * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+     * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
      *
      * @return a String FQDN
      */
@@ -533,8 +536,8 @@
     }
 
     /**
-     * Convenience methods for managing AMQP version.
-     * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
+     * Convenience methods for managing AMQP version. NOTE: Both major and minor will be set to 0 prior to protocol
+     * initiation.
      */
 
     public byte getAmqpMajor()

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb  9 09:22:52 2007
@@ -343,7 +343,7 @@
     {
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
-        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, _deliveryMgr);
+        Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
 
         if (subscription.hasFilters())
         {
@@ -364,14 +364,15 @@
         Subscription removedSubscription;
         if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
                                                                                                          ps,
-                                                                                                         consumerTag,
-                                                                                                         _deliveryMgr)))
+                                                                                                         consumerTag)))
             == null)
         {
             throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag +
                                    " and protocol session key " + ps.getKey() + " not registered with queue " + this);
         }
 
+        removedSubscription.close();
+
         // if we are eligible for auto deletion, unregister from the queue registry
         if (_autoDelete && _subscribers.isEmpty())
         {
@@ -543,6 +544,10 @@
         _maximumMessageAge = maximumMessageAge;
     }
 
+    public void setQueueHasContent(boolean b, SubscriptionImpl subscription)
+    {
+        _deliveryMgr.setQueueHasContent(b, subscription);
+    }
 
     private class Deliver implements TxnOp
     {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb  9 09:22:52 2007
@@ -31,17 +31,13 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
-import java.util.HashMap;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
 
 
 /** Manages delivery of messages on behalf of a queue */
@@ -154,13 +150,13 @@
         return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
     }
 
-    public void setQueueHasContent(Subscription subscription)
+    public void setQueueHasContent(boolean hasContent, Subscription subscription)
     {
         _lock.lock();
         try
         {
 
-            _log.debug("Queue has content Set");
+            _log.trace("Queue has content Set");
             _hasContent.add(subscription);
         }
         finally
@@ -236,8 +232,11 @@
 
         if (messageQueue == null)
         {
-            // There is no queue with messages currently
-            _log.warn(sub + ": asked to send messages but has none on given queue:" + queue);
+            // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+            if (_log.isDebugEnabled())
+            {
+                _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+            }
             return;
         }
         AMQMessage message = null;
@@ -252,7 +251,7 @@
             }
             if (_log.isDebugEnabled())
             {
-                _log.debug("Async Delivery Message:" + message + " to :" + this);
+                _log.debug("Async Delivery Message (" + System.identityHashCode(message) + ") to :" + System.identityHashCode(this));
             }
 
             sub.send(message, queue);
@@ -266,6 +265,11 @@
             {
                 if (messageQueue == sub.getResendQueue())
                 {
+                    if (_log.isTraceEnabled())
+                    {
+                        _log.trace("All messages sent from resendQueue for " + sub);
+                    }
+
                     _hasContent.remove(sub);
                 }
                 else if (messageQueue == sub.getPreDeliveryQueue())
@@ -299,11 +303,16 @@
 
             for (Subscription sub : _subscriptions.getSubscriptions())
             {
-                if (!sub.isSuspended())
+                // Ensure only we are processing the subscribers. getNextQueue as if the subscriber has a resend queue
+                // they may close and start to empty it themselves.
+                synchronized (sub.sendlock())
                 {
-                    sendNextMessage(sub, _queue);
+                    if (!sub.isSuspended())
+                    {
+                        sendNextMessage(sub, _queue);
 
-                    hasSubscribers = true;
+                        hasSubscribers = true;
+                    }
                 }
             }
         }
@@ -316,9 +325,9 @@
 
     public void deliver(String name, AMQMessage msg) throws FailedDequeueException
     {
-        if (_log.isDebugEnabled())
+        if (_log.isTraceEnabled())
         {
-            _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+            _log.trace(id() + "deliver :" + System.identityHashCode(msg));
         }
 
         //Check if we have someone to deliver the message to.
@@ -436,7 +445,7 @@
         if (_log.isDebugEnabled())
         {
             _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ") hasContent:"
-                       + _hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
+                       + !_hasContent.isEmpty() + " Active:" + _subscriptions.hasActiveSubscribers() +
                        " Processing:" + _processing.get());
         }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb  9 09:22:52 2007
@@ -81,5 +81,5 @@
 
     long getOldestMessageArrival();
 
-    void setQueueHasContent(Subscription subscription);
+    void setQueueHasContent(boolean hasContent, Subscription subscription);
 }
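
Note on the signature change above: the added boolean lets a subscription clear its "has content" mark as well as set it, which requeue() relies on once the resend queue has been drained. A minimal sketch of that bookkeeping follows. It assumes the mark is kept in a concurrent set; HasContentTracker and hasContent() are hypothetical names for illustration, not broker classes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the delivery manager's bookkeeping; the real
// broker implementation is not shown in this diff.
class HasContentTracker<S>
{
    // Assumption: subscriptions with pending resend content live in a set.
    private final Set<S> _hasContent = ConcurrentHashMap.newKeySet();

    // Mirrors the new two-argument signature: true marks the subscription
    // as having messages waiting to be resent, false clears the mark.
    void setQueueHasContent(boolean hasContent, S subscription)
    {
        if (hasContent)
        {
            _hasContent.add(subscription);
        }
        else
        {
            _hasContent.remove(subscription);
        }
    }

    // True while at least one subscription still has content to resend.
    boolean hasContent()
    {
        return !_hasContent.isEmpty();
    }

    public static void main(String[] args)
    {
        HasContentTracker<String> tracker = new HasContentTracker<String>();
        tracker.setQueueHasContent(true, "sub-1");
        System.out.println("after add: " + tracker.hasContent());
        tracker.setQueueHasContent(false, "sub-1");
        System.out.println("after clear: " + tracker.hasContent());
    }
}
```

With a single-argument method the mark could only be set, so a closing subscription had no way to withdraw it.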

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Feb  9 09:22:52 2007
@@ -51,4 +51,6 @@
     Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
 
     void addToResendQueue(AMQMessage msg);
+
+    Object sendlock();
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Fri Feb  9 09:22:52 2007
@@ -34,9 +34,8 @@
 public interface SubscriptionFactory
 {
     Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
-                                    FieldTable filters, boolean noLocal, DeliveryManager deliveryManager) throws AMQException;
+                                    FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException;
 
 
-    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
-                                    DeliveryManager deliveryManager) throws AMQException;
+    Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException;
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Feb  9 09:22:52 2007
@@ -37,6 +37,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -67,35 +68,35 @@
     private final Boolean _autoClose;
     private boolean _closed = false;
 
-    private DeliveryManager _deliveryManager;
+    private AMQQueue _queue;
+    private final AtomicBoolean _resending = new AtomicBoolean(false);
 
     public static class Factory implements SubscriptionFactory
     {
         public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
                                                boolean acks, FieldTable filters, boolean noLocal,
-                                               DeliveryManager deliveryManager) throws AMQException
+                                               AMQQueue queue) throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, deliveryManager);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue);
         }
 
-        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag,
-                                                   DeliveryManager deliveryManager)
+        public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
                 throws AMQException
         {
-            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, deliveryManager);
+            return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null);
         }
     }
 
     public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
-                            String consumerTag, boolean acks, DeliveryManager deliveryManager)
+                            String consumerTag, boolean acks, AMQQueue queue)
             throws AMQException
     {
-        this(channelId, protocolSession, consumerTag, acks, null, false, deliveryManager);
+        this(channelId, protocolSession, consumerTag, acks, null, false, queue);
     }
 
     public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
                             String consumerTag, boolean acks, FieldTable filters, boolean noLocal,
-                            DeliveryManager deliveryManager)
+                            AMQQueue queue)
             throws AMQException
     {
         AMQChannel channel = protocolSession.getChannel(channelId);
@@ -110,7 +111,7 @@
         sessionKey = protocolSession.getKey();
         _acks = acks;
         _noLocal = noLocal;
-        _deliveryManager = deliveryManager;
+        _queue = queue;
 
         _filters = FilterManagerFactory.createManager(filters);
 
@@ -238,9 +239,12 @@
             {
                 channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
             }
-            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
             AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
 
+            //fixme what is wrong with this?
+            //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag);
+
             protocolSession.writeFrame(frame);
         }
     }
@@ -271,9 +275,12 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
                 }
 
-                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
                 AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
 
+                //fixme what is wrong with this?
+                //AMQDataBlock frame = msg.getDataBlock(channel.getChannelId(),consumerTag,deliveryTag);                
+
                 protocolSession.writeFrame(frame);
             }
         }
@@ -285,7 +292,7 @@
 
     public boolean isSuspended()
     {
-        return channel.isSuspended();
+        return channel.isSuspended() && !_resending.get();
     }
 
     /**
@@ -379,12 +386,20 @@
 
     public void close()
     {
+        _logger.info("Closing subscription:" + this);
+
         if (_resendQueue != null && !_resendQueue.isEmpty())
         {
             requeue();
         }
 
-        if (!_closed)
+        //remove references in PDQ
+        if (_messages != null)
+        {
+            _messages.clear();
+        }
+
+        if (_autoClose && !_closed)
         {
             _logger.info("Closing autoclose subscription:" + this);
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -400,8 +415,59 @@
 
     private void requeue()
     {
-        //fixme
-        _logger.error("MESSAGES LOST as subscription hasn't yet resent all its requeued messages");
+
+        if (_queue != null)
+        {
+            _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
+
+            // Take over message delivery from the async delivery thread for the duration of the requeue.
+            setResending(true);
+
+            while (!_resendQueue.isEmpty())
+            {
+                AMQMessage resent = _resendQueue.poll();
+
+                resent.setTxnBuffer(null);
+
+                resent.release();
+
+                try
+                {
+                    _queue.deliver(resent);
+                }
+                catch (AMQException e)
+                {
+                    _logger.error("Unable to re-deliver messages", e);
+                }
+            }
+
+            setResending(false);
+
+            if (!_resendQueue.isEmpty())
+            {
+                _logger.error("[MESSAGES LOST] Resend queue was not emptied during requeue.");
+            }
+
+            _queue.setQueueHasContent(false, this);
+        }
+        else
+        {
+            if (!_resendQueue.isEmpty())
+            {
+                _logger.error("Unable to re-deliver messages as queue is null.");
+            }
+        }
+
+        // Clear the messages
+        _resendQueue = null;
+    }
+
+    private void setResending(boolean resending)
+    {
+        synchronized (_resending)
+        {
+            _resending.set(resending);
+        }
     }
 
     public boolean isBrowser()
@@ -450,17 +516,22 @@
         getResendQueue().add(msg);
 
         // Mark Queue has having content.
-        if (_deliveryManager == null)
+        if (_queue == null)
         {
             _logger.error("Delivery Manager is null won't be able to resend messages");
         }
         else
         {
-            _deliveryManager.setQueueHasContent(this);
+            _queue.setQueueHasContent(true, this);
         }
     }
 
-    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    public Object sendlock()
+    {
+        return _resending;
+    }
+
+    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange, boolean redelivered)
     {
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -470,7 +541,7 @@
                                                                 consumerTag,    // consumerTag
                                                                 deliveryTag,    // deliveryTag
                                                                 exchange,    // exchange
-                                                                false,    // redelivered
+                                                                redelivered,    // redelivered
                                                                 routingKey    // routingKey
         );
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
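
Note on sendlock(): the subscription exposes its _resending flag as a monitor so that the async delivery thread and a closing subscription do not both drain the resend queue at once. The sketch below illustrates the idea under stated assumptions. SendLockSketch, sendNext() and requeueAll() are hypothetical names, messages are plain strings, and the sketch holds the lock for the whole drain, whereas the diff only synchronises inside setResending().

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical simplification of SubscriptionImpl; only sendlock() and the
// _resending flag are taken from the diff.
class SendLockSketch
{
    private final AtomicBoolean _resending = new AtomicBoolean(false);
    private final Queue<String> _resendQueue = new ConcurrentLinkedQueue<String>();

    // The flag object doubles as the monitor, as in the diff.
    Object sendlock()
    {
        return _resending;
    }

    void addToResendQueue(String msg)
    {
        _resendQueue.add(msg);
    }

    // Async delivery path: sends at most one message while holding the lock.
    boolean sendNext(Queue<String> delivered)
    {
        synchronized (sendlock())
        {
            String msg = _resendQueue.poll();
            if (msg == null)
            {
                return false;
            }
            delivered.add(msg);
            return true;
        }
    }

    // Close path: moves everything still waiting back to the main queue.
    // Assumption: the lock is held for the whole drain.
    int requeueAll(Queue<String> requeued)
    {
        synchronized (sendlock())
        {
            _resending.set(true);
            int count = 0;
            String msg;
            while ((msg = _resendQueue.poll()) != null)
            {
                requeued.add(msg);
                count++;
            }
            _resending.set(false);
            return count;
        }
    }

    public static void main(String[] args)
    {
        SendLockSketch sub = new SendLockSketch();
        sub.addToResendQueue("m1");
        sub.addToResendQueue("m2");
        Queue<String> delivered = new ConcurrentLinkedQueue<String>();
        Queue<String> requeued = new ConcurrentLinkedQueue<String>();
        sub.sendNext(delivered);
        System.out.println("requeued: " + sub.requeueAll(requeued));
    }
}
```

Because both paths take the same monitor, a message is either delivered or requeued, never both. RemoteSubscriptionImpl and SubscriptionTestHelper return null from sendlock() in this commit, so synchronising on their lock would throw a NullPointerException.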

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Feb  9 09:22:52 2007
@@ -27,27 +27,18 @@
 import java.util.ListIterator;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
+/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
 class SubscriptionSet implements WeightedSubscriptionManager
 {
     private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
 
-    /**
-     * List of registered subscribers
-     */
+    /** List of registered subscribers */
     private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
 
-    /**
-     * Used to control the round robin delivery of content
-     */
+    /** Used to control the round robin delivery of content */
     private int _currentSubscriber;
 
-    /**
-     * Accessor for unit tests.
-     */
+    /** Accessor for unit tests. */
     int getCurrentSubscriber()
     {
         return _currentSubscriber;
@@ -62,14 +53,19 @@
      * Remove the subscription, returning it if it was found
      *
      * @param subscription
+     *
      * @return null if no match was found
      */
     public Subscription removeSubscriber(Subscription subscription)
     {
-        boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
-        if (isRemoved)
+        // TODO: possibly need O(1) operation here.
+        int subIndex = _subscriptions.indexOf(subscription);
+
+        if (subIndex != -1)
         {
-            return subscription;
+            //we can't just return the passed in subscription as it is a new object
+            // and doesn't contain the stored state we need.
+            return _subscriptions.remove(subIndex);
         }
         else
         {
@@ -92,14 +88,11 @@
     }
 
     /**
-     * Return the next unsuspended subscription or null if not found.
-     * <p/>
-     * Performance note:
-     * This method can scan all items twice when looking for a subscription that is not
-     * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
-     * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
-     * race conditions and when subscriptions are removed between calls to nextSubscriber, the
-     * IndexOutOfBoundsException also causes the scan to start at the beginning.
+     * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all
+     * items twice when looking for a subscription that is not suspended. The worst case occurs when all subscriptions
+     * are suspended. However, it does this without synchronisation and subscriptions may be added and removed
+     * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
+     * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
      */
     public Subscription nextSubscriber(AMQMessage msg)
     {
@@ -156,9 +149,7 @@
         return null;
     }
 
-    /**
-     * Overridden in test classes.
-     */
+    /** Overridden in test classes. */
     protected void subscriberScanned()
     {
     }
@@ -199,8 +190,8 @@
     }
 
     /**
-     * Notification that a queue has been deleted. This is called so that the subscription can inform the
-     * channel, which in turn can update its list of unacknowledged messages.
+     * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
+     * in turn can update its list of unacknowledged messages.
      *
      * @param queue
      */
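
Note on the removeSubscriber() change above: the caller passes a freshly built subscription that is equal to the stored one but carries none of its state, so the method must hand back the stored instance. A minimal sketch, assuming equality is by consumer tag only; TaggedSub and RemoveSketch are hypothetical names, not broker classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical subscription: equal when the tags match, whatever the state.
class TaggedSub
{
    final String tag;
    final String state;

    TaggedSub(String tag, String state)
    {
        this.tag = tag;
        this.state = state;
    }

    @Override
    public boolean equals(Object o)
    {
        return o instanceof TaggedSub && ((TaggedSub) o).tag.equals(tag);
    }

    @Override
    public int hashCode()
    {
        return tag.hashCode();
    }
}

class RemoveSketch
{
    // Same shape as the new removeSubscriber(): look up the index, then
    // return the element the list actually held, or null if none matched.
    static TaggedSub removeSubscriber(List<TaggedSub> subs, TaggedSub probe)
    {
        int subIndex = subs.indexOf(probe);
        return (subIndex != -1) ? subs.remove(subIndex) : null;
    }

    public static void main(String[] args)
    {
        List<TaggedSub> subs = new CopyOnWriteArrayList<TaggedSub>();
        subs.add(new TaggedSub("consumer-1", "stored"));

        TaggedSub removed = removeSubscriber(subs, new TaggedSub("consumer-1", "probe"));
        System.out.println(removed.state); // prints "stored"
    }
}
```

indexOf() followed by remove(int) is two separate operations, so a concurrent removal between them could shift the index. The sketch, like the diff, does not guard against that.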

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb  9 09:22:52 2007
@@ -805,42 +805,57 @@
         // this is set only here, and the before the consumer's onMessage is called it is set to false
         _inRecovery = true;
 
-        boolean isSuspended = isSuspended();
-
-        if (!isSuspended)
+        try
         {
-            try
+
+            boolean isSuspended = isSuspended();
+
+            if (!isSuspended)
             {
-                suspendChannel(true);
+                try
+                {
+                    suspendChannel(true);
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException(e);
+                }
             }
-            catch (AMQException e)
+            for (BasicMessageConsumer consumer : _consumers.values())
             {
-                throw new JMSAMQException(e);
+                consumer.clearUnackedMessages();
             }
-        }
-        for (BasicMessageConsumer consumer : _consumers.values())
-        {
-            consumer.clearUnackedMessages();
-        }
-        
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                    (byte) 8, (byte) 0,    // AMQP version (major, minor)
-                                                                                    false));    // requeue
 
-        if (!isSuspended)
-        {
-            try
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+                                                                                       (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                                       false)    // requeue
+                    , BasicRecoverOkBody.class);
+
+
+            if (_dispatcher != null)
             {
-                suspendChannel(false);
+                _dispatcher.rollback();
             }
-            catch (AMQException e)
+                                   
+            if (!isSuspended)
             {
-                throw new JMSAMQException(e);
+                try
+                {
+                    suspendChannel(false);
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException(e);
+                }
             }
         }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
 
     }
 
@@ -1873,9 +1888,17 @@
     public void confirmConsumerCancelled(String consumerTag)
     {
         BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
-        if ((consumer != null) && (consumer.isAutoClose()))
+        if (consumer != null)
         {
-            consumer.closeWhenNoMessages(true);
+            if (consumer.isAutoClose())
+            {
+                consumer.closeWhenNoMessages(true);
+            }
+            //fixme seems a bit like a hack
+//            else
+//            {
+//                consumer.rollback();
+//            }
         }
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Feb  9 09:22:52 2007
@@ -22,8 +22,11 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AMQMessage;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.TxRollbackOkBody;
 import org.apache.qpid.url.URLSyntaxException;
 import org.apache.log4j.Logger;
 
@@ -34,6 +37,8 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.TextMessage;
+import java.util.Set;
+import java.util.HashSet;
 
 /**
  * This class tests a number of commits and roll back scenarios
@@ -52,6 +57,7 @@
     Queue _jmsQueue;
 
     private static final Logger _logger = Logger.getLogger(CommitRollbackTest.class);
+    private final int ACK_MODE = Session.CLIENT_ACKNOWLEDGE;
 
     protected void setUp() throws Exception
     {
@@ -64,12 +70,12 @@
     {
         conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'");
 
-        _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        _session = conn.createSession(true, ACK_MODE);
 
         _jmsQueue = _session.createQueue(queue);
         _consumer = _session.createConsumer(_jmsQueue);
 
-        _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        _pubSession = conn.createSession(true, ACK_MODE);
 
         _publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
 
@@ -129,6 +135,7 @@
         _logger.info("receiving result");
         Message result = _consumer.receive(1000);
 
+        assertTrue("Redelivered not true", result.getJMSRedelivered());
         assertNull("test message was put and rolled back, but is still present", result);
     }
 
@@ -247,7 +254,7 @@
         assertTrue("session is not transacted", _pubSession.getTransacted());
 
         _logger.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenDisconnect";
+        String MESSAGE_TEXT = "testGetThenRollback";
         _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
 
         _pubSession.commit();
@@ -258,6 +265,7 @@
 
         assertNotNull("retrieved message is null", msg);
         assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
+        assertTrue("Message redelivered should not be set on first delivery", !msg.getJMSRedelivered());
 
         _logger.info("rolling back");
 
@@ -270,6 +278,7 @@
         _session.commit();
         assertNotNull("test message was consumed and rolled back, but is gone", result);
         assertEquals("test message was incorrect message", MESSAGE_TEXT, ((TextMessage) result).getText());
+        assertTrue("Message redelivered not set", result.getJMSRedelivered());
     }
 
 
@@ -296,6 +305,142 @@
         Message result = _consumer.receive(1000);
 
         assertNull("test message should be null", result);
+
+        _session.commit();
     }
 
+    /**
+     * Test that closing a consumer and then the connection while messages are being resent after a rollback causes
+     * them to be correctly requeued, the session purges the dispatcher queue, and the messages arrive in the correct order
+     *
+     * @throws Exception if something goes wrong
+     */
+    public void testRollbackWithConsumerConnectionClose() throws Exception
+    {
+        assertTrue("session is not transacted", _session.getTransacted());
+        assertTrue("session is not transacted", _pubSession.getTransacted());
+
+        _logger.info("sending test messages");
+
+        int MESSAGE_TO_SEND = 1000;
+
+        for (int count = 0; count < MESSAGE_TO_SEND; count++)
+        {
+            _publisher.send(_pubSession.createTextMessage(String.valueOf(count)));
+        }
+
+        _pubSession.commit();
+
+        _logger.info("getting a few messages");
+
+        for (int count = 0; count < MESSAGE_TO_SEND / 2; count++)
+        {
+            assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText());
+        }
+
+
+        _logger.info("rolling back");
+        _session.rollback();
+
+        _logger.info("closing consumer");
+        _consumer.close();
+        _logger.info("closed consumer");
+
+        _logger.info("close connection");
+        conn.close();
+        _logger.info("closed connection");
+
+        newConnection();
+
+        _logger.info("getting all messages");
+
+        Set<String> results = new HashSet<String>();
+        for (int count = 0; count < MESSAGE_TO_SEND; count++)
+        {
+            TextMessage msg = ((TextMessage) _consumer.receive(1000));
+
+            assertNotNull("Message should not be null, count:" + count, msg);
+            String txt = msg.getText();
+            _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag());
+            results.add(txt);
+        }
+
+
+        Message result = _consumer.receive(1000);
+        assertNull("test message should be null", result);
+
+        assertEquals("All messages not received", MESSAGE_TO_SEND, results.size());
+
+        _session.commit();
+    }
+
+
+    /**
+     * Test that closing a consumer and then the session while messages are being resent after a rollback causes them
+     * to be correctly requeued, the session purges the dispatcher queue, and the messages arrive in the correct order
+     *
+     * @throws Exception if something goes wrong
+     */
+    public void testRollbackWithConsumerAndSessionClose() throws Exception
+    {
+        assertTrue("session is not transacted", _session.getTransacted());
+        assertTrue("session is not transacted", _pubSession.getTransacted());
+
+        _logger.info("sending test messages");
+
+        int MESSAGE_TO_SEND = 1000;
+
+        for (int count = 0; count < MESSAGE_TO_SEND; count++)
+        {
+            _publisher.send(_pubSession.createTextMessage(String.valueOf(count)));
+        }
+
+        _pubSession.commit();
+
+        _logger.info("getting a few messages");
+
+        for (int count = 0; count < MESSAGE_TO_SEND / 2; count++)
+        {
+            assertEquals(String.valueOf(count), ((TextMessage) _consumer.receive(1000)).getText());
+        }
+
+
+        _logger.info("rolling back");
+        _session.rollback();
+
+        _logger.info("closing consumer");
+        _consumer.close();
+        _logger.info("closed consumer");
+
+        _logger.info("closing session");
+        _session.close();
+        _logger.info("closed session");
+
+        _session = conn.createSession(true, ACK_MODE);
+
+        _consumer = _session.createConsumer(_jmsQueue);
+
+        _logger.info("getting all messages");
+
+        Set<String> results = new HashSet<String>();
+        for (int count = 0; count < MESSAGE_TO_SEND; count++)
+        {
+            TextMessage msg = ((TextMessage) _consumer.receive(1000));
+
+            assertNotNull("Message should not be null, count:" + count, msg);
+            String txt = msg.getText();
+            _logger.trace("Received msg:" + txt + ":" + ((AMQMessage) msg).getDeliveryTag());
+            results.add(txt);
+        }
+
+
+        Message result = _consumer.receive(1000);
+        assertNull("test message should be null:" + result, result);
+
+        assertEquals("All messages not received", MESSAGE_TO_SEND, results.size());
+
+        _session.commit();
+
+
+    }   
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Fri Feb  9 09:22:52 2007
@@ -151,4 +151,9 @@
     {
         //no-op
     }
+
+    public Object sendlock()
+    {
+        return null;
+    }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Fri Feb  9 09:22:52 2007
@@ -121,6 +121,11 @@
         //no-op
     }
 
+    public Object sendlock()
+    {
+        return null;
+    }
+
     public int hashCode()
     {
         return key.hashCode();

Modified: incubator/qpid/branches/perftesting/qpid/specs/amqp.0-8.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/specs/amqp.0-8.xml?view=diff&rev=505383&r1=505382&r2=505383
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/specs/amqp.0-8.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/specs/amqp.0-8.xml Fri Feb  9 09:22:52 2007
@@ -169,6 +169,8 @@
     unacknowledged messages on a channel.
 
     2006-07-03 (PH) - cosmetic clean-up of Basic.Recover comments.
+
+    2006-07-09 (MR) - added Basic.RecoverOk so we know when the recover has been done.
 -->
 
 <amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80">
@@ -2521,7 +2523,16 @@
     The server MUST raise a channel exception if this is called on a 
     transacted channel.
   </doc>
-</method>
+    <response name="recover-ok"/>
+  </method>
+  <method name="recover-ok" synchronous="1" index="101">
+	confirm a successful recover
+	<doc>
+	  This method confirms to the client that the recover succeeded.
+      Note that if a recover fails, the server raises a channel exception.
+    </doc>
+    <chassis name="client" implement="MUST"/>
+  </method>
 
 </class>