You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/28 12:42:11 UTC

svn commit: r580293 [1/2] - in /incubator/qpid/branches/M2: ./ java/ java/broker/ java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/broker/src/test/java/org/apache/qpid/serve...

Author: ritchiem
Date: Fri Sep 28 03:41:49 2007
New Revision: 580293

URL: http://svn.apache.org/viewvc?rev=580293&view=rev
Log:
Merged revisions 574555-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-580022 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r574555 | ritchiem | 2007-09-11 12:39:10 +0100 (Tue, 11 Sep 2007) | 1 line
  
  QPID-590 : Provide test case and resolution to prevent deadlock occurring on the client when two threads work on the AMQSession object.
........
  r574585 | rgreig | 2007-09-11 14:02:19 +0100 (Tue, 11 Sep 2007) | 1 line
  
  QPID-591 Fixed to use dirname and avoid working directory issues.
........
  r579115 | ritchiem | 2007-09-25 09:15:04 +0100 (Tue, 25 Sep 2007) | 1 line
  
  QPID-604 Commited patch provided by Aidan Skinner, to prevent NPE in FieldTable.
........
  r579147 | ritchiem | 2007-09-25 10:27:22 +0100 (Tue, 25 Sep 2007) | 2 lines
  
  QPID-610 : Fix for Returned Messages leak. Augmented TestableMemoryMessageStore to allow it to query the MemoryMessageStore in use by the broker. 
  New MessageReturnTest to verify no leak after messages have been returned.
........
  r579198 | ritchiem | 2007-09-25 12:13:23 +0100 (Tue, 25 Sep 2007) | 1 line
  
  Update to start M2.1 python test broker on port 2100 (management 2101) and for the tests to use that broker.

MODIFIED: to start on port 2000 and 2001 as this is M2

........
  r579229 | ritchiem | 2007-09-25 13:51:09 +0100 (Tue, 25 Sep 2007) | 3 lines
  
  QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
  NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode".
  There is also no error that you have sent an incorrect property.
........
  r579574 | ritchiem | 2007-09-26 11:42:54 +0100 (Wed, 26 Sep 2007) | 4 lines
  
  QPID-610 : Fix for ManagementConcole NO_ACK & Msg Expire leaks. Updated AMQQueueMBeanTest to verify the contents of the MessageStore after clearing. 
  All the MC features only dequeued the message and didn't correctly decrementReference.
  The MessageReturnTest was failing due to the TimeToLive test causing messages to be left on the store. The expired messages were not having the reference decremented.
........
  r579577 | ritchiem | 2007-09-26 11:45:21 +0100 (Wed, 26 Sep 2007) | 5 lines
  
  Updated TransportConnection to synchronize around the creation/destruction of VM Brokers. I had observed a ConcurrentModificationException in the KillAllVMBrokers().
  
  This isn't good this suggests that the tests are overlapping. This fix won't address that problem but will stop any CModifications occuring. If there is test setup/teardown overlapping we should now see tests failing because the VM broker isn't there.
  
  Potentially addresses VM issues in QPID-596
........
  r579578 | ritchiem | 2007-09-26 11:48:14 +0100 (Wed, 26 Sep 2007) | 1 line
  
  Updated the version of slf4j-simple to be one that would work. Changing the systests/pom.xml to depend on this rather than the slf4j-log4j will cause maven to provide more details in the tests. All info and above is logged.
........
  r579602 | rupertlssmith | 2007-09-26 12:27:45 +0100 (Wed, 26 Sep 2007) | 1 line
  
  Added timeout to perftests, to fail tests if message loss causes test to jam.
........
  r579614 | rupertlssmith | 2007-09-26 12:51:14 +0100 (Wed, 26 Sep 2007) | 1 line
  
  Added timeout to perftests, wait limit set to higher value to stop threads thashing.
........
  r579709 | ritchiem | 2007-09-26 17:40:09 +0100 (Wed, 26 Sep 2007) | 1 line
  
  Update for three tests that don't remove their VMBroker
........
  r580022 | ritchiem | 2007-09-27 15:27:22 +0100 (Thu, 27 Sep 2007) | 18 lines
  
  QPID-596 : ConnectionStartTest was broken. I've fixed it but here is the problem for those like me that like to know why:
  
  Previously:
  The setUp method created a producer connection and then sent a message
  - This will result in that message being bounced as there is no consumer.
  
  The first test should fail but the test was wrong, which caused it to pass.
  There was an assert that was expecting the receive a message yet the test was recieve() == null !!!!
  
  The second test worked because the broker was not killed between tests
  
  This left the queue created so on the second run the message was delivered causing the test to succeed.
  
  Now:
  Fixed the InVM broker setup/teardown so the client is created first and the broker removed at the end of the test.
  Also updated the asserts to be more explicit rather than having the == null or !=null put that as assertNull/NotNull.
........

Added:
    incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
      - copied unchanged from r580022, incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
      - copied unchanged from r574585, incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
      - copied unchanged from r580022, incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
Modified:
    incubator/qpid/branches/M2/   (props changed)
    incubator/qpid/branches/M2/java/broker/etc/log4j.xml
    incubator/qpid/branches/M2/java/broker/pom.xml
    incubator/qpid/branches/M2/java/broker/python-test.xml
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
    incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/M2/java/pom.xml
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    incubator/qpid/branches/M2/python/tests/basic.py

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Sep 28 03:41:49 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:400000-567005,568919,568924,573502,573516,573740-574504,574874,574902,575738,575788,575811,577941,578058,578845
+/incubator/qpid/branches/M2.1:400000-567005,568919,568924,573502,573516,573740-574504,574555-577772,577774-578732,578734,578736-578744,578746-578827,578829-580022

Modified: incubator/qpid/branches/M2/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/etc/log4j.xml?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/M2/java/broker/etc/log4j.xml Fri Sep 28 03:41:49 2007
@@ -76,6 +76,7 @@
     <category name="Qpid.Broker">
         <priority value="debug"/>
         <appender-ref ref="AlertFile"/>
+        <appender-ref ref="STDOUT"/>
     </category>
 
     <category name="org.apache.qpid.server.queue.AMQQueueMBean">

Modified: incubator/qpid/branches/M2/java/broker/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/pom.xml?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/pom.xml (original)
+++ incubator/qpid/branches/M2/java/broker/pom.xml Fri Sep 28 03:41:49 2007
@@ -204,18 +204,12 @@
                             <configuration>
                                 <tasks>
 
-                                    <condition property="broker.dir" 
-                                               else="${user.dir}${file.separator}broker"
-                                               value="${user.dir}">
-                                        <contains string="${user.dir}" substring="broker" />
-                                    </condition>
-
                                     <condition property="skip-python-tests" value="true">
                                         <isset property="skip.python.tests"/>
                                     </condition>
 
                                     <property name="command" 
-                                              value="python run-tests -v -I java_failing.txt"/>
+                                              value="python run-tests -v -I java_failing.txt -b localhost:2000"/>
                                     <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
 
                                     <ant antfile="python-test.xml" inheritRefs="true">

Modified: incubator/qpid/branches/M2/java/broker/python-test.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/python-test.xml?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/python-test.xml (original)
+++ incubator/qpid/branches/M2/java/broker/python-test.xml Fri Sep 28 03:41:49 2007
@@ -1,50 +1,56 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!-- ====================================================================== -->
-<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above.        -->
-<!-- ====================================================================== -->
-
-<project basedir="." default="default">
-
-  <target name="default" >
-      <echo message="Used via maven to run python tests."/>
-  </target>
-
-  <property name="pythondir" value="../../python"/>
-
-  <target name="run-tests" unless="skip-python-tests">
-
-	<echo message="Starting Broker with command"/>
-
-	<java classname="org.apache.qpid.server.RunBrokerWithCommand"
-			fork="true"
-			dir="${pythondir}"
-			failonerror="true"
-			>
-		<arg value="${command}"/>
-		
-		<classpath refid="maven.test.classpath"/>
-		<sysproperty key="QPID_HOME" value="${broker.dir}"/>
-		<sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/>
-	</java>
-
-	</target>
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<!-- ====================================================================== -->
+<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above.        -->
+<!-- ====================================================================== -->
+
+<project basedir="." default="default">
+
+  <target name="default" >
+      <echo message="Used via maven to run python tests."/>
+  </target>
+
+  <dirname property="broker.dir" file="${ant.file.python-test}"/>
+
+  <property name="pythondir" value="${broker.dir}/../../python"/>
+
+  <target name="run-tests" unless="skip-python-tests">
+
+	<echo message="Starting Broker with command"/>
+
+	<java classname="org.apache.qpid.server.RunBrokerWithCommand"
+			fork="true"
+			dir="${pythondir}"
+			failonerror="true"
+			>
+		<arg value="${command}"/>
+        <arg value="-p"/>
+        <arg value="2000"/>
+        <arg value="-m"/>
+        <arg value="2001"/>
+
+        <classpath refid="maven.test.classpath"/>
+		<sysproperty key="QPID_HOME" value="${broker.dir}"/>
+		<sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/>
+	</java>
+
+	</target>
+</project>

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Sep 28 03:41:49 2007
@@ -943,6 +943,8 @@
             AMQMessage message = bouncedMessage.getAMQMessage();
             session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
                 new AMQShortString(bouncedMessage.getMessage()));
+
+            message.decrementReference(_storeContext);
         }
 
         _returnMessages.clear();

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Sep 28 03:41:49 2007
@@ -581,7 +581,7 @@
     /** Removes the AMQMessage from the top of the queue. */
     public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
-        _deliveryMgr.removeAMessageFromTop(storeContext);
+        _deliveryMgr.removeAMessageFromTop(storeContext, this);
     }
 
     /** removes all the messages from the queue. */

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Sep 28 03:41:49 2007
@@ -330,6 +330,11 @@
                                                                             deliveryTag, _queue.getMessageCount());
                     _totalMessageSize.addAndGet(-msg.getSize());
                 }
+
+                if (!acks)
+                {
+                    msg.decrementReference(channel.getStoreContext());
+                }
             }
             finally
             {
@@ -402,11 +407,16 @@
      *
      * @throws AMQException
      */
-    public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+    public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException
     {
         _lock.lock();
 
         AMQMessage message = _messages.poll();
+
+        message.dequeue(storeContext, queue);
+
+        message.decrementReference(storeContext);
+
         if (message != null)
         {
             _totalMessageSize.addAndGet(-message.getSize());
@@ -429,6 +439,9 @@
                 _messages.poll();
 
                 _queue.dequeue(storeContext, msg);
+
+                msg.decrementReference(_reapingStoreContext);
+
                 msg = getNextMessage();
                 count++;
             }
@@ -473,6 +486,8 @@
 
                 // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
                 message.dequeue(_reapingStoreContext, _queue);
+
+                message.decrementReference(_reapingStoreContext);
 
                 if (_log.isInfoEnabled())
                 {

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Sep 28 03:41:49 2007
@@ -72,7 +72,7 @@
      */
     void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
 
-    void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
+    void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
 
     long clearAllMessages(StoreContext storeContext) throws AMQException;
 

Modified: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java Fri Sep 28 03:41:49 2007
@@ -33,11 +33,11 @@
     public static void main(String[] args)
     {
         //Start broker
-
         try
         {
+            String[] fudge = args.clone();
 
-            String[] fudge = new String[1];
+            // Override the first value which is the command we are going to run later.
             fudge[0] = "-v";
             new Main(fudge).startup();
         }

Modified: incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Sep 28 03:41:49 2007
@@ -24,10 +24,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -36,6 +39,8 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
 import java.util.LinkedList;
@@ -49,18 +54,16 @@
     private static long MESSAGE_SIZE = 1000;
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
-    private MessageStore _messageStore = new MemoryMessageStore();
+    private MessageStore _messageStore;
     private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                     null,
-                                                                                     new LinkedList<RequiredDeliveryException>(),
-                                                                                     new HashSet<Long>());
+    private TransactionalContext _transactionalContext;
     private VirtualHost _virtualHost;
+    private AMQProtocolSession _protocolSession;
 
-    public void testMessageCount() throws Exception
+    public void testMessageCountTransient() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount);
+        sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -73,6 +76,43 @@
         _queueMBean.clearQueue();
         assertTrue(_queueMBean.getMessageCount() == 0);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+
+    public void testMessageCountPersistent() throws Exception
+    {
+        int messageCount = 10;
+        sendMessages(messageCount, true);
+        assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+        _queueMBean.deleteMessageFromTop();
+        assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        _queueMBean.clearQueue();
+        assertTrue(_queueMBean.getMessageCount() == 0);
+        assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+        //Ensure that the data has been removed from the Store
+        verifyBrokerState();
+    }
+
+    // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
+    private void verifyBrokerState()
+    {
+
+        TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+        // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
+        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
+        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
     }
 
     public void testConsumerCount() throws AMQException
@@ -86,26 +126,26 @@
         AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
         protocolSession.addChannel(channel);
 
-        _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
+        _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
         assertTrue(_queueMBean.getActiveConsumerCount() == 1);
 
         SubscriptionSet _subscribers = (SubscriptionSet) mgr;
         SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
-        Subscription s1 =  subscriptionFactory.createSubscription(channel.getChannelId(),
-                                                                  protocolSession,
-                                                                  new AMQShortString("S1"),
-                                                                  false,
-                                                                  null,
-                                                                  true,
-                                                                  _queue);
-
-        Subscription s2 =  subscriptionFactory.createSubscription(channel.getChannelId(),
-                                                                  protocolSession,
-                                                                  new AMQShortString("S2"),
-                                                                  false,
-                                                                  null,
-                                                                  true,
-                                                                  _queue);
+        Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S1"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
+
+        Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+                                                                 protocolSession,
+                                                                 new AMQShortString("S2"),
+                                                                 false,
+                                                                 null,
+                                                                 true,
+                                                                 _queue);
         _subscribers.addSubscriber(s1);
         _subscribers.addSubscriber(s2);
         assertTrue(_queueMBean.getActiveConsumerCount() == 3);
@@ -165,7 +205,7 @@
 
         }
 
-        AMQMessage msg = message(false);
+        AMQMessage msg = message(false, false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
 
@@ -184,7 +224,7 @@
         }
     }
 
-    private AMQMessage message(final boolean immediate) throws AMQException
+    private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
         {
@@ -209,9 +249,11 @@
                 return null;
             }
         };
-                              
+
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
+        contentHeaderBody.properties = new BasicContentHeaderProperties();
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
     }
 
@@ -221,22 +263,38 @@
         super.setUp();
         IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
         _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+        _messageStore = _virtualHost.getMessageStore();
+
+        _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+                                                            null,
+                                                            new LinkedList<RequiredDeliveryException>(),
+                                                            new HashSet<Long>());
+
         _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
         _queueMBean = new AMQQueueMBean(_queue);
+
+        _protocolSession = new TestMinaProtocolSession();
     }
 
-    private void sendMessages(int messageCount) throws AMQException
+    private void sendMessages(int messageCount, boolean persistent) throws AMQException
     {
-        AMQMessage[] messages = new AMQMessage[messageCount];
-        for (int i = 0; i < messages.length; i++)
-        {
-            messages[i] = message(false);
-            messages[i].enqueue(_queue);
-            messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
-        }
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(_storeContext, messages[i], false);
+            AMQMessage currentMessage = message(false, persistent);
+            currentMessage.enqueue(_queue);
+
+            // route header
+            currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+            // Add the body so we have somthing to test later
+            currentMessage.addContentBodyFrame(_storeContext,
+                                               _protocolSession.getRegistry()
+                                                       .getProtocolVersionMethodConverter()
+                                                       .convertToContentChunk(
+                                                       new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+                                                                       MESSAGE_SIZE)));
+
+
         }
     }
 }

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Sep 28 03:41:49 2007
@@ -72,7 +72,6 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +99,6 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -208,14 +206,14 @@
      * subscriptions between executions of the client.
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
-        new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+            new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
 
     /**
      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
      * up in the {@link #_subscriptions} map.
      */
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-        new ConcurrentHashMap<BasicMessageConsumer, String>();
+            new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used to hold incoming messages.
@@ -248,11 +246,11 @@
      * consumer.
      */
     private Map<AMQShortString, BasicMessageConsumer> _consumers =
-        new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+            new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
-        new ConcurrentHashMap<Destination, AtomicInteger>();
+            new ConcurrentHashMap<Destination, AtomicInteger>();
 
     /**
      * Used as a source of unique identifiers for producers within the session.
@@ -312,15 +310,15 @@
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-        MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
         _strictAMQPFATAL =
-            Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+                Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
         _immediatePrefetch =
-            _strictAMQP
-            || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+                _strictAMQP
+                || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
         _transacted = transacted;
@@ -341,31 +339,31 @@
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue =
-                new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
-                    new FlowControllingBlockingQueue.ThresholdListener()
-                    {
-                        public void aboveThreshold(int currentValue)
-                        {
-                            if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                            {
-                                _logger.debug(
-                                    "Above threshold(" + _defaultPrefetchHighMark
-                                    + ") so suspending channel. Current value is " + currentValue);
-                                new Thread(new SuspenderRunner(true)).start();
-                            }
-                        }
-
-                        public void underThreshold(int currentValue)
-                        {
-                            if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                            {
-                                _logger.debug(
-                                    "Below threshold(" + _defaultPrefetchLowMark
-                                    + ") so unsuspending channel. Current value is " + currentValue);
-                                new Thread(new SuspenderRunner(false)).start();
-                            }
-                        }
-                    });
+                    new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+                                                     new FlowControllingBlockingQueue.ThresholdListener()
+                                                     {
+                                                         public void aboveThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Above threshold(" + _defaultPrefetchHighMark
+                                                                         + ") so suspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(true)).start();
+                                                             }
+                                                         }
+
+                                                         public void underThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Below threshold(" + _defaultPrefetchLowMark
+                                                                         + ") so unsuspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(false)).start();
+                                                             }
+                                                         }
+                                                     });
         }
         else
         {
@@ -384,10 +382,10 @@
      * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
-        int defaultPrefetchLow)
+               int defaultPrefetchLow)
     {
         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
-            defaultPrefetchLow);
+             defaultPrefetchLow);
     }
 
     // ===== JMS Session methods.
@@ -442,8 +440,8 @@
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         final AMQFrame ackFrame =
-            BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                multiple);
+                BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+                                            multiple);
 
         if (_logger.isDebugEnabled())
         {
@@ -470,27 +468,27 @@
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-        final AMQShortString exchangeName) throws AMQException
+                          final AMQShortString exchangeName) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame queueBind =
+                AMQFrame queueBind =
                         QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            arguments, // arguments
-                            exchangeName, // exchange
-                            false, // nowait
-                            queueName, // queue
-                            routingKey, // routingKey
-                            getTicket()); // ticket
+                                                     arguments, // arguments
+                                                     exchangeName, // exchange
+                                                     false, // nowait
+                                                     queueName, // queue
+                                                     routingKey, // routingKey
+                                                     getTicket()); // ticket
 
-                    getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+                getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -517,62 +515,58 @@
         if (_logger.isInfoEnabled())
         {
             _logger.info("Closing session: " + this + ":"
-                + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                         + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
-
-
-        synchronized(_messageDeliveryLock)
-        {
-        // We must close down all producers and consumers in an orderly fashion. This is the only method
-        // that can be called from a different thread of control from the one controlling the session.
-        synchronized (_connection.getFailoverMutex())
+        synchronized (_messageDeliveryLock)
         {
-
-
-            // Ensure we only try and close an open session.
-            if (!_closed.getAndSet(true))
+            // We must close down all producers and consumers in an orderly fashion. This is the only method
+            // that can be called from a different thread of control from the one controlling the session.
+            synchronized (_connection.getFailoverMutex())
             {
-                // we pass null since this is not an error case
-                closeProducersAndConsumers(null);
-
-                try
+                // Ensure we only try and close an open session.
+                if (!_closed.getAndSet(true))
                 {
+                    // we pass null since this is not an error case
+                    closeProducersAndConsumers(null);
+
+                    try
+                    {
 
-                    getProtocolHandler().closeSession(this);
+                        getProtocolHandler().closeSession(this);
 
-                    final AMQFrame frame =
-                        ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            0, // classId
-                            0, // methodId
-                            AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                            new AMQShortString("JMS client closing channel")); // replyText
+                        final AMQFrame frame =
+                                ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                                0, // classId
+                                                                0, // methodId
+                                                                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                                new AMQShortString("JMS client closing channel")); // replyText
 
-                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+                        getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
 
-                    // When control resumes at this point, a reply will have been received that
-                    // indicates the broker has closed the channel successfully.
-                }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing session: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
-                // This is ignored because the channel is already marked as closed so the fail-over process will
-                // not re-open it.
-                catch (FailoverException e)
-                {
-                    _logger.debug(
-                        "Got FailoverException during channel close, ignored as channel already marked as closed.");
-                }
-                finally
-                {
-                    _connection.deregisterSession(_channelId);
+                        // When control resumes at this point, a reply will have been received that
+                        // indicates the broker has closed the channel successfully.
+                    }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error closing session: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
+                    // This is ignored because the channel is already marked as closed so the fail-over process will
+                    // not re-open it.
+                    catch (FailoverException e)
+                    {
+                        _logger.debug(
+                                "Got FailoverException during channel close, ignored as channel already marked as closed.");
+                    }
+                    finally
+                    {
+                        _connection.deregisterSession(_channelId);
+                    }
                 }
             }
         }
-        }
     }
 
     /**
@@ -582,27 +576,26 @@
      */
     public void closed(Throwable e) throws JMSException
     {
-
-        synchronized(_messageDeliveryLock)
+        synchronized (_messageDeliveryLock)
         {
-        synchronized (_connection.getFailoverMutex())
-        {
-            // An AMQException has an error code and message already and will be passed in when closure occurs as a
-            // result of a channel close request
-            _closed.set(true);
-            AMQException amqe;
-            if (e instanceof AMQException)
-            {
-                amqe = (AMQException) e;
-            }
-            else
+            synchronized (_connection.getFailoverMutex())
             {
-                amqe = new AMQException("Closing session forcibly", e);
-            }
+                // An AMQException has an error code and message already and will be passed in when closure occurs as a
+                // result of a channel close request
+                _closed.set(true);
+                AMQException amqe;
+                if (e instanceof AMQException)
+                {
+                    amqe = (AMQException) e;
+                }
+                else
+                {
+                    amqe = new AMQException("Closing session forcibly", e);
+                }
 
-            _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);
-        }
+                _connection.deregisterSession(_channelId);
+                closeProducersAndConsumers(amqe);
+            }
         }
     }
 
@@ -637,7 +630,7 @@
             final AMQProtocolHandler handler = getProtocolHandler();
 
             handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
-                TxCommitOkBody.class);
+                              TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -719,12 +712,12 @@
     }
 
     public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
-                messageSelector, null, true, true);
+                                  messageSelector, null, true, true);
     }
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException
@@ -732,7 +725,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
-                false, false);
+                                  false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -740,20 +733,20 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
-                messageSelector, null, false, false);
+                                  messageSelector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
-                messageSelector, null, false, false);
+                                  messageSelector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-        String selector) throws JMSException
+                                          String selector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -761,7 +754,7 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-        boolean exclusive, String selector) throws JMSException
+                                          boolean exclusive, String selector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -769,7 +762,7 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-        String selector, FieldTable rawSelector) throws JMSException
+                                          String selector, FieldTable rawSelector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -777,12 +770,12 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-        boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
+                                          boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
-                false);
+                                  false);
     }
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -797,7 +790,7 @@
             if (subscriber.getTopic().equals(topic))
             {
                 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
-                    + name);
+                                                + name);
             }
             else
             {
@@ -825,7 +818,7 @@
                 else
                 {
                     _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
-                        + "for creation durableSubscriber. Requesting queue deletion regardless.");
+                                 + "for creation durableSubscriber. Requesting queue deletion regardless.");
                 }
 
                 deleteQueue(dest.getAMQQueueName());
@@ -835,7 +828,7 @@
                 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
                 // says we must trash the subscription.
                 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
-                        && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                    && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
                 {
                     deleteQueue(dest.getAMQQueueName());
                 }
@@ -852,7 +845,7 @@
 
     /** Note, currently this does not handle reuse of the same name with different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkNotClosed();
         checkValidTopic(topic);
@@ -909,13 +902,13 @@
     }
 
     public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
-        throws JMSException
+            throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate);
     }
 
     public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
-        boolean waitUntilSent) throws JMSException
+                                               boolean waitUntilSent) throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
     }
@@ -965,28 +958,28 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
-        final boolean exclusive) throws AMQException
+                            final boolean exclusive) throws AMQException
     {
         new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame queueDeclare =
+                AMQFrame queueDeclare =
                         QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            null, // arguments
-                            autoDelete, // autoDelete
-                            durable, // durable
-                            exclusive, // exclusive
-                            false, // nowait
-                            false, // passive
-                            name, // queue
-                            getTicket()); // ticket
+                                                        null, // arguments
+                                                        autoDelete, // autoDelete
+                                                        durable, // durable
+                                                        exclusive, // exclusive
+                                                        false, // nowait
+                                                        false, // passive
+                                                        name, // queue
+                                                        getTicket()); // ticket
 
-                    getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+                getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -1279,8 +1272,8 @@
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Message["
-                + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
-                + "] received in session with channel id " + _channelId);
+                          + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+                          + "] received in session with channel id " + _channelId);
         }
 
         if (message.getDeliverBody() == null)
@@ -1354,15 +1347,15 @@
             {
                 // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
                 _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                        getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+                                                                                            getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
                 _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
             }
             else
             {
 
                 _connection.getProtocolHandler().syncWrite(
-                    BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
-                    , BasicRecoverOkBody.class);
+                        BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+                        , BasicRecoverOkBody.class);
             }
 
             if (!isSuspended)
@@ -1412,8 +1405,8 @@
             }
 
             AMQFrame basicRejectBody =
-                BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                    requeue);
+                    BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+                                                   requeue);
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
@@ -1453,7 +1446,7 @@
                 }
 
                 _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
-                        getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+                                                                                         getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
 
                 if (!isSuspended)
                 {
@@ -1532,7 +1525,7 @@
                 else
                 {
                     _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
-                        + " Requesting queue deletion regardless.");
+                                 + " Requesting queue deletion regardless.");
                 }
 
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
@@ -1553,8 +1546,8 @@
     }
 
     protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
-        final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-        final boolean noConsume, final boolean autoClose) throws JMSException
+                                                 final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+                                                 final boolean noConsume, final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
@@ -1597,9 +1590,9 @@
                         }
 
                         BasicMessageConsumer consumer =
-                            new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
-                                _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
-                                exclusive, _acknowledgeMode, noConsume, autoClose);
+                                new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
+                                                         _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
+                                                         exclusive, _acknowledgeMode, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -1619,7 +1612,7 @@
                         catch (AMQInvalidRoutingKeyException e)
                         {
                             JMSException ide =
-                                new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
+                                    new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
                             ide.setLinkedException(e);
                             throw ide;
                         }
@@ -1705,26 +1698,26 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-        throws JMSException
+            throws JMSException
     {
         try
         {
             AMQMethodEvent response =
-                new FailoverRetrySupport<AMQMethodEvent, AMQException>(
-                    new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
-                    {
-                        public AMQMethodEvent execute() throws AMQException, FailoverException
-                        {
-                            AMQFrame boundFrame =
-                                ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
-                                    getProtocolMinorVersion(), exchangeName, // exchange
-                                    queueName, // queue
-                                    routingKey); // routingKey
+                    new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+                            new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+                            {
+                                public AMQMethodEvent execute() throws AMQException, FailoverException
+                                {
+                                    AMQFrame boundFrame =
+                                            ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
+                                                                             getProtocolMinorVersion(), exchangeName, // exchange
+                                                                             queueName, // queue
+                                                                             routingKey); // routingKey
 
-                            return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+                                    return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
 
-                        }
-                    }, _connection).execute();
+                                }
+                            }, _connection).execute();
 
             // Extract and return the response code from the query.
             ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -1794,9 +1787,16 @@
         }
     }
 
-    synchronized void startDistpatcherIfNecessary()
+    void startDistpatcherIfNecessary()
     {
+        //If we are the dispatcher then we don't need to check we are started
+        if (Thread.currentThread() == _dispatcher)
+        {
+            return;
+        }
+
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+        // This is final per session so will be multi-thread safe.
         if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
@@ -1933,14 +1933,14 @@
         if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
         {
             throw new javax.jms.InvalidDestinationException(
-                "Cannot create a subscription on a temporary topic created in another session");
+                    "Cannot create a subscription on a temporary topic created in another session");
         }
 
         if (!(topic instanceof AMQTopic))
         {
             throw new javax.jms.InvalidDestinationException(
-                "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
-                + topic.getClass().getName());
+                    "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+                    + topic.getClass().getName());
         }
 
         return (AMQTopic) topic;
@@ -2040,7 +2040,7 @@
      * @param queueName
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
-        AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+                                  AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
         // need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -2069,14 +2069,14 @@
         {
             // TODO: Be aware of possible changes to parameter order as versions change.
             AMQFrame jmsConsume =
-                BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
-                    tag, // consumerTag
-                    consumer.isExclusive(), // exclusive
-                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
-                    consumer.isNoLocal(), // noLocal
-                    nowait, // nowait
-                    queueName, // queue
-                    getTicket()); // ticket
+                    BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+                                                    tag, // consumerTag
+                                                    consumer.isExclusive(), // exclusive
+                                                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+                                                    consumer.isNoLocal(), // noLocal
+                                                    nowait, // nowait
+                                                    queueName, // queue
+                                                    getTicket()); // ticket
 
             if (nowait)
             {
@@ -2096,13 +2096,13 @@
     }
 
     private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
-        throws JMSException
+            throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, false);
     }
 
     private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
-        final boolean immediate, final boolean waitUntilSent) throws JMSException
+                                                    final boolean immediate, final boolean waitUntilSent) throws JMSException
     {
         return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
                 new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
@@ -2112,8 +2112,8 @@
                         checkNotClosed();
                         long producerId = getNextProducerId();
                         BasicMessageProducer producer =
-                            new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+                                new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+                                                         AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2141,29 +2141,29 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-        final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame exchangeDeclare =
+                AMQFrame exchangeDeclare =
                         ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            null, // arguments
-                            false, // autoDelete
-                            false, // durable
-                            name, // exchange
-                            false, // internal
-                            nowait, // nowait
-                            false, // passive
-                            getTicket(), // ticket
-                            type); // type
+                                                           null, // arguments
+                                                           false, // autoDelete
+                                                           false, // durable
+                                                           name, // exchange
+                                                           false, // internal
+                                                           nowait, // nowait
+                                                           false, // passive
+                                                           getTicket(), // ticket
+                                                           type); // type
 
-                    protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+                protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -2188,7 +2188,7 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
-        throws AMQException
+            throws AMQException
     {
         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
         return new FailoverNoopSupport<AMQShortString, AMQException>(
@@ -2203,15 +2203,15 @@
                         }
 
                         AMQFrame queueDeclare =
-                            QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                null, // arguments
-                                amqd.isAutoDelete(), // autoDelete
-                                amqd.isDurable(), // durable
-                                amqd.isExclusive(), // exclusive
-                                false, // nowait
-                                false, // passive
-                                amqd.getAMQQueueName(), // queue
-                                getTicket()); // ticket
+                                QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                                null, // arguments
+                                                                amqd.isAutoDelete(), // autoDelete
+                                                                amqd.isDurable(), // durable
+                                                                amqd.isExclusive(), // exclusive
+                                                                false, // nowait
+                                                                false, // passive
+                                                                amqd.getAMQQueueName(), // queue
+                                                                getTicket()); // ticket
 
                         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
 
@@ -2236,22 +2236,22 @@
         try
         {
             new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+            {
+                public Object execute() throws AMQException, FailoverException
                 {
-                    public Object execute() throws AMQException, FailoverException
-                    {
-                        AMQFrame queueDeleteFrame =
+                    AMQFrame queueDeleteFrame =
                             QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                false, // ifEmpty
-                                false, // ifUnused
-                                true, // nowait
-                                queueName, // queue
-                                getTicket()); // ticket
+                                                           false, // ifEmpty
+                                                           false, // ifUnused
+                                                           true, // nowait
+                                                           queueName, // queue
+                                                           getTicket()); // ticket
 
-                        getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+                    getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
 
-                        return null;
-                    }
-                }, _connection).execute();
+                    return null;
+                }
+            }, _connection).execute();
         }
         catch (AMQException e)
         {
@@ -2370,7 +2370,7 @@
                     {
                         suspendChannel(true);
                         _logger.info(
-                            "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
+                                "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
                     }
                     catch (AMQException e)
                     {
@@ -2419,7 +2419,7 @@
         if (_logger.isInfoEnabled())
         {
             _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
-                + requeue);
+                         + requeue);
 
             if (messages.hasNext())
             {
@@ -2439,7 +2439,7 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                        + message.getDeliverBody().deliveryTag);
+                                  + message.getDeliverBody().deliveryTag);
                 }
 
                 messages.remove();
@@ -2480,44 +2480,44 @@
     private void returnBouncedMessage(final UnprocessedMessage message)
     {
         _connection.performConnectionTask(new Runnable()
+        {
+            public void run()
             {
-                public void run()
+                try
                 {
-                    try
-                    {
-                        // Bounced message is processed here, away from the mina thread
-                        AbstractJMSMessage bouncedMessage =
+                    // Bounced message is processed here, away from the mina thread
+                    AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
-                                message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
-
-                        AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
-                        AMQShortString reason = message.getBounceBody().replyText;
-                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+                                                                  message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
 
-                        // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                        if (errorCode == AMQConstant.NO_CONSUMERS)
-                        {
-                            _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
-                        }
-                        else if (errorCode == AMQConstant.NO_ROUTE)
-                        {
-                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                        }
-                        else
-                        {
-                            _connection.exceptionReceived(
-                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                        }
+                    AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+                    AMQShortString reason = message.getBounceBody().replyText;
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
+                    // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+                    if (errorCode == AMQConstant.NO_CONSUMERS)
+                    {
+                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
-                    catch (Exception e)
+                    else if (errorCode == AMQConstant.NO_ROUTE)
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                    }
+                    else
                     {
-                        _logger.error(
+                        _connection.exceptionReceived(
+                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+                    }
+
+                }
+                catch (Exception e)
+                {
+                    _logger.error(
                             "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
                             e);
-                    }
                 }
-            });
+            }
+        });
     }
 
     /**
@@ -2544,8 +2544,8 @@
                 _suspended = suspend;
 
                 AMQFrame channelFlowFrame =
-                    ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                        !suspend);
+                        ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                       !suspend);
 
                 _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
             }
@@ -2735,7 +2735,7 @@
                 if (_dispatcherLogger.isDebugEnabled())
                 {
                     _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
-                        + ": Currently " + (currently ? "Stopped" : "Started"));
+                                            + ": Currently " + (currently ? "Stopped" : "Started"));
                 }
             }
 
@@ -2747,7 +2747,7 @@
             if (message.getDeliverBody() != null)
             {
                 final BasicMessageConsumer consumer =
-                    (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+                        (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
 
                 if ((consumer == null) || consumer.isClosed())
                 {
@@ -2756,14 +2756,14 @@
                         if (consumer == null)
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliverBody().deliveryTag + "] from queue "
-                                + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+                                                   + message.getDeliverBody().deliveryTag + "] from queue "
+                                                   + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
                         }
                         else
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
-                                + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+                                                   + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+                                                   + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
                         }
                     }
                     // Don't reject if we're already closing

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=580293&r1=580292&r2=580293&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Sep 28 03:41:49 2007
@@ -240,15 +240,12 @@
 
             if (messageListener != null)
             {
-                // handle case where connection has already been started, and the dispatcher has alreaded started
+                //todo: handle case where connection has already been started, and the dispatcher has alreaded started
                 // putting values on the _synchronousQueue
 
-                synchronized (_session)
-                {
                     _messageListener.set(messageListener);
                     _session.setHasMessageListeners();
                     _session.startDistpatcherIfNecessary();
-                }
             }
         }
     }