Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/13 11:35:47 UTC

svn commit: r517638 [1/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/serv...

Author: ritchiem
Date: Tue Mar 13 03:35:42 2007
New Revision: 517638

URL: http://svn.apache.org/viewvc?view=rev&rev=517638
Log:
QPID-346, QPID-386, QPID-403, QPID-410 Rollback, Basic.Reject, QueueBrowser NO_ACK.
QPID-346  Message loss after rollback/recover
QPID-386  Updated Transactional Tests to cover underlying AMQP/Qpid state.
QPID-403  Implement Basic.Reject
QPID-410  Queue Browsers should not acknowledge messages.
Broker
------
TxAck - Added comment and fixed white space
UnacknowledgedMessage - Added comment for messageDecrement
AMQChannel - Added extra debugging.
 + Created a NonTransactionalContext for requeuing messages, as using txContext would tie the requeue to any running transaction.
 + Updated message reference counting so that it is in terms of queues; the count is not incremented when a message is given to a client.
BasicCancelMethodHandler - Added Debug log.
BasicConsumeMethodHandler - Reverted to writing frames directly to the session, as throwing ChannelException caused problems. Added trace and debug logging.
BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging
AMQPFastProtocolHandler - moved error log to before session.write
AMQMessage - Added additional debug via debugIdentity() and comments
AMQQueue - Decoupled reference counting from dequeue operation.
ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging
SubscriptionImpl - Disabled use of acks for browsers. For now, put setDeliveredToConsumer back in the finally block; added a comment noting that this may not be correct, as even an error writing to the client will cause the message to be marked as delivered to the consumer.
 + On Close ensured that it is only called once.
 + Had problem where closing browser was causing two CancelOk frames to be sent back to client.
RequiredDeliveryException - Added comment to explain incrementReference
LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here.
NonTransactionalContext - Removed incrementReference on deliver 
 + Fixed bug where browser acks would cause messages to be discarded. New JIRA: this needs tidying up.
TxnBuffer - Added debug logging.
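
The reference-counting change noted above (the count is kept per queue and is not incremented when a message is handed to a client) can be pictured with the minimal sketch below. It is not the Qpid implementation: the class RefCountedMessage and every method name on it are hypothetical, chosen only to illustrate the idea that delivery leaves the count alone while each queue holds exactly one reference.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a broker message; NOT the real AMQMessage. */
class RefCountedMessage
{
    // One reference per queue the message currently sits on.
    private final AtomicInteger _references = new AtomicInteger(0);
    private volatile boolean _fullyDereferenced = false;

    /** Called once for each queue the message is enqueued on. */
    public void incrementReference()
    {
        _references.incrementAndGet();
    }

    /** Called when a queue is finished with the message, e.g. after an ack. */
    public void decrementReference()
    {
        if (_references.decrementAndGet() == 0)
        {
            // Last queue has let go: a real broker could now drop the payload.
            _fullyDereferenced = true;
        }
    }

    /** Handing the message to a consumer deliberately does not touch the count. */
    public void deliverToConsumer()
    {
        // no reference change (assumption of this sketch)
    }

    public int getReferenceCount()
    {
        return _references.get();
    }

    public boolean isFullyDereferenced()
    {
        return _fullyDereferenced;
    }
}
```

With this shape a requeue after rollback needs no compensating decrement, because the earlier delivery never added a reference in the first place.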

Client
------
AMQQueueBrowser - Added comments
AMQSession - Added comments and debug
 + Updated to cause closed consumer to reject messages rather than receive them.
 + Prevented NoConsume consumers from rolling back and rejecting; they simply clear their SyncQueue. JIRA to ensure clean state with rollback.

BasicMessageConsumer - Added trace level debugging on close calls
 + Forced NoConsume consumers to use NO_ACK
 + Added more logging
Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first.
ChannelCloseOkMethodHandler - updated comment
AMQProtocolSession - Update comments,whitespace
TransportConnection - removed static block
FlowControllingBlockingQueue - Added isEmpty() Method
PropertyValueTest - Added VM Broker setup
 + Updated test to run once and 50 times to pull out delivery tag problems that were occurring.
 + Adjusted logging level to be more helpful. Moved some info down to trace and debug.
MessageRequeueTest - Moved QpidClientConnection to its own file.
 + Fixed it so it actually runs more than one consumer concurrently. Now 3, was 1.
ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator().

Added QueueBrowserTest to system tests to test queue browsing.
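
Two of the notes above concern close handling: ensuring a subscription's close runs only once (so a second CancelOk frame is not sent), and having Closeable go through isClosed() rather than reading the flag directly. A minimal sketch of that pattern follows. It is an illustration only, not the Qpid code: the class CloseOnce, its fields, and the counter standing in for "send CancelOk" are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; NOT the real org.apache.qpid.client.Closeable. */
class CloseOnce
{
    private final AtomicBoolean _closed = new AtomicBoolean(false);

    // Stands in for "CancelOk frames written"; an assumption of this sketch.
    private final AtomicInteger _cancelOkSent = new AtomicInteger(0);

    /** Single access point for the flag, so callers never read _closed directly. */
    public boolean isClosed()
    {
        return _closed.get();
    }

    public void close()
    {
        // compareAndSet(false, true) succeeds for exactly one caller, even
        // if several threads call close() at the same time.
        if (_closed.compareAndSet(false, true))
        {
            _cancelOkSent.incrementAndGet();
        }
    }

    public int getCancelOkSent()
    {
        return _cancelOkSent.get();
    }
}
```

Routing every check through isClosed() keeps the flag's representation private, which leaves room to change how close is tracked later without touching callers.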

Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java   (with props)
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java   (with props)
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Mar 13 03:35:42 2007
@@ -319,6 +319,25 @@
 
     public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
     {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
+            _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+            {
+
+                public boolean callback(UnacknowledgedMessage message) throws AMQException
+                {
+                    _log.debug(message);
+
+                    return true;
+                }
+
+                public void visitComplete()
+                {
+                }
+            });
+        }
+
         AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
         {
@@ -342,9 +361,23 @@
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
     {
-        _log.info("Unsubscribing all consumers on channel " + toString());
+        if (_log.isInfoEnabled())
+        {
+            if (!_consumerTag2QueueMap.isEmpty())
+            {
+                _log.info("Unsubscribing all consumers on channel " + toString());
+            }
+            else
+            {
+                _log.info("No consumers to unsubscribe on channel " + toString());
+            }
+        }
         for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
         {
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+            }
             me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
         }
         _consumerTag2QueueMap.clear();
@@ -369,7 +402,11 @@
             }
             else
             {
-                _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
+                               ") with a queue(" + queue + ") for " + consumerTag);
+                }
             }
         }
 
@@ -395,25 +432,38 @@
      */
     public void requeue() throws AMQException
     {
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Requeuing for " + toString());
+        }
+
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
+        if (_log.isDebugEnabled())
+        {
+            _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages.");
+        }
         // Deliver these messages out of the transaction as their delivery was never
         // part of the transaction only the receive.
-        TransactionalContext deliveryContext;
-        if (!(_txnContext instanceof NonTransactionalContext))
+        TransactionalContext deliveryContext = null;
+
+        if (!messagesToBeDelivered.isEmpty())
         {
-            if (_nonTransactedContext == null)
+            if (!(_txnContext instanceof NonTransactionalContext))
             {
-                _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                                    _returnMessages, _browsedAcks);
-            }
+//                if (_nonTransactedContext == null)
+                {
+                    _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+                                                                        _returnMessages, _browsedAcks);
+                }
 
-            deliveryContext = _nonTransactedContext;
-        }
-        else
-        {
-            deliveryContext = _txnContext;
+                deliveryContext = _nonTransactedContext;
+            }
+            else
+            {
+                deliveryContext = _txnContext;
+            }
         }
 
 
@@ -421,6 +471,10 @@
         {
             if (unacked.queue != null)
             {
+                // Ensure message is released for redelivery
+                unacked.message.release();
+
+                // Mark message redelivered
                 unacked.message.setRedelivered(true);
 
                 // Deliver Message
@@ -459,7 +513,7 @@
             TransactionalContext deliveryContext;
             if (!(_txnContext instanceof NonTransactionalContext))
             {
-                if (_nonTransactedContext == null)
+//                if (_nonTransactedContext == null)
                 {
                     _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
                                                                         _returnMessages, _browsedAcks);
@@ -472,13 +526,12 @@
                 deliveryContext = _txnContext;
             }
 
-
             if (unacked.queue != null)
             {
                 //Redeliver the messages to the front of the queue
                 deliveryContext.deliver(unacked.message, unacked.queue, true);
-
-                unacked.message.decrementReference(_storeContext);
+                //Deliver increments the message count but we have already deliverted this once so don't increment it again
+                // this was because deliver did an increment changed this.
             }
             else
             {
@@ -489,7 +542,6 @@
 //
 //                deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
 //
-//                unacked.message.decrementReference(_storeContext);
             }
         }
         else
@@ -656,15 +708,16 @@
                         }
                         sub.addToResendQueue(msg);
                         _unacknowledgedMessageMap.remove(message.deliveryTag);
-                        // Don't decrement as we are bypassing the normal deliver which increments
-                        // this is why there is a decrement on the Requeue as deliver will increment.
-                        // msg.decrementReference(_storeContext);
                     }
                 } // sync(sub.getSendLock)
             }
             else
             {
-                _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+
+                if (_log.isInfoEnabled())
+                {
+                    _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+                }
                 //move this message to requeue
                 msgToRequeue.add(message);
             }
@@ -706,7 +759,6 @@
             deliveryContext.deliver(message.message, message.queue, true);
 
             _unacknowledgedMessageMap.remove(message.deliveryTag);
-            message.message.decrementReference(_storeContext);
         }
     }
 
@@ -760,8 +812,18 @@
     {
         synchronized (_unacknowledgedMessageMap.getLock())
         {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
+            }
+
             _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
             checkSuspension();
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
+            }
+
         }
     }
 
@@ -773,12 +835,6 @@
     public UnacknowledgedMessageMap getUnacknowledgedMessageMap()
     {
         return _unacknowledgedMessageMap;
-    }
-
-    public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
-    {
-        _browsedAcks.add(deliveryTag);
-        addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
     }
 
     private void checkSuspension()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Tue Mar 13 03:35:42 2007
@@ -37,6 +37,10 @@
     {
         super(message);
         _amqMessage = payload;
+        // Increment the reference as this message is in the routing phase
+        // and so will have the ref decremented as routing fails.
+        // we need to keep this message around so we can return it in the
+        // handler. So increment here. 
         payload.incrementReference();
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Tue Mar 13 03:35:42 2007
@@ -101,6 +101,8 @@
         for (UnacknowledgedMessage msg : _unacked)
         {
             msg.restoreTransientMessageData();
+
+            //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(storeContext);
         }
     }
@@ -124,7 +126,7 @@
         _map.remove(_unacked);
         for (UnacknowledgedMessage msg : _unacked)
         {
-            msg.clearTransientMessageData();        
+            msg.clearTransientMessageData();
         }
 
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Tue Mar 13 03:35:42 2007
@@ -39,7 +39,6 @@
         this.message = message;
         this.consumerTag = consumerTag;
         this.deliveryTag = deliveryTag;
-        message.incrementReference();
     }
 
     public String toString()
@@ -63,6 +62,7 @@
         {
             message.dequeue(storeContext, queue);
         }
+        //if the queue is null then the message is waiting to be acked, but has been removed.
         message.decrementReference(storeContext);
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Tue Mar 13 03:35:42 2007
@@ -29,9 +29,12 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
 
 public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
 {
+    private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
+
     private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
 
     public static BasicCancelMethodHandler getInstance()
@@ -53,6 +56,12 @@
         if (channel == null)
         {
             throw body.getChannelNotFoundException(evt.getChannelId());
+        }
+
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("BasicCancel: for:" + body.consumerTag +
+                       " nowait:" + body.nowait);
         }
 
         channel.unsubscribeConsumer(protocolSession, body.consumerTag);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Mar 13 03:35:42 2007
@@ -25,6 +25,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
@@ -67,12 +69,22 @@
         }
         else
         {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("BasicConsume: from '" + body.queue +
+                           "' for:" + body.consumerTag +
+                           " nowait:" + body.nowait +
+                           " args:" + body.arguments);
+            }
 
             AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
 
             if (queue == null)
             {
-                _log.info("No queue for '" + body.queue + "'");
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace("No queue for '" + body.queue + "'");
+                }
                 if (body.queue != null)
                 {
                     String msg = "No such queue, '" + body.queue + "'";
@@ -105,14 +117,34 @@
                 }
                 catch (org.apache.qpid.AMQInvalidArgumentException ise)
                 {
-                    _log.info("Closing connection due to invalid selector");
-                    throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+                    _log.debug("Closing connection due to invalid selector");
+                    // Why doesn't this ChannelException work.
+//                    throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+                                                                       (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                       BasicConsumeBody.getClazz((byte) 8, (byte) 0),    // classId
+                                                                       BasicConsumeBody.getMethod((byte) 8, (byte) 0),    // methodId
+                                                                       AMQConstant.INVALID_ARGUMENT.getCode(),    // replyCode
+                                                                       new AMQShortString(ise.getMessage())));        // replyText
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
                     AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Non-unique consumer tag, '" + body.consumerTag + "'");
+                    // If the above doesn't work then perhaps this is wrong too.
+//                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+//                                                      "Non-unique consumer tag, '" + body.consumerTag + "'");
+                                        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                        BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                        AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                        msg));	// replyText
                 }
                 catch (AMQQueue.ExistingExclusiveSubscription e)
                 {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Mar 13 03:35:42 2007
@@ -52,13 +52,13 @@
 
         int channelId = evt.getChannelId();
 
-        if (_logger.isTraceEnabled())
-        {
-            _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
-                          ": Requeue:" + evt.getMethod().requeue +
-//                              ": Resend:" + evt.getMethod().resend +                          
-                          " on channel:" + channelId);
-        }
+//        if (_logger.isDebugEnabled())
+//        {
+//            _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
+//                          ": Requeue:" + evt.getMethod().requeue +
+////                              ": Resend:" + evt.getMethod().resend +
+//                          " on channel:" + channelId);
+//        }
 
         AMQChannel channel = session.getChannel(channelId);
 
@@ -67,9 +67,9 @@
             throw evt.getMethod().getChannelNotFoundException(channelId);
         }
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+            _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
                           ": Requeue:" + evt.getMethod().requeue +
 //                              ": Resend:" + evt.getMethod().resend +
                           " on channel:" + channel.debugIdentity());

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Tue Mar 13 03:35:42 2007
@@ -51,8 +51,11 @@
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         ChannelCloseBody body = evt.getMethod();
-        _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
-                     " and method " + body.methodId);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
+                         " and method " + body.methodId);
+        }
         int channelId = evt.getChannelId();
 
         AMQChannel channel = session.getChannel(channelId);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Tue Mar 13 03:35:42 2007
@@ -30,7 +30,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
-public class ConnectionCloseMethodHandler implements  StateAwareMethodListener<ConnectionCloseBody>
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
 {
     private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
 
@@ -49,8 +49,11 @@
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
         final ConnectionCloseBody body = evt.getMethod();
-        _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
-                     body.replyText +  " for " + session);
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
+                         body.replyText + " for " + session);
+        }
         try
         {
             session.closeSession();
@@ -62,7 +65,7 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
         session.writeFrame(response);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Mar 13 03:35:42 2007
@@ -176,6 +176,8 @@
         }
         else
         {
+            _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+            
             // Be aware of possible changes to parameter order as versions change.
             protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
             	session.getProtocolMajorVersion(),
@@ -185,7 +187,6 @@
                 200,	// replyCode
                 new AMQShortString(throwable.getMessage())	// replyText
                 ));
-            _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
             protocolSession.close();
         }
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Mar 13 03:35:42 2007
@@ -45,9 +45,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
 public class AMQMessage
 {
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -92,9 +90,10 @@
         return _taken.get();
     }
 
+    private final int hashcode = System.identityHashCode(this);
     public String debugIdentity()
     {
-        return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+        return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
     }
 
     /**
@@ -206,7 +205,7 @@
         _taken = new AtomicBoolean(false);
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
+            _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
         }
     }
 
@@ -363,7 +362,7 @@
         if (_log.isDebugEnabled())
         {
 
-            _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + "   " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+            _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
 
         }
     }
@@ -374,6 +373,7 @@
      *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
+     * @param storeContext
      */
     public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
@@ -387,9 +387,7 @@
             {
                 if (_log.isDebugEnabled())
                 {
-                    _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
-
-
+                    _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
                 }
 
                 // must check if the handle is null since there may be cases where we decide to throw away a message
@@ -410,7 +408,7 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+                _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
                 if (_referenceCount.get() < 0)
                 {
                     Thread.dumpStack();
@@ -418,7 +416,7 @@
             }
             if (_referenceCount.get() < 0)
             {
-                throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+                throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
             }
         }
     }
@@ -459,7 +457,10 @@
 
     public void release()
     {
-        _log.trace("Releasing Message:" + debugIdentity());
+        if (_log.isTraceEnabled())
+        {
+            _log.trace("Releasing Message:" + debugIdentity());
+        }
         _taken.set(false);
         _takenBySubcription = null;
     }
@@ -572,7 +573,7 @@
         List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
         if (_log.isDebugEnabled())
         {
-            _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
+            _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
         }
         try
         {
@@ -589,6 +590,8 @@
 
             for (AMQQueue q : destinationQueues)
             {
+                //Increment the references to this message for each queue delivery.
+                incrementReference();                
                 //normal deliver so add this message at the end.
                 _txnContext.deliver(this, q, false);
             }
@@ -596,6 +599,7 @@
         finally
         {
             destinationQueues.clear();
+            // Remove reference for routing process. Reference count should now == delivered queue count
             decrementReference(storeContext);
         }
     }
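
The AMQMessage changes above count references per queue: one reference is taken for each destination queue, and the reference held by the routing process is dropped afterwards. Below is a minimal sketch of that accounting. It assumes the message starts with a single routing reference, which the diff implies but does not show. Message, deliver() and the queue names are hypothetical and are not the broker's classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class QueueRefCountSketch
{
    /** Hypothetical message; assumed to start with one reference held by the routing process. */
    static final class Message
    {
        private final AtomicInteger referenceCount = new AtomicInteger(1);

        void incrementReference()
        {
            referenceCount.incrementAndGet();
        }

        /** Returns true when the count reaches zero, i.e. the message could be removed. */
        boolean decrementReference()
        {
            int count = referenceCount.decrementAndGet();
            if (count < 0)
            {
                throw new IllegalStateException("Reference count has gone below 0");
            }
            return count == 0;
        }

        int references()
        {
            return referenceCount.get();
        }
    }

    /** Takes one reference per destination queue, then releases the routing reference. */
    static void deliver(Message message, List<String> destinationQueues)
    {
        try
        {
            for (String queue : destinationQueues)
            {
                // One reference per queue delivery; the enqueue itself is omitted here.
                message.incrementReference();
            }
        }
        finally
        {
            // Routing is finished, so its reference is released.
            message.decrementReference();
        }
    }

    public static void main(String[] args)
    {
        Message message = new Message();
        deliver(message, Arrays.asList("queueA", "queueB"));
        System.out.println("References after routing: " + message.references());
    }
}
```

Under this scheme, handing a message to a client does not change the count. Only removing it from a queue does, which matches the removal of the extra decrementReference() call from AMQQueue in this commit.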

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Mar 13 03:35:42 2007
@@ -516,7 +516,7 @@
         {
             if (_logger.isInfoEnabled())
             {
-                _logger.warn("Auto-deleteing queue:" + this);
+                _logger.info("Auto-deleteing queue:" + this);
             }
             autodelete();
             // we need to manually fire the event to the removed subscription (which was the last one left for this
@@ -624,7 +624,6 @@
         try
         {
             msg.dequeue(storeContext, this);
-            msg.decrementReference(storeContext);
         }
         catch (MessageCleanupException e)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Mar 13 03:35:42 2007
@@ -383,6 +383,9 @@
         return count;
     }
 
+    /**
+     * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+     */
     private AMQMessage getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null);
@@ -392,13 +395,14 @@
     {
         AMQMessage message = messages.peek();
 
-        while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
+        //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
+        while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
         {
             //remove the already taken message
             AMQMessage removed = messages.poll();
 
             assert removed == message;
-            
+
             _totalMessageSize.addAndGet(-message.getSize());
 
             if (_log.isTraceEnabled())
@@ -494,7 +498,7 @@
 
                 _extraMessages.decrementAndGet();
             }
-            else if (messageQueue == sub.getPreDeliveryQueue())
+            else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser())
             {
                 if (_log.isInfoEnabled())
                 {
@@ -695,7 +699,7 @@
                 {
                     if (_log.isDebugEnabled())
                     {
-                        _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+                        _log.debug(debugIdentity() + " Message(" + msg.toString() +
                                    ") has been taken so disregarding deliver request to Subscriber:" +
                                    System.identityHashCode(s));
                     }
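
The new loop condition in getNextMessage() stops browsers from removing taken messages: a message is only polled off the queue when there is no subscriber (the clearing case) or the subscriber is not a browser, and the message is already taken. Below is a minimal sketch of the boolean logic. It models sub != null, sub.isBrowser() and message.taken(sub) as plain booleans. The class and method names are hypothetical.

```java
class BrowserSkipPredicateSketch
{
    /** The condition as written in the commit, with the null check modelled by hasSub. */
    static boolean asCommitted(boolean hasSub, boolean isBrowser, boolean taken)
    {
        return (hasSub && !isBrowser || !hasSub) && taken;
    }

    /** A shorter form that should be equivalent by boolean absorption. */
    static boolean simplified(boolean hasSub, boolean isBrowser, boolean taken)
    {
        return (!hasSub || !isBrowser) && taken;
    }

    /** Checks both forms against every combination of the three inputs. */
    static boolean equivalentForAllInputs()
    {
        boolean[] values = {false, true};
        for (boolean hasSub : values)
        {
            for (boolean isBrowser : values)
            {
                for (boolean taken : values)
                {
                    if (asCommitted(hasSub, isBrowser, taken) != simplified(hasSub, isBrowser, taken))
                    {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println("Forms equivalent: " + equivalentForAllInputs());
    }
}
```

In the real code the shorter form would read (sub == null || !sub.isBrowser()) && message.taken(sub), which relies on short-circuit evaluation to avoid dereferencing a null subscriber.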

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Mar 13 03:35:42 2007
@@ -256,10 +256,10 @@
 
             // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
             // received the message. If it is lost in transit that is not important.
-            if (_acks)
-            {
-                channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
-            }
+//            if (_acks)
+//            {
+//                channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+//            }
 
             if (_sendLock.get())
             {
@@ -273,41 +273,49 @@
     private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
             throws AMQException
     {
-        // if we do not need to wait for client acknowledgements
-        // we can decrement the reference count immediately.
-
-        // By doing this _before_ the send we ensure that it
-        // doesn't get sent if it can't be dequeued, preventing
-        // duplicate delivery on recovery.
-
-        // The send may of course still fail, in which case, as
-        // the message is unacked, it will be lost.
-        if (!_acks)
-        {
-            if (_logger.isDebugEnabled())
+        try
+        { // if we do not need to wait for client acknowledgements
+            // we can decrement the reference count immediately.
+
+            // By doing this _before_ the send we ensure that it
+            // doesn't get sent if it can't be dequeued, preventing
+            // duplicate delivery on recovery.
+
+            // The send may of course still fail, in which case, as
+            // the message is unacked, it will be lost.
+            if (!_acks)
             {
-                _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                }
+                queue.dequeue(storeContext, msg);
             }
-            queue.dequeue(storeContext, msg);
-        }
-        synchronized (channel)
-        {
-            long deliveryTag = channel.getNextDeliveryTag();
 
-            if (_sendLock.get())
+            synchronized (channel)
             {
-                _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
-            }
+                long deliveryTag = channel.getNextDeliveryTag();
 
-            if (_acks)
-            {
-                channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
-                msg.decrementReference(storeContext);
-            }
+                if (_sendLock.get())
+                {
+                    _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+                }
 
-            protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+                if (_acks)
+                {
+                    channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                }
+
+                protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+
+            }
+        }
+        finally
+        {
             //Only set delivered if it actually was writen successfully..
-            // using a try->finally would set it even if an error occured. 
+            // using a try->finally would set it even if an error occured.
+            // Is this what we want? 
+
             msg.setDeliveredToConsumer();
         }
     }
@@ -461,14 +469,25 @@
 
     public void close()
     {
+        boolean closed = false;
         synchronized (_sendLock)
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Setting SendLock true");
+                _logger.debug("Setting SendLock true:" + debugIdentity());
+            }
+
+            closed = _sendLock.getAndSet(true);
+        }
+
+        if (closed)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Called close() on a closed subscription");
             }
 
-            _sendLock.set(true);
+            return;
         }
 
         if (_logger.isInfoEnabled())
@@ -488,16 +507,36 @@
         //remove references in PDQ
         if (_messages != null)
         {
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this);
+            }
+
             _messages.clear();
         }
+    }
+
+    private void autoclose()
+    {
+        close();
 
         if (_autoClose && !_sentClose)
         {
-            _logger.info("Closing autoclose subscription:" + this);
+            _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
+
             ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
             converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
-
             _sentClose = true;
+
+            //fixme JIRA do this better
+            try
+            {
+                channel.unsubscribeConsumer(protocolSession, consumerTag);
+            }
+            catch (AMQException e)
+            {
+                // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+            }
         }
     }
 
@@ -590,7 +629,7 @@
             {
                 if (_messages.isEmpty())
                 {
-                    close();
+                    autoclose();
                     return null;
                 }
             }
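
The reworked close() above uses AtomicBoolean.getAndSet(true) so that a second call returns early instead of repeating the cleanup. Below is a minimal sketch of that idempotent-close pattern. The class, the cleanup counter and the method names are hypothetical, and the real method also synchronizes on _sendLock, which is omitted here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentCloseSketch
{
    private final AtomicBoolean sendLock = new AtomicBoolean(false);
    private final AtomicInteger cleanups = new AtomicInteger();

    void close()
    {
        // getAndSet returns the previous value, so only the first caller sees false.
        boolean alreadyClosed = sendLock.getAndSet(true);
        if (alreadyClosed)
        {
            // Called close() on a closed subscription: nothing left to do.
            return;
        }

        // Stand-in for the one-time cleanup work (e.g. clearing a pre-delivery queue).
        cleanups.incrementAndGet();
    }

    boolean isClosed()
    {
        return sendLock.get();
    }

    int cleanupCount()
    {
        return cleanups.get();
    }

    public static void main(String[] args)
    {
        IdempotentCloseSketch subscription = new IdempotentCloseSketch();
        subscription.close();
        subscription.close();
        System.out.println("Cleanups run: " + subscription.cleanupCount());
    }
}
```

Because the read and the write happen in one atomic step, two threads racing to close cannot both observe the open state.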

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Tue Mar 13 03:35:42 2007
@@ -100,7 +100,7 @@
         // be added for every queue onto which the message is
         // enqueued. Finally a cleanup op will be added to decrement
         // the reference associated with the routing.
-        message.incrementReference();
+//        message.incrementReference();
         _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
         _messageDelivered = true;
         /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Mar 13 03:35:42 2007
@@ -93,7 +93,6 @@
     {
         try
         {
-            message.incrementReference();
             queue.process(_storeContext, message, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:
@@ -128,6 +127,8 @@
                             {
                                 _log.debug("Discarding message: " + message.message.getMessageId());
                             }
+
+                            //Message has been ack so discard it. This will dequeue and decrement the reference.
                             message.discard(_storeContext);
                         }
                         else
@@ -160,6 +161,8 @@
                         {
                             _log.debug("Discarding message: " + msg.message.getMessageId());
                         }
+
+                        //Message has been ack so discard it. This will dequeue and decrement the reference.
                         msg.discard(_storeContext);
                     }
                     else
@@ -181,7 +184,22 @@
                 throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
                                        _channel.getChannelId());
             }
-            msg.discard(_storeContext);
+
+            if (!_browsedAcks.contains(deliveryTag))
+            {
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Discarding message: " + msg.message.getMessageId());
+                }
+
+                //Message has been ack so discard it. This will dequeue and decrement the reference.
+                msg.discard(_storeContext);
+            }
+            else
+            {
+                _browsedAcks.remove(deliveryTag);
+            }
+
             if (_log.isDebugEnabled())
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Tue Mar 13 03:35:42 2007
@@ -27,10 +27,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
 
-/**
- * Holds a list of TxnOp instance representing transactional
- * operations.
- */
+/** Holds a list of TxnOp instance representing transactional operations. */
 public class TxnBuffer
 {
     private final List<TxnOp> _ops = new ArrayList<TxnOp>();
@@ -42,6 +39,11 @@
 
     public void commit(StoreContext context) throws AMQException
     {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray());
+        }
+
         if (prepare(context))
         {
             for (TxnOp op : _ops)
@@ -64,7 +66,7 @@
             catch (Exception e)
             {
                 //compensate previously prepared ops
-                for(int j = 0; j < i; j++)
+                for (int j = 0; j < i; j++)
                 {
                     _ops.get(j).undoPrepare();
                 }
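
The TxnBuffer code above prepares every operation in order and, if one fails, undoes the operations that were already prepared before giving up. Below is a minimal sketch of that prepare-then-compensate loop. Op and RecordingOp are hypothetical simplifications of the TxnOp interface (no StoreContext, no rollback), written only to illustrate the control flow.

```java
import java.util.ArrayList;
import java.util.List;

class PrepareCompensateSketch
{
    /** Hypothetical, simplified operation; the real TxnOp methods take a StoreContext. */
    interface Op
    {
        void prepare() throws Exception;

        void undoPrepare();

        void commit();
    }

    /** Prepares ops in order; on failure undoes the ones already prepared and returns false. */
    static boolean prepare(List<Op> ops)
    {
        for (int i = 0; i < ops.size(); i++)
        {
            try
            {
                ops.get(i).prepare();
            }
            catch (Exception e)
            {
                // Compensate previously prepared ops (indices 0 .. i-1 only).
                for (int j = 0; j < i; j++)
                {
                    ops.get(j).undoPrepare();
                }
                return false;
            }
        }
        return true;
    }

    /** Commits every op only if all of them prepared successfully. */
    static boolean commit(List<Op> ops)
    {
        if (!prepare(ops))
        {
            return false;
        }
        for (Op op : ops)
        {
            op.commit();
        }
        return true;
    }

    /** Test double that records each call in a shared log. */
    static final class RecordingOp implements Op
    {
        private final String name;
        private final boolean failOnPrepare;
        private final List<String> log;

        RecordingOp(String name, boolean failOnPrepare, List<String> log)
        {
            this.name = name;
            this.failOnPrepare = failOnPrepare;
            this.log = log;
        }

        public void prepare() throws Exception
        {
            log.add(name + ".prepare");
            if (failOnPrepare)
            {
                throw new Exception(name + " failed to prepare");
            }
        }

        public void undoPrepare()
        {
            log.add(name + ".undoPrepare");
        }

        public void commit()
        {
            log.add(name + ".commit");
        }
    }

    public static void main(String[] args)
    {
        List<String> log = new ArrayList<String>();
        List<Op> ops = new ArrayList<Op>();
        ops.add(new RecordingOp("a", false, log));
        ops.add(new RecordingOp("b", true, log));
        System.out.println("Committed: " + commit(ops) + ", calls: " + log);
    }
}
```

One Java detail relevant to the new debug line in commit(): concatenating an array such as the result of toArray() into a string prints its type and identity hash rather than its elements, so the list itself or Arrays.toString(...) is the usual choice when the contents are wanted in the log.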

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Tue Mar 13 03:35:42 2007
@@ -49,6 +49,7 @@
         _session = session;
         _queue = queue;
         _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+        // Create Consumer to verify message selector.
         BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
         consumer.close();        
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar 13 03:35:42 2007
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -294,8 +295,8 @@
 
                 if (_dispatcherLogger.isDebugEnabled())
                 {
-                    _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
-                                            ": Currently " + (currently ? "Started" : "Stopped"));
+                    _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
+                                            ": Currently " + (currently ? "Stopped" : "Started"));
                 }
             }
             return currently;
@@ -307,22 +308,31 @@
             {
                 final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
 
-                if (consumer == null)
+                if (consumer == null || consumer.isClosed())
                 {
                     if (_dispatcherLogger.isInfoEnabled())
                     {
-                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
-                                               "[" + message.getDeliverBody().deliveryTag + "] from queue "
-                                               + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+                        if (consumer == null)
+                        {
+                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+                                                   "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+                                                   message.getDeliverBody().consumerTag +
+                                                   " )without a handler - rejecting(requeue)...");
+                        }
+                        else
+                        {
+                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+                                                   "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+                                                   " consumer(" + consumer.debugIdentity() +
+                                                   ") is closed rejecting(requeue)...");
+                        }
                     }
 
                     rejectMessage(message, true);
                 }
                 else
                 {
-
                     consumer.notifyMessage(message, _channelId);
-
                 }
             }
         }
@@ -354,7 +364,18 @@
 
                 for (BasicMessageConsumer consumer : _consumers.values())
                 {
-                    consumer.rollback();
+                    if (!consumer.isNoConsume())
+                    {
+                        consumer.rollback();
+                    }
+                    else
+                    {
+                        // should perhaps clear the _SQ here.
+                        //consumer._synchronousQueue.clear();
+                        consumer.clearReceiveQueue();
+                    }
+
+
                 }
 
                 setConnectionStopped(isStopped);
@@ -379,8 +400,8 @@
                 // Reject messages on pre-dispatch queue
                 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
 
-                // Remove consumer from map.
-                deregisterConsumer(consumer);
+                // closeConsumer
+                consumer.markClosed();
 
                 _dispatcher.setConnectionStopped(stopped);
 
@@ -624,6 +645,11 @@
 
     public void close(long timeout) throws JMSException
     {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+        }
+
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
         synchronized (_connection.getFailoverMutex())
@@ -2063,26 +2089,39 @@
         // Remove the consumer from the map
         BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
         if (consumer != null)
-        {
-            if (consumer.isAutoClose())
+        {            
+//            fixme this isn't right.. needs to check if _queue contains data for this consumer
+            if (consumer.isAutoClose())// && _queue.isEmpty())
             {
                 consumer.closeWhenNoMessages(true);
             }
 
-            //Clean the Maps up first
-            //Flush any pending messages for this consumerTag
-            if (_dispatcher != null)
+            if (!consumer.isNoConsume())
             {
-                _logger.info("Dispatcher is not null");
+                //Clean the Maps up first
+                //Flush any pending messages for this consumerTag
+                if (_dispatcher != null)
+                {
+                    _logger.info("Dispatcher is not null");
+                }
+                else
+                {
+                    _logger.info("Dispatcher is null so created stopped dispatcher");
+
+                    startDistpatcherIfNecessary(true);
+                }
+
+                _dispatcher.rejectPending(consumer);
             }
             else
             {
-                _logger.info("Dispatcher is null so created stopped dispatcher");
+                //Just close the consumer
+                //fixme  the CancelOK is being processed before the arriving messages..
+                // The dispatcher is still to process them so the server sent in order but the client
+                // has yet to receive before the close comes in.
 
-                startDistpatcherIfNecessary(true);
+//                consumer.markClosed();
             }
-
-            _dispatcher.rejectPending(consumer);
         }
         else
         {
@@ -2217,7 +2256,20 @@
     private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
     {
         Iterator messages = _queue.iterator();
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag +
+                         ") (PDispatchQ) requeue:" + requeue);
 
+            if (messages.hasNext())
+            {
+                _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+            }
+            else
+            {
+                _logger.info("No messages in _queue to reject");
+            }
+        }
         while (messages.hasNext())
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
@@ -2239,10 +2291,6 @@
                     _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
                 }
             }
-            else
-            {
-                _logger.error("Pruned pending message for consumer:" + consumerTag);
-            }
         }
     }
 
@@ -2250,9 +2298,9 @@
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
-        if (_logger.isDebugEnabled())
+        if (_logger.isTraceEnabled())
         {
-            _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+            _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
         }
 
         rejectMessage(message.getDeliverBody().deliveryTag, requeue);
@@ -2260,9 +2308,9 @@
 
     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
     {
-        if (_logger.isDebugEnabled())
+        if (_logger.isTraceEnabled())
         {
-            _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+            _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
         }
         rejectMessage(message.getDeliveryTag(), requeue);
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Mar 13 03:35:42 2007
@@ -22,6 +22,7 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -118,7 +119,7 @@
 
     private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
-    /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */    
+    /** List of tags delivered, the last of which should be acknowledged on commit in transaction mode. */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
     /**
@@ -135,6 +136,7 @@
     private boolean _closeWhenNoMessages;
 
     private boolean _noConsume;
+    private List<StackTraceElement> _closedStack = null;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
                                    boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
@@ -157,6 +159,12 @@
         _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
         _autoClose = autoClose;
         _noConsume = noConsume;
+
+        //Force queue browsers not to use acknowledge modes.
+        if (_noConsume)
+        {
+            _acknowledgeMode = Session.NO_ACKNOWLEDGE;
+        }
     }
 
     public AMQDestination getDestination()
@@ -433,6 +441,8 @@
 
     public void close(boolean sendClose) throws JMSException
     {
+        //synchronized (_closed)
+
         if (_logger.isInfoEnabled())
         {
             _logger.info("Closing consumer:" + debugIdentity());
@@ -442,6 +452,18 @@
         {
             if (!_closed.getAndSet(true))
             {
+                if (_logger.isTraceEnabled())
+                {
+                    if (_closedStack != null)
+                    {
+                        _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                        _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+                    }
+                    else
+                    {
+                        _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+                    }
+                }
                 if (sendClose)
                 {
                     // TODO: Be aware of possible changes to parameter order as versions change.
@@ -467,9 +489,15 @@
                         throw new JMSException("Error closing consumer: " + e);
                     }
                 }
+                else
+                {
+//                    //fixme this probably is not right
+//                    if (!isNoConsume())
+                    {   //done in BasicCancelOK Handler but not sending one so just deregister.
+                        deregisterConsumer();
+                    }
+                }
 
-                //done in BasicCancelOK Handler
-                //deregisterConsumer();
                 if (_messageListener != null && _receiving.get())
                 {
                     if (_logger.isInfoEnabled())
@@ -488,7 +516,23 @@
      */
     void markClosed()
     {
-        _closed.set(true);
+//        synchronized (_closed)
+        {
+            _closed.set(true);
+
+            if (_logger.isTraceEnabled())
+            {
+                if (_closedStack != null)
+                {
+                    _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                    _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+                }
+                else
+                {
+                    _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                }
+            }
+        }
         deregisterConsumer();
     }
 
@@ -520,11 +564,24 @@
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
             }
-            jmsMessage.setConsumer(this);
+//            synchronized (_closed)
+            {
+//                if (!_closed.get())
+                {
+
+                    jmsMessage.setConsumer(this);
 
-            preDeliver(jmsMessage);
+                    preDeliver(jmsMessage);
 
-            notifyMessage(jmsMessage, channelId);
+                    notifyMessage(jmsMessage, channelId);
+                }
+//                else
+//                {
+//                    _logger.error("MESSAGE REJECTING!");
+//                    _session.rejectMessage(jmsMessage, true);
+//                    //_logger.error("MESSAGE JUST DROPPED!");
+//                }
+            }
         }
         catch (Exception e)
         {
@@ -551,9 +608,16 @@
             {
                 //we do not need a lock around the test above, and the dispatch below as it is invalid
                 //for an application to alter an installed listener while the session is started
-                preApplicationProcessing(jmsMessage);
-                getMessageListener().onMessage(jmsMessage);
-                postDeliver(jmsMessage);
+//                synchronized (_closed)
+                {
+//                    if (!_closed.get())
+                    {
+
+                        preApplicationProcessing(jmsMessage);
+                        getMessageListener().onMessage(jmsMessage);
+                        postDeliver(jmsMessage);
+                    }
+                }
             }
             else
             {
@@ -649,14 +713,30 @@
                 lastDeliveryTag = _receivedDeliveryTags.poll();
             }
 
+            assert _receivedDeliveryTags.isEmpty();
+
             _session.acknowledgeMessage(lastDeliveryTag, true);
         }
     }
 
     void notifyError(Throwable cause)
     {
-        _closed.set(true);
-
+//        synchronized (_closed)
+        {
+            _closed.set(true);
+            if (_logger.isTraceEnabled())
+            {
+                if (_closedStack != null)
+                {
+                    _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                    _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+                }
+                else
+                {
+                    _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                }
+            }
+        }
         //QPID-293 can "request redelivery of this error through dispatcher"
 
         // we have no way of propagating the exception to a message listener - a JMS limitation - so we
@@ -761,14 +841,20 @@
     {
         clearUnackedMessages();
 
-        if (_logger.isDebugEnabled())
+        if (!_receivedDeliveryTags.isEmpty())
         {
-            _logger.debug("Rejecting received messages");
+            _logger.debug("Rejecting received messages in _receivedDTs (RQ)");
         }
 
         //rollback received but not committed messages
         while (!_receivedDeliveryTags.isEmpty())
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ) " +
+                              "for consumer with tag:" + _consumerTag);
+            }
+
             Long tag = _receivedDeliveryTags.poll();
 
             if (tag != null)
@@ -782,12 +868,20 @@
             }
         }
 
+        if (!_receivedDeliveryTags.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
+            }
+        }
+
         //rollback pending messages
         if (_synchronousQueue.size() > 0)
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+                _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ) " +
                               "for consumer with tag:" + _consumerTag);
             }
             Iterator iterator = _synchronousQueue.iterator();
@@ -821,7 +915,7 @@
                 rollback();
             }
 
-            _synchronousQueue.clear();
+            clearReceiveQueue();
         }
     }
 
@@ -831,4 +925,8 @@
         return String.valueOf(_consumerTag);
     }
 
+    public void clearReceiveQueue()
+    {
+        _synchronousQueue.clear();
+    }
 }
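
Note on the trace logging added to close(), markClosed() and notifyError() above: each call takes subList(3, 6) or subList(3, 8) of the current stack trace, and List.subList throws IndexOutOfBoundsException when the list is shorter than the upper bound. A minimal sketch of a clamped alternative follows. StackSlice and slice() are hypothetical names used only for illustration; they are not part of the Qpid client.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the Qpid client. */
final class StackSlice
{
    private StackSlice()
    {
    }

    /** Returns frames [from, to) of the given stack, clamped to the frames that actually exist. */
    static List<StackTraceElement> slice(StackTraceElement[] stack, int from, int to)
    {
        int end = Math.min(to, stack.length);
        int start = Math.min(Math.max(from, 0), end);
        if (start == end)
        {
            return Collections.emptyList();
        }
        // Copy the range so the stored list does not keep the whole trace array alive.
        return Arrays.asList(Arrays.copyOfRange(stack, start, end));
    }
}
```

Under that assumption, a call such as StackSlice.slice(Thread.currentThread().getStackTrace(), 3, 8) could stand in for the subList calls without failing on a shallow stack.
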

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java Tue Mar 13 03:35:42 2007
@@ -25,20 +25,18 @@
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 
-/**
- * Provides support for orderly shutdown of an object.
- */
+/** Provides support for orderly shutdown of an object. */
 public abstract class Closeable
 {
     /**
-     * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing
-     * access to this flag would mean have a synchronized block in every method.
+     * We use an atomic boolean so that we do not have to synchronize access to this flag. Synchronizing access to this
+     * flag would mean having a synchronized block in every method.
      */
     protected final AtomicBoolean _closed = new AtomicBoolean(false);
 
     protected void checkNotClosed() throws JMSException
     {
-        if (_closed.get())
+        if (isClosed())
         {
             throw new IllegalStateException("Object " + toString() + " has been closed");
         }
@@ -46,7 +44,10 @@
 
     public boolean isClosed()
     {
-        return _closed.get();
+//        synchronized (_closed)
+        {
+            return _closed.get();
+        }
     }
 
     public abstract void close() throws JMSException;
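
Note on the Closeable Javadoc above: the atomic flag is what lets close() run its cleanup once without a synchronized block, as in the `if (!_closed.getAndSet(true))` test in BasicMessageConsumer.close(). A minimal sketch of that idiom follows. CloseOnceExample and its members are hypothetical stand-ins, not Qpid classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Closeable subclass; for illustration only. */
final class CloseOnceExample
{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    void close()
    {
        // getAndSet returns the previous value, so only the first caller sees false.
        if (!closed.getAndSet(true))
        {
            cleanupRuns.incrementAndGet();
        }
    }

    boolean isClosed()
    {
        return closed.get();
    }

    int cleanupRuns()
    {
        return cleanupRuns.get();
    }
}
```

Calling close() twice on one instance runs the guarded cleanup a single time. The flag alone does not make a later check-then-deliver sequence atomic, which appears to be what the commented-out synchronized (_closed) blocks in this commit were exploring.
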

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Tue Mar 13 03:35:42 2007
@@ -42,6 +42,6 @@
     {
         _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
 
-        //todo this should do the closure
+        //todo this should do the local closure
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Mar 13 03:35:42 2007
@@ -51,10 +51,8 @@
 import org.apache.qpid.protocol.AMQConstant;
 
 /**
- * Wrapper for protocol session that provides type-safe access to session attributes.
- * <p/>
- * The underlying protocol session is still available but clients should not
- * use it to obtain session attributes.
+ * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
+ * session is still available but clients should not use it to obtain session attributes.
  */
 public class AMQProtocolSession implements AMQVersionAwareProtocolSession
 {
@@ -78,27 +76,23 @@
     protected WriteFuture _lastWriteFuture;
 
     /**
-     * The handler from which this session was created and which is used to handle protocol events.
-     * We send failover events to the handler.
+     * The handler from which this session was created and which is used to handle protocol events. We send failover
+     * events to the handler.
      */
     protected final AMQProtocolHandler _protocolHandler;
 
-    /**
-     * Maps from the channel id to the AMQSession that it represents.
-     */
+    /** Maps from the channel id to the AMQSession that it represents. */
     protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
 
     protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
 
     /**
-     * Maps from a channel id to an unprocessed message. This is used to tie together the
-     * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+     * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+     * first) with the subsequent content header and content bodies.
      */
     protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
 
-    /**
-     * Counter to ensure unique queue names
-     */
+    /** Counter to ensure unique queue names */
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
 
@@ -108,8 +102,8 @@
 
 
     /**
-     * No-arg constructor for use by test subclass - has to initialise final vars
-     * NOT intended for use other then for test
+     * No-arg constructor for use by test subclass - has to initialise final vars. NOT intended for use other than for
+     * test
      */
     public AMQProtocolSession()
     {
@@ -147,7 +141,7 @@
     {
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
-        
+
         _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
     }
 
@@ -207,8 +201,7 @@
     /**
      * Store the SASL client currently being used for the authentication handshake
      *
-     * @param client if non-null, stores this in the session. if null clears any existing client
-     *               being stored
+     * @param client if non-null, stores this in the session. if null clears any existing client being stored
      */
     public void setSaslClient(SaslClient client)
     {
@@ -237,10 +230,11 @@
     }
 
     /**
-     * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
-     * This is invoked on the MINA dispatcher thread.
+     * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+     * dispatcher thread.
      *
      * @param message
+     *
      * @throws AMQException if this was not expected
      */
     public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
@@ -295,8 +289,7 @@
     }
 
     /**
-     * Deliver a message to the appropriate session, removing the unprocessed message
-     * from our map
+     * Deliver a message to the appropriate session, removing the unprocessed message from our map
      *
      * @param channelId the channel id the message should be delivered to
      * @param msg       the message
@@ -309,8 +302,8 @@
     }
 
     /**
-     * Convenience method that writes a frame to the protocol session. Equivalent
-     * to calling getProtocolSession().write().
+     * Convenience method that writes a frame to the protocol session. Equivalent to calling
+     * getProtocolSession().write().
      *
      * @param frame the frame to write
      */
@@ -377,15 +370,14 @@
     }
 
     /**
-     * Called from the ChannelClose handler when a channel close frame is received.
-     * This method decides whether this is a response or an initiation. The latter
-     * case causes the AMQSession to be closed and an exception to be thrown if
+     * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+     * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
      * appropriate.
      *
      * @param channelId the id of the channel (session)
-     * @return true if the client must respond to the server, i.e. if the server
-     *         initiated the channel close, false if the channel close is just the server
-     *         responding to the client's earlier request to close the channel.
+     *
+     * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+     *         the channel close is just the server responding to the client's earlier request to close the channel.
      */
     public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
     {
@@ -450,9 +442,7 @@
         return new AMQShortString("tmp_" + localAddress + "_" + id);
     }
 
-    /**
-     * @param delay delay in seconds (not ms)
-     */
+    /** @param delay delay in seconds (not ms) */
     void initHeartbeats(int delay)
     {
         if (delay > 0)
@@ -475,7 +465,7 @@
     {
         _protocolMajorVersion = versionMajor;
         _protocolMinorVersion = versionMinor;
-        _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);        
+        _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
     }
 
     public byte getProtocolMinorVersion()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Mar 13 03:35:42 2007
@@ -38,12 +38,10 @@
 import org.apache.qpid.pool.ReadWriteThreadModel;
 
 /**
- * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up
- * the underlying connector, which currently always uses TCP/IP sockets. It creates the
- * "protocol handler" which deals with MINA protocol events.
- * <p/>
- * Could be extended in future to support different transport types by turning this into concrete class/interface
- * combo.
+ * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
+ * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
+ * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete
+ * class/interface combo.
  */
 public class TransportConnection
 {
@@ -61,22 +59,6 @@
 
     private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
 
-    static
-    {
-        _acceptor = new VmPipeAcceptor();
-
-        IoServiceConfig config = _acceptor.getDefaultConfig();
-
-        config.setThreadModel(ReadWriteThreadModel.getInstance());
-    }
-
-    public static ITransportConnection getInstance() throws AMQTransportConnectionException
-    {
-        AMQBrokerDetails details = new AMQBrokerDetails();
-        details.setTransport(BrokerDetails.TCP);
-        return getInstance(details);
-    }
-
     public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
     {
         int transport = getTransport(details.getTransport());
@@ -182,7 +164,14 @@
 
     public static void createVMBroker(int port) throws AMQVMBrokerCreationException
     {
+        if (_acceptor == null)
+        {
+            _acceptor = new VmPipeAcceptor();
 
+            IoServiceConfig config = _acceptor.getDefaultConfig();
+
+            config.setThreadModel(ReadWriteThreadModel.getInstance());
+        }
 
         if (!_inVmPipeAddress.containsKey(port))
         {
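
Note on the createVMBroker change above: the acceptor is now created on first use through an unsynchronized null check. If createVMBroker can be called from more than one thread (an assumption; the diff does not say), two acceptors could be created. One possible sketch of lazy, thread-safe creation uses a nested holder class. FakeAcceptor, Holder and acceptor() are hypothetical names standing in for VmPipeAcceptor and its setup.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of lazy, thread-safe creation; not Qpid code. */
final class LazyAcceptorExample
{
    /** Counts constructions so the laziness can be observed. */
    static final AtomicInteger CREATED = new AtomicInteger();

    /** Stand-in for the real acceptor type. */
    static final class FakeAcceptor
    {
        FakeAcceptor()
        {
            CREATED.incrementAndGet();
        }
    }

    /** The JVM initialises this class once, on first access to INSTANCE. */
    private static final class Holder
    {
        static final FakeAcceptor INSTANCE = new FakeAcceptor();
    }

    private LazyAcceptorExample()
    {
    }

    static FakeAcceptor acceptor()
    {
        return Holder.INSTANCE;
    }
}
```

The holder keeps the behaviour this commit introduces (nothing is built until a VM broker is requested) while relying on class initialisation for the locking.
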

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Tue Mar 13 03:35:42 2007
@@ -44,6 +44,10 @@
     /** We require a separate count so we can track whether we have reached the threshold */
     private int _count;
 
+    public boolean isEmpty()
+    {
+        return _queue.isEmpty();
+    }
 
     public interface ThresholdListener
     {