Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/06 15:12:49 UTC

svn commit: r515127 [1/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/serv...

Author: ritchiem
Date: Tue Mar  6 06:12:47 2007
New Revision: 515127

URL: http://svn.apache.org/viewvc?view=rev&rev=515127
Log:
QPID-403 QPID-346 QPID-355 QPID-386 QPID-389 Updates to fix Transactional Rollback.
QPID-346  Message loss after rollback\recover  
QPID-355  Closing a consumer does not ensure message delivery will stop for that subscription  
QPID-386  Updated Transactional Tests to cover underlying AMQP/Qpid state.  
QPID-389  Prefetched messages are not correctly returned to the queue  
QPID-403  Implement Basic.Reject  

Broker
UnacknowledgedMessage - Added toString for debug
UnacknowledgedMessageMapImpl - Removed resendMessages method as all sending should go via DeliveryManager and Subscription.
AMQChannel - Updated resend and requeue methods so they do not directly write messages to a subscriber. This was violating the suspension state.
           - Used a local non-transactional context to requeue messages as the internal requeuing of messages on the broker should not be part of any client transaction.
           - Marked messages as resent.
           - Removed warnings from IDE about missing JavaDoc text etc.
BasicAckMethodHandler - Added debugging
BasicRecoverMethodHandler - Removed session from the resend call.
BasicRejectMethodHandler - Initial implementation. Hooks left for possible 'resend' bit.
ChannelCloseHandler - Fixed bug where channel wasn't marked as fully closed on reception of a close from the client.
TxRollbackHandler - Removed session from resend call.
AMQMinaProtocolSession - Fixed bug where channel was marked as awaiting closure before it had actually been closed. This causes problems as the close looks up the channel by id again which will return null after it has been marked as awaiting closure.
AMQMessage - Initial implementation of Rejection. Currently inactive in hasInterest() as we are misusing reject to requeue prefetched messages from the client.
AMQQueue - Removed debug method. Every log call went through it, so all log entries had the same line number, which made the log very difficult to read.
ConcurrentSelectorDeliveryManager - Fixed clearAllMessages() as it didn't actually remove the messages.
           - Fixed bad logic in getNextMessage when using null subscriber. (as done by clearAllMessages)
           - Added more logging messages. Made more frequent logging a trace value.
           - Added debugIdentity() method to reduce overhead in calculating standard log prefix.
           - Allowed messages to be added to the front of the queue.
           - Added currentStatus() to give an overview of the queue's current state.
SubscriptionImpl - Updated to handle closure correctly (QPID-355)
    - Updated the deliver method so it doesn't use a try->finally to call msg.setDeliveredToConsumer(), as the finally block would run even in the event of an error.
    - Created an additional logger to log suspension calls rather than through the SI logger which logs a lot of detail.
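
The SubscriptionImpl deliver change above comes down to a general try/finally property: the finally block runs whether or not the send succeeded. A minimal sketch of the difference, using a hypothetical Message class and send step (these are stand-ins for illustration, not the Qpid classes or the code in this commit):

```java
// Hypothetical stand-ins for illustration only; not the Qpid types.
class DeliveredFlagSketch
{
    static class Message
    {
        boolean deliveredToConsumer = false;
    }

    // Simulated send step that can be made to fail.
    static void send(Message msg, boolean fail)
    {
        if (fail)
        {
            throw new IllegalStateException("simulated send failure");
        }
    }

    // Old shape: the flag is set in finally, so it is set even when send throws.
    static void deliverWithFinally(Message msg, boolean fail)
    {
        try
        {
            send(msg, fail);
        }
        finally
        {
            msg.deliveredToConsumer = true;
        }
    }

    // New shape: the flag is set only after send has returned normally.
    static void deliverAfterSuccess(Message msg, boolean fail)
    {
        send(msg, fail);
        msg.deliveredToConsumer = true;
    }

    // Runs one variant with a failing send and reports the resulting flag.
    static boolean flagAfterFailedSend(boolean useFinally)
    {
        Message msg = new Message();
        try
        {
            if (useFinally)
            {
                deliverWithFinally(msg, true);
            }
            else
            {
                deliverAfterSuccess(msg, true);
            }
        }
        catch (IllegalStateException expected)
        {
            // The failure itself is not the point; only the flag matters here.
        }
        return msg.deliveredToConsumer;
    }

    public static void main(String[] args)
    {
        System.out.println("finally variant, flag after error: " + flagAfterFailedSend(true));
        System.out.println("success-only variant, flag after error: " + flagAfterFailedSend(false));
    }
}
```

With a failing send, the finally variant leaves the message flagged as delivered while the success-only variant does not.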


Client
pom.xml - Excluded older version of log4j that commons-collections exposes.
AMQSession - Added ability for dispatcher to start in stopped state.
  - Added dispatcher logger
  - Added checks around logging
  - Added message rejection if the dispatcher receives a message that it doesn't have a consumer for.
  - Updated message rejection to allow the dispatcher to perform the rejection if it is running. This ensures that all queued messages are processed correctly and rejection occurs in order.
  - rollback(): before calling rollback, all pending queued messages must be rejected, as rollback will clear the unacked map, which the rejects caused by rollback() will need.
  - Fixed closedProducersAndConsumers so that it will rethrow any JMS Exception
  - recover(): as for rollback(), the rejects need to be done before the Recover call to the broker.
  - Allowed declareExchange to be done synchronously programmatically
  - Updated confirmConsumerCancelled to use the dispatcher to perform the clean up. This required the creation of the dispatcher in stopped mode so that it does not start and attempt to deliver messages while the subscriber is being cancelled.
BasicMessageConsumer - Updated close not to perform the deregistration. This is done via BasicCancelOkMethodHandler
  - Added guards on logging
  - Record all messages that have been received so they can be rejected if rollback occurs, so had to change the implementation of acknowledgeLastDelivered.
  - Updated Rollback to initially reject all received messages that are still unAcked.
  - Added a recursive call, with a warning, should the queue not be empty at the end of the rollback.
BasicCancelOkMethodHandler - White space changes to meet style guide. Added guard on logging.
UnprocessedMessage - White space changes to meet style guide. 
StateWaiter - Added comment about timeout bug.
FlowControllingBlockingQueue - Tidied imports
RecoverTest - Updated as declareExchange is now Synchronous
ChannelCloseTest - added guard on logging
MessageRequeueTest - Added to better test underlying AMQP/Qpid state QPID-386
StreamMessageTest - Updated as declareExchange is now Synchronous
CommitRollbackTest - Added additional test case to ensure prefetch queue is correctly purged.
TransactedTest - Added logging and additional tests.
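
The ordering constraint noted for AMQSession rollback() and recover() above (rejects first, then the rollback or recover call) can be illustrated with a small model. This is only a sketch: the Broker class, its unacked map and its reject/rollback methods are hypothetical, and it models just the one property the log states, namely that rollback clears the unacked map that a reject needs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; not the Qpid broker or client code.
class RejectBeforeRollbackSketch
{
    static class Broker
    {
        // delivery tag -> message body, for messages delivered but not yet acked
        final Map<Long, String> unacked = new LinkedHashMap<>();
        final List<String> queue = new ArrayList<>();

        // A reject can only requeue a message that is still in the unacked map.
        boolean reject(long deliveryTag)
        {
            String msg = unacked.remove(deliveryTag);
            if (msg == null)
            {
                return false; // no such delivery tag: nothing to requeue
            }
            queue.add(msg);
            return true;
        }

        // In this sketch rollback does nothing except clear the unacked map.
        void rollback()
        {
            unacked.clear();
        }
    }

    // Returns how many prefetched messages made it back onto the queue.
    static int requeuedCount(boolean rejectFirst)
    {
        Broker broker = new Broker();
        broker.unacked.put(1L, "a");
        broker.unacked.put(2L, "b");
        long[] prefetchedTags = {1L, 2L};

        if (rejectFirst)
        {
            for (long tag : prefetchedTags)
            {
                broker.reject(tag);
            }
            broker.rollback();
        }
        else
        {
            broker.rollback();
            for (long tag : prefetchedTags)
            {
                broker.reject(tag); // tags are already gone
            }
        }
        return broker.queue.size();
    }

    public static void main(String[] args)
    {
        System.out.println("reject then rollback requeues: " + requeuedCount(true));
        System.out.println("rollback then reject requeues: " + requeuedCount(false));
    }
}
```

In this model, rejecting first returns both prefetched messages to the queue, while rolling back first leaves the rejects with no delivery tags to act on.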

Cluster
SimpleClusterTest - Updated in line with AMQSession.declareExchange changes

Common 
AMQConstant - Fixed error code: 'not allowed' should be 530, not 507.
ConcurrentLinkedMessageQueueAtomicSize - Updated to be able to get the size of messages on the 'head' queue, along with additional debug

Systests
ReturnUnroutableMandatoryMessageTest - Updated as declareExchange is now Synchronous

Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/client/pom.xml
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
    incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Mar  6 06:12:47 2007
@@ -99,7 +99,7 @@
 
     private final MessageRouter _exchanges;
 
-    private TransactionalContext _txnContext;
+    private TransactionalContext _txnContext, _nonTransactedContext;
 
     /**
      * A context used by the message store enabling it to track context for a given channel even across thread
@@ -113,9 +113,9 @@
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
+    //Why do we need this reference ? - ritchiem
     private final AMQProtocolSession _session;
 
-
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
             throws AMQException
     {
@@ -210,9 +210,9 @@
         }
         else
         {
-            if (_log.isDebugEnabled())
+            if (_log.isTraceEnabled())
             {
-                _log.debug("Content header received on channel " + _channelId);
+                _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
             }
             _currentMessage.setContentHeaderBody(contentHeaderBody);
             routeCurrentMessage();
@@ -234,9 +234,9 @@
             throw new AMQException("Received content body without previously receiving a JmsPublishBody");
         }
 
-        if (_log.isDebugEnabled())
+        if (_log.isTraceEnabled())
         {
-            _log.debug("Content body received on channel " + _channelId);
+            _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
         }
         try
         {
@@ -289,8 +289,10 @@
      * @param tag       the tag chosen by the client (if null, server will generate one)
      * @param queue     the queue to subscribe to
      * @param session   the protocol session of the subscriber
-     * @param noLocal
-     * @param exclusive
+     * @param noLocal   Flag stopping own messages being receivied.
+     * @param exclusive Flag requesting exclusive access to the queue
+     * @param acks      Are acks enabled for this subscriber
+     * @param filters   Filters to apply to this subscriber
      *
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
@@ -327,6 +329,8 @@
     /**
      * Called from the protocol session to close this channel and clean up. T
      *
+     * @param session The session to close
+     *
      * @throws AMQException if there is an error during closure
      */
     public void close(AMQProtocolSession session) throws AMQException
@@ -352,10 +356,23 @@
      * @param message     the message that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
+     * @param consumerTag The tag for the consumer that is to acknowledge this message.
      * @param queue       the queue from which the message was delivered
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
     {
+        if (_log.isDebugEnabled())
+        {
+            if (queue == null)
+            {
+                _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+            }
+            else
+            {
+                _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+            }
+        }
+
         synchronized (_unacknowledgedMessageMap.getLock())
         {
             _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
@@ -363,8 +380,15 @@
         }
     }
 
+    private final String id = "(" + System.identityHashCode(this) + ")";
+
+    public String debugIdentity()
+    {
+        return _channelId + id;
+    }
+
     /**
-     * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+     * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
      * this same channel or to other subscribers.
      *
      * @throws org.apache.qpid.AMQException if the requeue fails
@@ -374,11 +398,22 @@
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
-        TransactionalContext nontransacted = null;
+        // Deliver these messages out of the transaction as their delivery was never
+        // part of the transaction only the receive.
+        TransactionalContext deliveryContext;
         if (!(_txnContext instanceof NonTransactionalContext))
         {
-            nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                        _returnMessages, _browsedAcks);
+            if (_nonTransactedContext == null)
+            {
+                _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                                    _returnMessages, _browsedAcks);
+            }
+
+            deliveryContext = _nonTransactedContext;
+        }
+        else
+        {
+            deliveryContext = _txnContext;
         }
 
 
@@ -386,72 +421,130 @@
         {
             if (unacked.queue != null)
             {
-                // Deliver these messages out of the transaction as their delivery was never
-                // part of the transaction only the receive.
-                if (!(_txnContext instanceof NonTransactionalContext))
-                {
-                    nontransacted.deliver(unacked.message, unacked.queue, false);
-                }
-                else
-                {
-                    _txnContext.deliver(unacked.message, unacked.queue, false);
-                }
+                unacked.message.setRedelivered(true);
+
+                // Deliver Message
+                deliveryContext.deliver(unacked.message, unacked.queue, false);
+
+                // Should we allow access To the DM to directy deliver the message?
+                // As we don't need to check for Consumers or worry about incrementing the message count?
+//                unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
             }
         }
 
     }
 
+    /**
+     * Requeue a single message
+     *
+     * @param deliveryTag The message to requeue
+     *
+     * @throws AMQException If something goes wrong.
+     */
     public void requeue(long deliveryTag) throws AMQException
     {
         UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
         if (unacked != null)
         {
-            TransactionalContext nontransacted = null;
+
+            // Ensure message is released for redelivery
+            unacked.message.release();
+
+            // Mark message redelivered
+            unacked.message.setRedelivered(true);
+
+            // Deliver these messages out of the transaction as their delivery was never
+            // part of the transaction only the receive.
+            TransactionalContext deliveryContext;
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-                nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                            _returnMessages, _browsedAcks);
+                if (_nonTransactedContext == null)
+                {
+                    _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                                        _returnMessages, _browsedAcks);
+                }
+
+                deliveryContext = _nonTransactedContext;
+            }
+            else
+            {
+                deliveryContext = _txnContext;
             }
 
-            if (!(_txnContext instanceof NonTransactionalContext))
+
+            if (unacked.queue != null)
             {
-                nontransacted.deliver(unacked.message, unacked.queue, false);
+                //Redeliver the messages to the front of the queue
+                deliveryContext.deliver(unacked.message, unacked.queue, true);
+
+                unacked.message.decrementReference(_storeContext);
             }
             else
             {
-                _txnContext.deliver(unacked.message, unacked.queue, false);
+                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
+                          " but no queue defined and no DeadLetter queue so DROPPING message.");
+//                _log.error("Requested requeue of message:" + deliveryTag +
+//                           " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+//
+//                deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+//
+//                unacked.message.decrementReference(_storeContext);
             }
-            unacked.message.decrementReference(_storeContext);
         }
         else
         {
-            _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+            _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+
+            if (_log.isDebugEnabled())
+            {
+                _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+                {
+                    int count = 0;
+
+                    public boolean callback(UnacknowledgedMessage message) throws AMQException
+                    {
+                        _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
+                                   "[" + message.deliveryTag + "]");
+                        return false;  // Continue
+                    }
+
+                    public void visitComplete()
+                    {
+
+                    }
+                });
+            }
         }
 
 
     }
 
 
-    /** Called to resend all outstanding unacknowledged messages to this same channel.
-     * @param session the session
-     * @param requeue if true then requeue, else resend
-     * @throws org.apache.qpid.AMQException */
-    public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+    /**
+     * Called to resend all outstanding unacknowledged messages to this same channel.
+     *
+     * @param requeue Are the messages to be requeued or dropped.
+     *
+     * @throws AMQException When something goes wrong.
+     */
+    public void resend(final boolean requeue) throws AMQException
     {
         final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
         final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
 
         if (_log.isInfoEnabled())
         {
-            _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+            _log.info("unacked map Size:" + _unacknowledgedMessageMap.size());
         }
 
+        // Process the Unacked-Map.
+        // Marking messages who still have a consumer for to be resent
+        // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
             public boolean callback(UnacknowledgedMessage message) throws AMQException
             {
-                long deliveryTag = message.deliveryTag;
                 AMQShortString consumerTag = message.consumerTag;
                 AMQMessage msg = message.message;
                 msg.setRedelivered(true);
@@ -503,14 +596,21 @@
         {
             if (!msgToResend.isEmpty())
             {
-                _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+                _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
+            }
+            else
+            {
+                _log.info("No message to resend.");
             }
         }
         for (UnacknowledgedMessage message : msgToResend)
         {
             AMQMessage msg = message.message;
 
-            // Our Java Client will always suspend the channel when resending!!
+            // Our Java Client will always suspend the channel when resending!
+            // If the client has requested the messages be resent then it is
+            // their responsibility to ensure that thay are capable of receiving them
+            // i.e. The channel hasn't been server side suspended.
 //            if (isSuspended())
 //            {
 //                _log.info("Channel is suspended so requeuing");
@@ -518,50 +618,58 @@
 //                msgToRequeue.add(message);
 //            }
 //            else
-            {
-                //release to allow it to be delivered
-                msg.release();
+//            {
+            //release to allow it to be delivered
+            msg.release();
 
-                // Without any details from the client about what has been processed we have to mark
-                // all messages in the unacked map as redelivered.
-                msg.setRedelivered(true);
+            // Without any details from the client about what has been processed we have to mark
+            // all messages in the unacked map as redelivered.
+            msg.setRedelivered(true);
 
 
-                Subscription sub = msg.getDeliveredSubscription();
+            Subscription sub = msg.getDeliveredSubscription();
 
-                if (sub != null)
+            if (sub != null)
+            {
+                // Get the lock so we can tell if the sub scription has closed.
+                // will stop delivery to this subscription until the lock is released.
+                // note: this approach would allow the use of a single queue if the
+                // PreDeliveryQueue would allow head additions.
+                // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
+                // needs guidance from AMQP WG Model SIG
+                synchronized (sub.getSendLock())
                 {
-                    synchronized (sub.getSendLock())
+                    if (sub.isClosed())
                     {
-                        if (sub.isClosed())
+                        if (_log.isDebugEnabled())
                         {
-                            _log.info("Subscription closed during resend so requeuing message");
-                            //move this message to requeue
-                            msgToRequeue.add(message);
+                            _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
                         }
-                        else
+                        //move this message to requeue
+                        msgToRequeue.add(message);
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
                         {
-                            if (_log.isDebugEnabled())
-                            {
-                                _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
-                            }
-                            // Will throw an exception if the sub is closed
-                            sub.addToResendQueue(msg);
-                            _unacknowledgedMessageMap.remove(message.deliveryTag);
-                            // Don't decrement as we are bypassing the normal deliver which increments
-                            // this is what there is a decrement on the Requeue as deliver will increment.
-                            // msg.decrementReference(_storeContext);
+                            _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
                         }
+                        sub.addToResendQueue(msg);
+                        _unacknowledgedMessageMap.remove(message.deliveryTag);
+                        // Don't decrement as we are bypassing the normal deliver which increments
+                        // this is why there is a decrement on the Requeue as deliver will increment.
+                        // msg.decrementReference(_storeContext);
                     }
-                }
-                else
-                {
-                    _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
-                    //move this message to requeue
-                    msgToRequeue.add(message);
-                }
+                } // sync(sub.getSendLock)
             }
-        }
+            else
+            {
+                _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+                //move this message to requeue
+                msgToRequeue.add(message);
+            }
+        } // for all messages
+//        } else !isSuspend
 
         if (_log.isInfoEnabled())
         {
@@ -571,26 +679,31 @@
             }
         }
 
-        TransactionalContext nontransacted = null;
+        // Deliver these messages out of the transaction as their delivery was never
+        // part of the transaction only the receive.
+        TransactionalContext deliveryContext;
         if (!(_txnContext instanceof NonTransactionalContext))
         {
-            nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                        _returnMessages, _browsedAcks);
+            if (_nonTransactedContext == null)
+            {
+                _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                                    _returnMessages, _browsedAcks);
+            }
+
+            deliveryContext = _nonTransactedContext;
+        }
+        else
+        {
+            deliveryContext = _txnContext;
         }
 
         // Process Messages to Requeue at the front of the queue
         for (UnacknowledgedMessage message : msgToRequeue)
         {
-            // Deliver these messages out of the transaction as their delivery was never
-            // part of the transaction only the receive.
-            if (!(_txnContext instanceof NonTransactionalContext))
-            {
-                nontransacted.deliver(message.message, message.queue, true);
-            }
-            else
-            {
-                _txnContext.deliver(message.message, message.queue, true);
-            }
+            message.message.release();
+            message.message.setRedelivered(true);
+
+            deliveryContext.deliver(message.message, message.queue, true);
 
             _unacknowledgedMessageMap.remove(message.deliveryTag);
             message.message.decrementReference(_storeContext);
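
The AMQChannel hunks above repeat one selection step in three places: use the channel's own context when it is already non-transactional, otherwise use a lazily created, cached non-transactional context. As a sketch only, and not part of this commit, that step could be written once as a helper. The nested types below are hypothetical placeholders for the real context classes, and the constructor arguments used in the diff are omitted.

```java
// Hypothetical placeholders for illustration only; not the Qpid context classes.
class DeliveryContextSketch
{
    interface TransactionalContext
    {
    }

    static class NonTransactionalContext implements TransactionalContext
    {
    }

    static class LocalTransactionalContext implements TransactionalContext
    {
    }

    private final TransactionalContext txnContext;

    // Created on first use and then reused, mirroring _nonTransactedContext in the diff.
    private TransactionalContext nonTransactedContext;

    DeliveryContextSketch(TransactionalContext txnContext)
    {
        this.txnContext = txnContext;
    }

    // Requeued messages are delivered outside the client's transaction, because
    // only the receive was part of the transaction, not the delivery.
    TransactionalContext getDeliveryContext()
    {
        if (txnContext instanceof NonTransactionalContext)
        {
            return txnContext;
        }
        if (nonTransactedContext == null)
        {
            nonTransactedContext = new NonTransactionalContext();
        }
        return nonTransactedContext;
    }

    public static void main(String[] args)
    {
        DeliveryContextSketch transacted = new DeliveryContextSketch(new LocalTransactionalContext());
        System.out.println("cached context reused: "
                           + (transacted.getDeliveryContext() == transacted.getDeliveryContext()));
    }
}
```

Like the field in the diff, this helper is not synchronized; whether that is safe depends on how the channel is accessed, which this sketch does not address.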

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Tue Mar  6 06:12:47 2007
@@ -42,6 +42,21 @@
         message.incrementReference();
     }
 
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Q:");
+        sb.append(queue);
+        sb.append(" M:");
+        sb.append(message);
+        sb.append(" CT:");
+        sb.append(consumerTag);
+        sb.append(" DT:");
+        sb.append(deliveryTag);
+
+        return sb.toString();
+    }
+
     public void discard(StoreContext storeContext) throws AMQException
     {
         if (queue != null)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue Mar  6 06:12:47 2007
@@ -196,25 +196,7 @@
             }
         }
     }
-
-    public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException
-    {
-        synchronized (_lock)
-        {
-            for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
-            {
-                long deliveryTag = entry.getKey();
-                AMQShortString consumerTag = entry.getValue().consumerTag;
-                AMQMessage msg = entry.getValue().message;
-
-                if(consumerTag != null)
-                {
-                    protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag);
-                }
-            }
-        }
-    }
-
+    
     public UnacknowledgedMessage get(long key)
     {
         synchronized (_lock)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java Tue Mar  6 06:12:47 2007
@@ -47,12 +47,13 @@
     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
     {
         AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+        BasicAckBody body = evt.getMethod();
 
         if (_log.isDebugEnabled())
         {
-            _log.debug("Ack received on channel " + evt.getChannelId());
+            _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId());
         }
-        BasicAckBody body = evt.getMethod();
+
         final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
 
         if (channel == null)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Tue Mar  6 06:12:47 2007
@@ -54,7 +54,7 @@
             throw body.getChannelNotFoundException(evt.getChannelId());
         }
 
-        channel.resend(session, body.requeue);
+        channel.resend(body.requeue);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Mar  6 06:12:47 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,20 +50,67 @@
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
-        _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
-
         int channelId = evt.getChannelId();
-        UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
-
-        _logger.info("Need to reject message:" + message);
-//        if (evt.getMethod().requeue)
-//        {
-//          session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
-//        }
-//        else
-//       {
-//           // session.getChannel(channelId).resend(message);
-//       }
 
+        if (_logger.isTraceEnabled())
+        {
+            _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+                          ": Requeue:" + evt.getMethod().requeue +
+//                              ": Resend:" + evt.getMethod().resend +                          
+                          " on channel:" + channelId);
+        }
+
+        AMQChannel channel = session.getChannel(channelId);
+
+        if (channel == null)
+        {
+            throw evt.getMethod().getChannelNotFoundException(channelId);
+        }
+
+        if (_logger.isTraceEnabled())
+        {
+            _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+                          ": Requeue:" + evt.getMethod().requeue +
+//                              ": Resend:" + evt.getMethod().resend +
+                          " on channel:" + channel.debugIdentity());
+        }
+
+        long deliveryTag = evt.getMethod().deliveryTag;
+
+        UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+        if (message == null)
+        {
+            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+//            throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
+        }
+        else
+        {
+
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+                              ": Requeue:" + evt.getMethod().requeue +
+//                              ": Resend:" + evt.getMethod().resend +
+                              " on channel:" + channel.debugIdentity());
+            }
+
+            // Unless the message was requested to be resent to this consumer, mark it as rejected so this consumer never receives it again.
+//            if (!evt.getMethod().resend)
+            {
+                message.message.reject(message.message.getDeliveredSubscription());
+            }
+
+            if (evt.getMethod().requeue)
+            {
+                channel.requeue(deliveryTag);
+            }
+            else
+            {
+                _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
+//                message.queue = channel.getDefaultDeadLetterQueue();
+//                channel.requeue(deliveryTag);
+            }
+        }
     }
 }
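
Reviewer note: the control flow of the new Basic.Reject handler above can be summarised as a small decision function. The following is only a sketch under simplifying assumptions, not the broker code: `RejectFlowSketch`, `Outcome` and `handleReject` are hypothetical names, and a plain `Map<Long, String>` stands in for the channel's unacknowledged message map.

```java
import java.util.HashMap;
import java.util.Map;

public class RejectFlowSketch
{
    /** Hypothetical outcomes mirroring the three branches of the handler. */
    public enum Outcome { UNKNOWN_TAG, REQUEUED, DROPPED }

    /**
     * Decide what happens to a rejected delivery.
     * Assumption: the map stands in for the unacknowledged message map, keyed by delivery tag.
     */
    public static Outcome handleReject(Map<Long, String> unacked, long deliveryTag, boolean requeue)
    {
        String message = unacked.get(deliveryTag);
        if (message == null)
        {
            // The patch only logs a warning here; it does not raise a channel exception.
            return Outcome.UNKNOWN_TAG;
        }
        // The patch first records the rejection against the delivering subscription,
        // then requeues if asked to; otherwise the message is dropped (no dead letter queue yet).
        return requeue ? Outcome.REQUEUED : Outcome.DROPPED;
    }

    public static void main(String[] args)
    {
        Map<Long, String> unacked = new HashMap<Long, String>();
        unacked.put(1L, "m1");
        System.out.println(handleReject(unacked, 1L, true));
        System.out.println(handleReject(unacked, 1L, false));
        System.out.println(handleReject(unacked, 2L, true));
    }
}
```

The sketch deliberately leaves the map untouched, since this diff does not show what `channel.requeue(deliveryTag)` does to the unacknowledged entry.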

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Tue Mar  6 06:12:47 2007
@@ -63,6 +63,8 @@
         }
 
         session.closeChannel(channelId);
+        // Client requested closure, so we don't wait for an ok; we send it.
+        stateManager.getProtocolSession().closeChannelOk(channelId);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Tue Mar  6 06:12:47 2007
@@ -63,7 +63,7 @@
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
-            channel.resend(session, false);
+            channel.resend(false);
         }
         catch (AMQException e)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Mar  6 06:12:47 2007
@@ -523,8 +523,8 @@
         {
             try
             {
-                markChannelawaitingCloseOk(channelId);
                 channel.close(this);
+                markChannelawaitingCloseOk(channelId);
             }
             finally
             {
@@ -546,7 +546,7 @@
     /**
      * In our current implementation this is used by the clustering code.
      *
-     * @param channelId
+     * @param channelId The channel to remove
      */
     public void removeChannel(int channelId)
     {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Mar  6 06:12:47 2007
@@ -85,11 +85,18 @@
 
     private Subscription _takenBySubcription;
 
+    private Set<Subscription> _rejectedBy = null;
+
     public boolean isTaken()
     {
         return _taken.get();
     }
 
+    public String debugIdentity()
+    {
+        return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -199,7 +206,7 @@
         _taken = new AtomicBoolean(false);
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message created with id " + messageId);
+            _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
         }
     }
 
@@ -452,7 +459,9 @@
 
     public void release()
     {
+        _log.trace("Releasing Message:" + debugIdentity());
         _taken.set(false);
+        _takenBySubcription = null;
     }
 
     public boolean checkToken(Object token)
@@ -511,7 +520,7 @@
      * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
      *                              to a consumer
      */
-    public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
+    public void checkDeliveredToConsumer() throws NoConsumersException
     {
 
         if (_immediate && !_deliveredToConsumer)
@@ -580,7 +589,8 @@
 
             for (AMQQueue q : destinationQueues)
             {
-                _txnContext.deliver(this, q, true);
+                //normal deliver so add this message at the end.
+                _txnContext.deliver(this, q, false);
             }
         }
         finally
@@ -801,12 +811,43 @@
 
     public String toString()
     {
-        return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
                _taken + " by:" + _takenBySubcription;
     }
 
     public Subscription getDeliveredSubscription()
     {
         return _takenBySubcription;
+    }
+
+    public void reject(Subscription subscription)
+    {
+        if (subscription != null)
+        {
+            if (_rejectedBy == null)
+            {
+                _rejectedBy = new HashSet<Subscription>();
+            }
+
+            _rejectedBy.add(subscription);
+        }
+        else
+        {
+            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+        }
+    }
+
+    public boolean isRejectedBy(Subscription subscription)
+    {
+        boolean rejected = _rejectedBy != null;
+
+        if (rejected)  // We have subscriptions that rejected this message
+        {
+            return _rejectedBy.contains(subscription);
+        }
+        else // This message hasn't been rejected yet.
+        {
+            return rejected;
+        }
     }
 }
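
Reviewer note: the rejection bookkeeping added to AMQMessage above boils down to a lazily created set of subscriptions. A minimal standalone sketch follows, assuming hypothetical stand-in types (`Sub` for Subscription, `Msg` for AMQMessage) and omitting the logging; it is not the broker code.

```java
import java.util.HashSet;
import java.util.Set;

public class RejectTrackingSketch
{
    /** Hypothetical stand-in for the broker's Subscription type. */
    public static final class Sub
    {
        final String tag;

        public Sub(String tag)
        {
            this.tag = tag;
        }
    }

    /** Hypothetical stand-in for AMQMessage, keeping only the rejection state. */
    public static final class Msg
    {
        // Created on first rejection, so messages that are never rejected carry no set.
        private Set<Sub> rejectedBy = null;

        public void reject(Sub sub)
        {
            if (sub == null)
            {
                return; // the patch logs a warning in this case
            }
            if (rejectedBy == null)
            {
                rejectedBy = new HashSet<Sub>();
            }
            rejectedBy.add(sub);
        }

        public boolean isRejectedBy(Sub sub)
        {
            return rejectedBy != null && rejectedBy.contains(sub);
        }
    }

    public static void main(String[] args)
    {
        Sub a = new Sub("a");
        Sub b = new Sub("b");
        Msg m = new Msg();
        m.reject(a);
        System.out.println(m.isRejectedBy(a) + " " + m.isRejectedBy(b));
    }
}
```

As in the patch, nothing here is synchronized, so the sketch assumes that reject and isRejectedBy are not called concurrently for the same message.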

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar  6 06:12:47 2007
@@ -50,6 +50,7 @@
  */
 public class AMQQueue implements Managable, Comparable
 {
+
     public static final class ExistingExclusiveSubscription extends AMQException
     {
 
@@ -446,7 +447,11 @@
             setExclusive(true);
         }
 
-        debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " +
+                                               "consumer tag {2} with {3}", ps, channel, consumerTag, this));
+        }
 
         Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
                                                                             filters, noLocal, this);
@@ -486,8 +491,11 @@
 
     public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
     {
-        debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
-              this);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
+                                               this));
+        }
 
         Subscription removedSubscription;
         if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -506,6 +514,10 @@
         // if we are eligible for auto deletion, unregister from the queue registry
         if (_autoDelete && _subscribers.isEmpty())
         {
+            if (_logger.isInfoEnabled())
+            {
+                _logger.warn("Auto-deleting queue:" + this);
+            }
             autodelete();
             // we need to manually fire the event to the removed subscription (which was the last one left for this
             // queue. This is because the delete method uses the subscription set which has just been cleared
@@ -561,14 +573,18 @@
 
     protected void autodelete() throws AMQException
     {
-        debug("autodeleting {0}", this);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("autodeleting {0}", this));
+        }
         delete();
     }
 
-    public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+    public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
         //fixme not sure what this is doing. should we be passing deliverFirst through here?
-        _deliveryMgr.deliver(storeContext, getName(), msg, false);
+        // This code is not currently used; when it is, perhaps deliverFirst should be passed through.
+        _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
@@ -582,6 +598,10 @@
         }
     }
 
+//    public DeliveryManager getDeliveryManager()
+//    {
+//        return _deliveryMgr;
+//    }
 
     public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
@@ -671,14 +691,6 @@
     public String toString()
     {
         return "Queue(" + _name + ")@" + System.identityHashCode(this);
-    }
-
-    private void debug(String msg, Object... args)
-    {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(MessageFormat.format(msg, args));
-        }
     }
 
     public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Mar  6 06:12:47 2007
@@ -45,7 +45,6 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.util.MessageQueue;
 import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 
 
 /** Manages delivery of messages on behalf of a queue */
@@ -86,6 +85,8 @@
     private AtomicLong _totalMessageSize = new AtomicLong();
     private AtomicInteger _extraMessages = new AtomicInteger();
     private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
+    private final Object _queueHeadLock = new Object();
+    private String _processingThreadName = "";
 
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
@@ -118,7 +119,10 @@
 
         if (deliverFirst)
         {
-            _messages.pushHead(msg);
+            synchronized (_queueHeadLock)
+            {
+                _messages.pushHead(msg);
+            }
         }
         else
         {
@@ -367,16 +371,19 @@
         long count = 0;
         _lock.lock();
 
-        AMQMessage msg = getNextMessage();
-        while (msg != null)
+        synchronized (_queueHeadLock)
         {
-            //mark this message as taken and get it removed
-            msg.taken(null);
-            _queue.dequeue(storeContext, msg);
-            msg = getNextMessage();
-            count++;
-        }
+            AMQMessage msg = getNextMessage();
+            while (msg != null)
+            {
+                //and remove it
+                _messages.poll();
 
+                _queue.dequeue(storeContext, msg);
+                msg = getNextMessage();
+                count++;
+            }
+        }
         _lock.unlock();
         return count;
     }
@@ -390,12 +397,20 @@
     {
         AMQMessage message = messages.peek();
 
-
-        while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
+        while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
         {
             //remove the already taken message
-            messages.poll();
+            AMQMessage removed = messages.poll();
+
+            assert removed == message;
+            
             _totalMessageSize.addAndGet(-message.getSize());
+
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("Removed taken message:" + message.debugIdentity());
+            }
+
             // try the next message
             message = messages.peek();
         }
@@ -409,7 +424,7 @@
 
         if (_log.isTraceEnabled())
         {
-            _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+            _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
                        ") from queue (" + System.identityHashCode(messageQueue) +
                        ") AMQQueue (" + System.identityHashCode(queue) + ")");
         }
@@ -417,46 +432,63 @@
         if (messageQueue == null)
         {
             // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
-            if (_log.isDebugEnabled())
+            if (_log.isInfoEnabled())
             {
-                _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+                _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue);
             }
             return;
         }
 
         AMQMessage message = null;
+        AMQMessage removed = null;
         try
         {
-            message = getNextMessage(messageQueue, sub);
-
-            // message will be null if we have no messages in the messageQueue.
-            if (message == null)
+            synchronized (_queueHeadLock)
             {
-                if (_log.isTraceEnabled())
+                message = getNextMessage(messageQueue, sub);
+
+                // message will be null if we have no messages in the messageQueue.
+                if (message == null)
                 {
-                    _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+                    if (_log.isTraceEnabled())
+                    {
+                        _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+                    }
+                    return;
+                }
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+                               ") by :" + System.identityHashCode(this) +
+                               ") to :" + System.identityHashCode(sub));
                 }
-                return;
+
+                sub.send(message, _queue);
+
+                //remove sent message from our queue.
+                removed = messageQueue.poll();
+                // The message must be removed from _messages,
+                // otherwise the async send will never end.
             }
+
+            if (removed != message)
+            {
+                _log.error("Just sent message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+            }
+
             if (_log.isDebugEnabled())
             {
-                _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+                _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
                            ") by :" + System.identityHashCode(this) +
                            ") to :" + System.identityHashCode(sub));
             }
 
-            sub.send(message, _queue);
-
-            //remove sent message from our queue.
-            messageQueue.poll();
-            //If we don't remove the message from _messages
-            // Otherwise the Async send will never end
 
             if (messageQueue == sub.getResendQueue())
             {
                 if (_log.isTraceEnabled())
                 {
-                    _log.trace("All messages sent from resendQueue for " + sub);
+                    _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
                 }
                 if (messageQueue.isEmpty())
                 {
@@ -469,7 +501,10 @@
             }
             else if (messageQueue == sub.getPreDeliveryQueue())
             {
-                _log.info("We could do clean up of the main _message queue here");
+                if (_log.isInfoEnabled())
+                {
+                    _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+                }
             }
 
             _totalMessageSize.addAndGet(-message.getSize());
@@ -477,7 +512,7 @@
         catch (AMQException e)
         {
             message.release();
-            _log.error("Unable to deliver message as dequeue failed: " + e, e);
+            _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
         }
     }
 
@@ -516,6 +551,12 @@
      */
     private void processQueue()
     {
+        //record thread name
+        if (_log.isDebugEnabled())
+        {
+            _processingThreadName = Thread.currentThread().getName();
+        }
+
         // Continue to process delivery while we haveSubscribers and messages
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
 
@@ -561,9 +602,10 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
+            _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
         }
-        msg.release();
+        // This shouldn't be done here.
+//        msg.release();
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
@@ -575,7 +617,7 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+                    _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
                 }
                 if (!msg.getMessagePublishInfo().isImmediate())
                 {
@@ -587,7 +629,7 @@
                     //Pre Deliver to all subscriptions
                     if (_log.isDebugEnabled())
                     {
-                        _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+                        _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() +
                                    " subscribers to give the message to:" + currentStatus());
                     }
                     for (Subscription sub : _subscriptions.getSubscriptions())
@@ -598,7 +640,7 @@
                         {
                             if (_log.isDebugEnabled())
                             {
-                                _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
                                            ") is already delivered.");
                             }
                             continue;
@@ -609,7 +651,7 @@
                         {
                             if (_log.isDebugEnabled())
                             {
-                                _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+                                _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
                                            ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
                             }
                             sub.enqueueForPreDelivery(msg, deliverFirst);
@@ -625,9 +667,9 @@
                 {
                     if (!s.isSuspended())
                     {
-                        if (_log.isDebugEnabled())
+                        if (_log.isTraceEnabled())
                         {
-                            _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+                            _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
                         msg.taken(s);
@@ -636,33 +678,35 @@
                     }
                     else
                     {
-                        if (_log.isDebugEnabled())
+                        if (_log.isInfoEnabled())
                         {
-                            _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+                            _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
+                                      "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
                         }
                     }
+                }
 
-                    if (!msg.isTaken())
+                if (!msg.isTaken())
+                {
+                    if (_log.isInfoEnabled())
                     {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
-                                       " Subscriber:" + System.identityHashCode(s));
-                        }
-
-                        deliver(context, name, msg, deliverFirst);
+                        _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+                                  " Subscriber:" + System.identityHashCode(s));
                     }
-                    else
+
+                    deliver(context, name, msg, deliverFirst);
+                }
+                else
+                {
+                    if (_log.isDebugEnabled())
                     {
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug(id() + " Message(" + System.identityHashCode(msg) +
-                                       ") has been taken so disregarding deliver request to Subscriber:" +
-                                       System.identityHashCode(s));
-                        }
+                        _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+                                   ") has been taken so disregarding deliver request to Subscriber:" +
+                                   System.identityHashCode(s));
                     }
                 }
             }
+
         }
         finally
         {
@@ -674,10 +718,9 @@
         }
     }
 
-    //fixme remove
     private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
 
-    private String id()
+    private String debugIdentity()
     {
         return id;
     }
@@ -710,7 +753,7 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("Processing Async." + currentStatus());
+            _log.debug(debugIdentity() + "Processing Async." + currentStatus());
         }
 
         if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -725,14 +768,12 @@
 
     private String currentStatus()
     {
-        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
-               "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
+               "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
                " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
                "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
                " Active:" + _subscriptions.hasActiveSubscribers() +
-               " Processing:" + _processing.get() +
-               " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
-               "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+               " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false");
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Mar  6 06:12:47 2007
@@ -46,6 +46,8 @@
  */
 public class SubscriptionImpl implements Subscription
 {
+
+    private static final Logger _suspensionlogger = Logger.getLogger("Suspension");
     private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
 
     public final AMQChannel channel;
@@ -258,6 +260,12 @@
             {
                 channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
             }
+
+            if (_sendLock.get())
+            {
+                _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+            }
+
             protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
         }
     }
@@ -265,56 +273,56 @@
     private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
             throws AMQException
     {
-        try
-        {
-            // if we do not need to wait for client acknowledgements
-            // we can decrement the reference count immediately.
-
-            // By doing this _before_ the send we ensure that it
-            // doesn't get sent if it can't be dequeued, preventing
-            // duplicate delivery on recovery.
+        // if we do not need to wait for client acknowledgements
+        // we can decrement the reference count immediately.
 
-            // The send may of course still fail, in which case, as
-            // the message is unacked, it will be lost.
-            if (!_acks)
+        // By doing this _before_ the send we ensure that it
+        // doesn't get sent if it can't be dequeued, preventing
+        // duplicate delivery on recovery.
+
+        // The send may of course still fail, in which case, as
+        // the message is unacked, it will be lost.
+        if (!_acks)
+        {
+            if (_logger.isDebugEnabled())
             {
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
-                }
-                queue.dequeue(storeContext, msg);
+                _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
             }
-            synchronized (channel)
-            {
-                long deliveryTag = channel.getNextDeliveryTag();
-
-                if (_acks)
-                {
-                    channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
-                    msg.decrementReference(storeContext);
-                }
+            queue.dequeue(storeContext, msg);
+        }
+        synchronized (channel)
+        {
+            long deliveryTag = channel.getNextDeliveryTag();
 
-                protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+            if (_sendLock.get())
+            {
+                _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+            }
 
+            if (_acks)
+            {
+                channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                msg.decrementReference(storeContext);
             }
-        }
-        finally
-        {
+
+            protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+            // Only set delivered if it was actually written successfully;
+            // using a try->finally would set it even if an error occurred.
             msg.setDeliveredToConsumer();
         }
     }
 
     public boolean isSuspended()
     {
-        if (_logger.isTraceEnabled())
+        if (_suspensionlogger.isInfoEnabled())
         {
             if (channel.isSuspended())
             {
-                _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+                _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel is suspended");
             }
             if (_sendLock.get())
             {
-                _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+                _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing.");
             }
         }
         return channel.isSuspended() || _sendLock.get();
@@ -323,7 +331,7 @@
     /**
      * Callback indicating that a queue has been deleted.
      *
-     * @param queue
+     * @param queue The queue that has been deleted
      */
     public void queueDeleted(AMQQueue queue) throws AMQException
     {
@@ -337,9 +345,18 @@
 
     public boolean hasInterest(AMQMessage msg)
     {
+        //check that the message hasn't been rejected
+        if (msg.isRejectedBy(this))
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+            }
+//            return false;
+        }
+
         if (_noLocal)
         {
-            boolean isLocal;
             // We don't want local messages so check to see if message is one we sent
             Object localInstance;
             Object msgInstance;
@@ -350,12 +367,12 @@
                 if ((msg.getPublisher().getClientProperties() != null) &&
                     (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                 {
-                    if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+                    if (localInstance == msgInstance || localInstance.equals(msgInstance))
                     {
                         if (_logger.isTraceEnabled())
                         {
-                            _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
-                                          System.identityHashCode(msg) + ")");
+                            _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+                                          msg.debugIdentity() + ")");
                         }
                         return false;
                     }
@@ -369,8 +386,8 @@
                 {
                     if (_logger.isTraceEnabled())
                     {
-                        _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
-                                      System.identityHashCode(msg) + ")");
+                        _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+                                      msg.debugIdentity() + ")");
                     }
                     return false;
                 }
@@ -383,19 +400,26 @@
 
         if (_logger.isTraceEnabled())
         {
-            _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity() + ")");
         }
         return checkFilters(msg);
 
     }
 
+    private String id = String.valueOf(System.identityHashCode(this));
+
+    private String debugIdentity()
+    {
+        return id;
+    }
+
     private boolean checkFilters(AMQMessage msg)
     {
         if (_filters != null)
         {
             if (_logger.isTraceEnabled())
             {
-                _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+                _logger.trace("(" + debugIdentity() + ") has filters.");
             }
             return _filters.allAllow(msg);
         }
@@ -403,7 +427,7 @@
         {
             if (_logger.isTraceEnabled())
             {
-                _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+                _logger.trace("(" + debugIdentity() + ") has no filters");
             }
 
             return true;
@@ -445,15 +469,19 @@
             }
 
             _sendLock.set(true);
-
         }
+
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+            _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
         }
 
         if (_resendQueue != null && !_resendQueue.isEmpty())
         {
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this);
+            }
             requeue();
         }
 
@@ -486,6 +514,11 @@
             {
                 AMQMessage resent = _resendQueue.poll();
 
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("Removed for resending:" + resent.debugIdentity());
+                }
+
                 resent.release();
                 _queue.subscriberHasPendingResend(false, this, resent);
 
@@ -495,7 +528,7 @@
                 }
                 catch (AMQException e)
                 {
-                    _logger.error("Unable to re-deliver messages", e);
+                    _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e);
                 }
             }
 

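Note on the sendToConsumer change in SubscriptionImpl above: the delivered flag is now set after writeDeliver returns rather than in a finally block, so a failed write no longer marks the message as delivered. The following is a minimal, standalone sketch of that difference. It is not the broker code: Message, write(), sendWithFinally() and sendThenMark() are hypothetical stand-ins introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DeliveredFlagSketch
{
    /** Hypothetical stand-in for AMQMessage; it only tracks the delivered flag. */
    static class Message
    {
        private final AtomicBoolean delivered = new AtomicBoolean(false);

        void setDeliveredToConsumer()
        {
            delivered.set(true);
        }

        boolean isDelivered()
        {
            return delivered.get();
        }
    }

    /** Hypothetical stand-in for the protocol write; throws when asked to fail. */
    static void write(Message msg, boolean fail) throws Exception
    {
        if (fail)
        {
            throw new Exception("simulated write failure");
        }
    }

    /** Old shape: the flag is set in finally, so it is set even when the write throws. */
    static boolean sendWithFinally(Message msg, boolean fail)
    {
        try
        {
            try
            {
                write(msg, fail);
            }
            finally
            {
                msg.setDeliveredToConsumer();
            }
        }
        catch (Exception e)
        {
            // Swallowed here only so the sketch can report the flag afterwards.
        }
        return msg.isDelivered();
    }

    /** New shape: the flag is set only once the write has returned normally. */
    static boolean sendThenMark(Message msg, boolean fail)
    {
        try
        {
            write(msg, fail);
            msg.setDeliveredToConsumer();
        }
        catch (Exception e)
        {
            // Swallowed here only so the sketch can report the flag afterwards.
        }
        return msg.isDelivered();
    }

    public static void main(String[] args)
    {
        System.out.println("finally, write fails:   " + sendWithFinally(new Message(), true));
        System.out.println("mark after, write fails: " + sendThenMark(new Message(), true));
        System.out.println("finally, write ok:      " + sendWithFinally(new Message(), false));
        System.out.println("mark after, write ok:    " + sendThenMark(new Message(), false));
    }
}
```

With a failing write, the finally variant reports the message as delivered while the mark-after variant does not; with a successful write both report it as delivered. That is the behaviour the new comment in sendToConsumer describes.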
Modified: incubator/qpid/trunk/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/pom.xml?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/client/pom.xml Tue Mar  6 06:12:47 2007
@@ -55,6 +55,13 @@
         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
+            <!-- commons-collections pulls in log4j v1.2.7, which doesn't have trace() -->
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>