You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/11 13:39:10 UTC

svn commit: r574555 - in /incubator/qpid/branches/M2.1/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/server/failure/

Author: ritchiem
Date: Tue Sep 11 04:39:10 2007
New Revision: 574555

URL: http://svn.apache.org/viewvc?rev=574555&view=rev
Log:
QPID-590 : Provide test case and resolution to prevent deadlock occurring on the client when two threads work on the AMQSession object.

Added:
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java   (with props)
Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=574555&r1=574554&r2=574555&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Sep 11 04:39:10 2007
@@ -72,7 +72,6 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,7 +99,6 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -206,14 +204,14 @@
      * subscriptions between executions of the client.
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
-        new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+            new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
 
     /**
      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
      * up in the {@link #_subscriptions} map.
      */
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-        new ConcurrentHashMap<BasicMessageConsumer, String>();
+            new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used to hold incoming messages.
@@ -241,11 +239,11 @@
      * consumer.
      */
     private Map<AMQShortString, BasicMessageConsumer> _consumers =
-        new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+            new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
-        new ConcurrentHashMap<Destination, AtomicInteger>();
+            new ConcurrentHashMap<Destination, AtomicInteger>();
 
     /**
      * Used as a source of unique identifiers for producers within the session.
@@ -305,15 +303,15 @@
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-        MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
         _strictAMQPFATAL =
-            Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
+                Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
         _immediatePrefetch =
-            _strictAMQP
-            || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+                _strictAMQP
+                || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
         _transacted = transacted;
@@ -334,31 +332,31 @@
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue =
-                new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
-                    new FlowControllingBlockingQueue.ThresholdListener()
-                    {
-                        public void aboveThreshold(int currentValue)
-                        {
-                            if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                            {
-                                _logger.debug(
-                                    "Above threshold(" + _defaultPrefetchHighMark
-                                    + ") so suspending channel. Current value is " + currentValue);
-                                new Thread(new SuspenderRunner(true)).start();
-                            }
-                        }
-
-                        public void underThreshold(int currentValue)
-                        {
-                            if (_acknowledgeMode == NO_ACKNOWLEDGE)
-                            {
-                                _logger.debug(
-                                    "Below threshold(" + _defaultPrefetchLowMark
-                                    + ") so unsuspending channel. Current value is " + currentValue);
-                                new Thread(new SuspenderRunner(false)).start();
-                            }
-                        }
-                    });
+                    new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+                                                     new FlowControllingBlockingQueue.ThresholdListener()
+                                                     {
+                                                         public void aboveThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Above threshold(" + _defaultPrefetchHighMark
+                                                                         + ") so suspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(true)).start();
+                                                             }
+                                                         }
+
+                                                         public void underThreshold(int currentValue)
+                                                         {
+                                                             if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                             {
+                                                                 _logger.debug(
+                                                                         "Below threshold(" + _defaultPrefetchLowMark
+                                                                         + ") so unsuspending channel. Current value is " + currentValue);
+                                                                 new Thread(new SuspenderRunner(false)).start();
+                                                             }
+                                                         }
+                                                     });
         }
         else
         {
@@ -377,10 +375,10 @@
      * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
-        int defaultPrefetchLow)
+               int defaultPrefetchLow)
     {
         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
-            defaultPrefetchLow);
+             defaultPrefetchLow);
     }
 
     // ===== JMS Session methods.
@@ -435,8 +433,8 @@
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         final AMQFrame ackFrame =
-            BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                multiple);
+                BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+                                            multiple);
 
         if (_logger.isDebugEnabled())
         {
@@ -463,27 +461,27 @@
      * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
      */
     public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
-        final AMQShortString exchangeName) throws AMQException
+                          final AMQShortString exchangeName) throws AMQException
     {
         /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame queueBind =
+                AMQFrame queueBind =
                         QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            arguments, // arguments
-                            exchangeName, // exchange
-                            false, // nowait
-                            queueName, // queue
-                            routingKey, // routingKey
-                            getTicket()); // ticket
+                                                     arguments, // arguments
+                                                     exchangeName, // exchange
+                                                     false, // nowait
+                                                     queueName, // queue
+                                                     routingKey, // routingKey
+                                                     getTicket()); // ticket
 
-                    getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+                getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -510,59 +508,59 @@
         if (_logger.isInfoEnabled())
         {
             _logger.info("Closing session: " + this + ":"
-                + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                         + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
-        synchronized(_messageDeliveryLock)
+        synchronized (_messageDeliveryLock)
         {
 
-        // We must close down all producers and consumers in an orderly fashion. This is the only method
-        // that can be called from a different thread of control from the one controlling the session.
-        synchronized (_connection.getFailoverMutex())
-        {
-            // Ensure we only try and close an open session.
-            if (!_closed.getAndSet(true))
+            // We must close down all producers and consumers in an orderly fashion. This is the only method
+            // that can be called from a different thread of control from the one controlling the session.
+            synchronized (_connection.getFailoverMutex())
             {
-                // we pass null since this is not an error case
-                closeProducersAndConsumers(null);
-
-                try
+                // Ensure we only try and close an open session.
+                if (!_closed.getAndSet(true))
                 {
+                    // we pass null since this is not an error case
+                    closeProducersAndConsumers(null);
 
-                    getProtocolHandler().closeSession(this);
+                    try
+                    {
 
-                    final AMQFrame frame =
-                        ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            0, // classId
-                            0, // methodId
-                            AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                            new AMQShortString("JMS client closing channel")); // replyText
+                        getProtocolHandler().closeSession(this);
 
-                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+                        final AMQFrame frame =
+                                ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                                0, // classId
+                                                                0, // methodId
+                                                                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                                new AMQShortString("JMS client closing channel")); // replyText
 
-                    // When control resumes at this point, a reply will have been received that
-                    // indicates the broker has closed the channel successfully.
-                }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing session: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
-                // This is ignored because the channel is already marked as closed so the fail-over process will
-                // not re-open it.
-                catch (FailoverException e)
-                {
-                    _logger.debug(
-                        "Got FailoverException during channel close, ignored as channel already marked as closed.");
-                }
-                finally
-                {
-                    _connection.deregisterSession(_channelId);
+                        getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+
+                        // When control resumes at this point, a reply will have been received that
+                        // indicates the broker has closed the channel successfully.
+                    }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error closing session: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
+                    // This is ignored because the channel is already marked as closed so the fail-over process will
+                    // not re-open it.
+                    catch (FailoverException e)
+                    {
+                        _logger.debug(
+                                "Got FailoverException during channel close, ignored as channel already marked as closed.");
+                    }
+                    finally
+                    {
+                        _connection.deregisterSession(_channelId);
+                    }
                 }
             }
         }
-        }
     }
 
     /**
@@ -572,26 +570,26 @@
      */
     public void closed(Throwable e) throws JMSException
     {
-        synchronized(_messageDeliveryLock)
+        synchronized (_messageDeliveryLock)
         {
-        synchronized (_connection.getFailoverMutex())
-        {
-            // An AMQException has an error code and message already and will be passed in when closure occurs as a
-            // result of a channel close request
-            _closed.set(true);
-            AMQException amqe;
-            if (e instanceof AMQException)
+            synchronized (_connection.getFailoverMutex())
             {
-                amqe = (AMQException) e;
-            }
-            else
-            {
-                amqe = new AMQException("Closing session forcibly", e);
-            }
+                // An AMQException has an error code and message already and will be passed in when closure occurs as a
+                // result of a channel close request
+                _closed.set(true);
+                AMQException amqe;
+                if (e instanceof AMQException)
+                {
+                    amqe = (AMQException) e;
+                }
+                else
+                {
+                    amqe = new AMQException("Closing session forcibly", e);
+                }
 
-            _connection.deregisterSession(_channelId);
-            closeProducersAndConsumers(amqe);
-        }
+                _connection.deregisterSession(_channelId);
+                closeProducersAndConsumers(amqe);
+            }
         }
     }
 
@@ -626,7 +624,7 @@
             final AMQProtocolHandler handler = getProtocolHandler();
 
             handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
-                TxCommitOkBody.class);
+                              TxCommitOkBody.class);
         }
         catch (AMQException e)
         {
@@ -709,12 +707,12 @@
     }
 
     public MessageConsumer createBrowserConsumer(Destination destination, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
-                messageSelector, null, true, true);
+                                  messageSelector, null, true, true);
     }
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException
@@ -722,7 +720,7 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null, null,
-                false, false);
+                                  false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -730,20 +728,20 @@
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false,
-                messageSelector, null, false, false);
+                                  messageSelector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false,
-                messageSelector, null, false, false);
+                                  messageSelector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-        String selector) throws JMSException
+                                          String selector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -751,7 +749,7 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-        boolean exclusive, String selector) throws JMSException
+                                          boolean exclusive, String selector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -759,7 +757,7 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
-        String selector, FieldTable rawSelector) throws JMSException
+                                          String selector, FieldTable rawSelector) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -767,12 +765,12 @@
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
-        boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
+                                          boolean exclusive, String selector, FieldTable rawSelector) throws JMSException
     {
         checkValidDestination(destination);
 
         return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
-                false);
+                                  false);
     }
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -787,7 +785,7 @@
             if (subscriber.getTopic().equals(topic))
             {
                 throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
-                    + name);
+                                                + name);
             }
             else
             {
@@ -815,7 +813,7 @@
                 else
                 {
                     _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
-                        + "for creation durableSubscriber. Requesting queue deletion regardless.");
+                                 + "for creation durableSubscriber. Requesting queue deletion regardless.");
                 }
 
                 deleteQueue(dest.getAMQQueueName());
@@ -825,7 +823,7 @@
                 // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
                 // says we must trash the subscription.
                 if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
-                        && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                    && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
                 {
                     deleteQueue(dest.getAMQQueueName());
                 }
@@ -842,7 +840,7 @@
 
     /** Note, currently this does not handle reuse of the same name with different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
-        throws JMSException
+            throws JMSException
     {
         checkNotClosed();
         checkValidTopic(topic);
@@ -899,13 +897,13 @@
     }
 
     public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
-        throws JMSException
+            throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate);
     }
 
     public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
-        boolean waitUntilSent) throws JMSException
+                                               boolean waitUntilSent) throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
     }
@@ -955,28 +953,28 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
-        final boolean exclusive) throws AMQException
+                            final boolean exclusive) throws AMQException
     {
         new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame queueDeclare =
+                AMQFrame queueDeclare =
                         QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            null, // arguments
-                            autoDelete, // autoDelete
-                            durable, // durable
-                            exclusive, // exclusive
-                            false, // nowait
-                            false, // passive
-                            name, // queue
-                            getTicket()); // ticket
+                                                        null, // arguments
+                                                        autoDelete, // autoDelete
+                                                        durable, // durable
+                                                        exclusive, // exclusive
+                                                        false, // nowait
+                                                        false, // passive
+                                                        name, // queue
+                                                        getTicket()); // ticket
 
-                    getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+                getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -1269,8 +1267,8 @@
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Message["
-                + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
-                + "] received in session with channel id " + _channelId);
+                          + ((message.getDeliverBody() == null) ? ("B:" + message.getBounceBody()) : ("D:" + message.getDeliverBody()))
+                          + "] received in session with channel id " + _channelId);
         }
 
         if (message.getDeliverBody() == null)
@@ -1343,15 +1341,15 @@
             {
                 // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
                 _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                        getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+                                                                                            getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
                 _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
             }
             else
             {
 
                 _connection.getProtocolHandler().syncWrite(
-                    BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
-                    , BasicRecoverOkBody.class);
+                        BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
+                        , BasicRecoverOkBody.class);
             }
 
             if (!isSuspended)
@@ -1401,8 +1399,8 @@
             }
 
             AMQFrame basicRejectBody =
-                BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                    requeue);
+                    BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
+                                                   requeue);
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
@@ -1442,7 +1440,7 @@
                 }
 
                 _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
-                        getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+                                                                                         getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
 
                 if (!isSuspended)
                 {
@@ -1521,7 +1519,7 @@
                 else
                 {
                     _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
-                        + " Requesting queue deletion regardless.");
+                                 + " Requesting queue deletion regardless.");
                 }
 
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
@@ -1542,8 +1540,8 @@
     }
 
     protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
-        final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-        final boolean noConsume, final boolean autoClose) throws JMSException
+                                                 final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+                                                 final boolean noConsume, final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
@@ -1586,9 +1584,9 @@
                         }
 
                         BasicMessageConsumer consumer =
-                            new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
-                                _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
-                                exclusive, _acknowledgeMode, noConsume, autoClose);
+                                new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
+                                                         _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
+                                                         exclusive, _acknowledgeMode, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -1608,7 +1606,7 @@
                         catch (AMQInvalidRoutingKeyException e)
                         {
                             JMSException ide =
-                                new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
+                                    new InvalidDestinationException("Invalid routing key:" + amqd.getRoutingKey().toString());
                             ide.setLinkedException(e);
                             throw ide;
                         }
@@ -1694,26 +1692,26 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-        throws JMSException
+            throws JMSException
     {
         try
         {
             AMQMethodEvent response =
-                new FailoverRetrySupport<AMQMethodEvent, AMQException>(
-                    new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
-                    {
-                        public AMQMethodEvent execute() throws AMQException, FailoverException
-                        {
-                            AMQFrame boundFrame =
-                                ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
-                                    getProtocolMinorVersion(), exchangeName, // exchange
-                                    queueName, // queue
-                                    routingKey); // routingKey
+                    new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+                            new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+                            {
+                                public AMQMethodEvent execute() throws AMQException, FailoverException
+                                {
+                                    AMQFrame boundFrame =
+                                            ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
+                                                                             getProtocolMinorVersion(), exchangeName, // exchange
+                                                                             queueName, // queue
+                                                                             routingKey); // routingKey
 
-                            return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+                                    return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
 
-                        }
-                    }, _connection).execute();
+                                }
+                            }, _connection).execute();
 
             // Extract and return the response code from the query.
             ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -1783,9 +1781,16 @@
         }
     }
 
-    synchronized void startDistpatcherIfNecessary()
+    void startDistpatcherIfNecessary()
     {
+        //If we are the dispatcher then we don't need to check we are started
+        if (Thread.currentThread() == _dispatcher)
+        {
+            return;
+        }
+
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+        // This is final per session so will be multi-thread safe.
         if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
@@ -1922,14 +1927,14 @@
         if ((topic instanceof TemporaryDestination) && (((TemporaryDestination) topic).getSession() != this))
         {
             throw new javax.jms.InvalidDestinationException(
-                "Cannot create a subscription on a temporary topic created in another session");
+                    "Cannot create a subscription on a temporary topic created in another session");
         }
 
         if (!(topic instanceof AMQTopic))
         {
             throw new javax.jms.InvalidDestinationException(
-                "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
-                + topic.getClass().getName());
+                    "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
+                    + topic.getClass().getName());
         }
 
         return (AMQTopic) topic;
@@ -2029,7 +2034,7 @@
      * @param queueName
      */
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
-        AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+                                  AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
         // need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -2058,14 +2063,14 @@
         {
             // TODO: Be aware of possible changes to parameter order as versions change.
             AMQFrame jmsConsume =
-                BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
-                    tag, // consumerTag
-                    consumer.isExclusive(), // exclusive
-                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
-                    consumer.isNoLocal(), // noLocal
-                    nowait, // nowait
-                    queueName, // queue
-                    getTicket()); // ticket
+                    BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
+                                                    tag, // consumerTag
+                                                    consumer.isExclusive(), // exclusive
+                                                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
+                                                    consumer.isNoLocal(), // noLocal
+                                                    nowait, // nowait
+                                                    queueName, // queue
+                                                    getTicket()); // ticket
 
             if (nowait)
             {
@@ -2085,13 +2090,13 @@
     }
 
     private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
-        throws JMSException
+            throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, false);
     }
 
     private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
-        final boolean immediate, final boolean waitUntilSent) throws JMSException
+                                                    final boolean immediate, final boolean waitUntilSent) throws JMSException
     {
         return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
                 new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
@@ -2101,8 +2106,8 @@
                         checkNotClosed();
                         long producerId = getNextProducerId();
                         BasicMessageProducer producer =
-                            new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+                                new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+                                                         AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2130,29 +2135,29 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-        final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+        {
+            public Object execute() throws AMQException, FailoverException
             {
-                public Object execute() throws AMQException, FailoverException
-                {
-                    AMQFrame exchangeDeclare =
+                AMQFrame exchangeDeclare =
                         ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                            null, // arguments
-                            false, // autoDelete
-                            false, // durable
-                            name, // exchange
-                            false, // internal
-                            nowait, // nowait
-                            false, // passive
-                            getTicket(), // ticket
-                            type); // type
+                                                           null, // arguments
+                                                           false, // autoDelete
+                                                           false, // durable
+                                                           name, // exchange
+                                                           false, // internal
+                                                           nowait, // nowait
+                                                           false, // passive
+                                                           getTicket(), // ticket
+                                                           type); // type
 
-                    protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+                protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
 
-                    return null;
-                }
-            }, _connection).execute();
+                return null;
+            }
+        }, _connection).execute();
     }
 
     /**
@@ -2177,7 +2182,7 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
-        throws AMQException
+            throws AMQException
     {
         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
         return new FailoverNoopSupport<AMQShortString, AMQException>(
@@ -2192,15 +2197,15 @@
                         }
 
                         AMQFrame queueDeclare =
-                            QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                null, // arguments
-                                amqd.isAutoDelete(), // autoDelete
-                                amqd.isDurable(), // durable
-                                amqd.isExclusive(), // exclusive
-                                false, // nowait
-                                false, // passive
-                                amqd.getAMQQueueName(), // queue
-                                getTicket()); // ticket
+                                QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                                null, // arguments
+                                                                amqd.isAutoDelete(), // autoDelete
+                                                                amqd.isDurable(), // durable
+                                                                amqd.isExclusive(), // exclusive
+                                                                false, // nowait
+                                                                false, // passive
+                                                                amqd.getAMQQueueName(), // queue
+                                                                getTicket()); // ticket
 
                         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
 
@@ -2225,22 +2230,22 @@
         try
         {
             new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+            {
+                public Object execute() throws AMQException, FailoverException
                 {
-                    public Object execute() throws AMQException, FailoverException
-                    {
-                        AMQFrame queueDeleteFrame =
+                    AMQFrame queueDeleteFrame =
                             QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                false, // ifEmpty
-                                false, // ifUnused
-                                true, // nowait
-                                queueName, // queue
-                                getTicket()); // ticket
+                                                           false, // ifEmpty
+                                                           false, // ifUnused
+                                                           true, // nowait
+                                                           queueName, // queue
+                                                           getTicket()); // ticket
 
-                        getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+                    getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
 
-                        return null;
-                    }
-                }, _connection).execute();
+                    return null;
+                }
+            }, _connection).execute();
         }
         catch (AMQException e)
         {
@@ -2359,7 +2364,7 @@
                     {
                         suspendChannel(true);
                         _logger.info(
-                            "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
+                                "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
                     }
                     catch (AMQException e)
                     {
@@ -2408,7 +2413,7 @@
         if (_logger.isInfoEnabled())
         {
             _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
-                + requeue);
+                         + requeue);
 
             if (messages.hasNext())
             {
@@ -2428,7 +2433,7 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                        + message.getDeliverBody().deliveryTag);
+                                  + message.getDeliverBody().deliveryTag);
                 }
 
                 messages.remove();
@@ -2469,44 +2474,44 @@
     private void returnBouncedMessage(final UnprocessedMessage message)
     {
         _connection.performConnectionTask(new Runnable()
+        {
+            public void run()
             {
-                public void run()
+                try
                 {
-                    try
-                    {
-                        // Bounced message is processed here, away from the mina thread
-                        AbstractJMSMessage bouncedMessage =
+                    // Bounced message is processed here, away from the mina thread
+                    AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, message.getBounceBody().exchange,
-                                message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
+                                                                  message.getBounceBody().routingKey, message.getContentHeader(), message.getBodies());
 
-                        AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
-                        AMQShortString reason = message.getBounceBody().replyText;
-                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
-                        // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                        if (errorCode == AMQConstant.NO_CONSUMERS)
-                        {
-                            _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
-                        }
-                        else if (errorCode == AMQConstant.NO_ROUTE)
-                        {
-                            _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                        }
-                        else
-                        {
-                            _connection.exceptionReceived(
-                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                        }
+                    AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+                    AMQShortString reason = message.getBounceBody().replyText;
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
+                    // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+                    if (errorCode == AMQConstant.NO_CONSUMERS)
+                    {
+                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
                     }
-                    catch (Exception e)
+                    else if (errorCode == AMQConstant.NO_ROUTE)
                     {
-                        _logger.error(
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                    }
+                    else
+                    {
+                        _connection.exceptionReceived(
+                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+                    }
+
+                }
+                catch (Exception e)
+                {
+                    _logger.error(
                             "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
                             e);
-                    }
                 }
-            });
+            }
+        });
     }
 
     /**
@@ -2533,8 +2538,8 @@
                 _suspended = suspend;
 
                 AMQFrame channelFlowFrame =
-                    ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                        !suspend);
+                        ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                       !suspend);
 
                 _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
             }
@@ -2670,7 +2675,7 @@
                             _lock.wait();
                         }
 
-                        synchronized(_messageDeliveryLock)
+                        synchronized (_messageDeliveryLock)
                         {
                             dispatchMessage(message);
                         }
@@ -2713,7 +2718,7 @@
                 if (_dispatcherLogger.isDebugEnabled())
                 {
                     _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
-                        + ": Currently " + (currently ? "Stopped" : "Started"));
+                                            + ": Currently " + (currently ? "Stopped" : "Started"));
                 }
             }
 
@@ -2725,7 +2730,7 @@
             if (message.getDeliverBody() != null)
             {
                 final BasicMessageConsumer consumer =
-                    (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
+                        (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
 
                 if ((consumer == null) || consumer.isClosed())
                 {
@@ -2734,14 +2739,14 @@
                         if (consumer == null)
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliverBody().deliveryTag + "] from queue "
-                                + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
+                                                   + message.getDeliverBody().deliveryTag + "] from queue "
+                                                   + message.getDeliverBody().consumerTag + " )without a handler - rejecting(requeue)...");
                         }
                         else
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
-                                + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+                                                   + message.getDeliverBody().deliveryTag + "] from queue " + " consumer("
+                                                   + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
                         }
                     }
                     // Don't reject if we're already closing

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=574555&r1=574554&r2=574555&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Sep 11 04:39:10 2007
@@ -240,15 +240,12 @@
 
             if (messageListener != null)
             {
-                // handle case where connection has already been started, and the dispatcher has alreaded started
+                //todo: handle case where connection has already been started, and the dispatcher has alreaded started
                 // putting values on the _synchronousQueue
 
-                synchronized (_session)
-                {
                     _messageListener.set(messageListener);
                     _session.setHasMessageListeners();
                     _session.startDistpatcherIfNecessary();
-                }
             }
         }
     }

Added: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java?rev=574555&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java (added)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java Tue Sep 11 04:39:10 2007
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.failure;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DeadlockTestCase:
+ *   From a client requirement.
+ *
+ *  The JMS Spec specifies that a Session has a single thread of control. And as such setting message listeners from a
+ * second thread is not allowed.
+ * Section  4.4.6  of the Spec states:
+  <quote>Another consequence is that a connection must be in stopped mode to set up a
+session with more than one message listener. The reason is that when a
+connection is actively delivering messages, once the first message listener for a
+session has been registered, the session is now controlled by the thread of
+control that delivers messages to it. At this point a client thread of control
+cannot be used to further configure the session.</quote>
+ *
+ * It, however, does not specified what we should do in the case. it only states:
+ <quote>Once a connection has been started, all its sessions with a registered message
+listener are dedicated to the thread of control that delivers messages to them. It
+is erroneous for client code to use such a session from another thread of
+control. The only exception to this is the use of the session or connection close
+method.</quote>
+ *
+ * While it may be erroneous the causing a Deadlock is not a very satisfactory solution. This test ensures that we do
+ * no do this. There is no technical reason we cannot currently allow the setting of a messageListener on a new consumer.
+ * The only caveate is due to QPID-577 there is likely to be temporary message 'loss'. As they are stuck on the internal
+ * _synchronousQueue pending a synchronous receive. 
+ *  
+ */
+public class DeadlockTest extends TestCase
+{
+    private static final Logger _logger = LoggerFactory.getLogger(DeadlockTest.class);
+
+
+    public static final String QPID_BROKER_CONNECTION_PROPERTY = "QPIDBROKER";
+
+    private String topic1 = "TEST.DeadLock1.TMP";
+    private String topic2 = "TEST.DeadLock2.TMP";
+
+    private Session sess;
+
+    private Semaphore s = new Semaphore(0);
+    private final String LOCAL = "tcp://localhost:5670";
+    private final String VM = "vm://:1";
+
+    private String BROKER = VM;
+
+    String connectionString = System.getProperty(QPID_BROKER_CONNECTION_PROPERTY,
+                                                 "amqp://guest:guest@/test?brokerlist='" + BROKER + "'");
+
+
+    public void setUp() throws AMQVMBrokerCreationException
+    {
+        if (BROKER.equals("vm://:1"))
+        {
+            TransportConnection.createVMBroker(1);
+        }
+    }
+
+    public void tearDown() throws AMQVMBrokerCreationException
+    {
+        if (BROKER.equals("vm://:1"))
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+    }
+
+    public class EmptyMessageListener implements javax.jms.MessageListener
+    {
+        public void onMessage(Message message)
+        {
+            // do nothing
+        }
+    }
+
+    public void setSessionListener(String topic, javax.jms.MessageListener listener)
+    {
+        try
+        {
+            Topic jmsTopic = sess.createTopic(topic);
+            MessageConsumer subscriber = sess.createConsumer(jmsTopic);
+            subscriber.setMessageListener(listener);
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            fail("Caught JMSException");
+        }
+    }
+
+    public class TestMessageListener implements javax.jms.MessageListener
+    {
+        public Random r = new Random();
+
+        public void onMessage(Message message)
+        {
+            if (r.nextBoolean())
+            {
+                setSessionListener(topic2, new EmptyMessageListener());
+            }
+        }
+
+    }
+
+    public void testDeadlock() throws InterruptedException, URLSyntaxException, JMSException
+    {
+        // in order to trigger the deadlock we need to
+        // set a message listener from one thread
+        // whilst receiving a message on another thread and on that thread also setting (the same?) message listener
+        AMQConnectionFactory acf = new AMQConnectionFactory(connectionString);
+        Connection conn = acf.createConnection();
+        conn.start();
+        sess = conn.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+        setSessionListener(topic1, new TestMessageListener());
+
+
+        Thread th = new Thread()
+        {
+            public void run()
+            {
+                try
+                {
+                    Topic jmsTopic = sess.createTopic(topic1);
+                    MessageProducer producer = sess.createProducer(jmsTopic);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    Random r = new Random();
+                    long end = System.currentTimeMillis() + 2000;
+                    while (end - System.currentTimeMillis() > 0)
+                    {
+                        if (r.nextBoolean())
+                        {
+                            _logger.info("***************** send message");
+                            Message jmsMessage = sess.createTextMessage("");
+                            producer.send(jmsMessage);
+                        }
+                        else
+                        {
+                            _logger.info("***************** set session listener");
+                            setSessionListener(topic2, new EmptyMessageListener());
+                        }
+                        Thread.yield();
+                    }
+                    _logger.info("done sends");
+                    s.release();
+                }
+                catch (JMSException e)
+                {
+                    e.printStackTrace();
+                    fail("Caught JMSException");
+                }
+            }
+        };
+        th.setDaemon(true);
+        th.setName("testDeadlock");
+        th.start();
+
+        boolean success = s.tryAcquire(1, 4, TimeUnit.SECONDS);
+
+        // if we failed, closing the connection will just hang the test case.
+        if (success)
+        {
+            conn.close();
+        }
+
+        if (!success)
+        {
+            fail("Deadlock ocurred");
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failure/DeadlockTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date