You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/17 18:40:15 UTC

svn commit: r637986 - in /incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQSession.java

Author: ritchiem
Date: Mon Mar 17 10:40:14 2008
New Revision: 637986

URL: http://svn.apache.org/viewvc?rev=637986&view=rev
Log:
QPID-849 : Client Deadlock, there are various points where we take the failover mutex to check the _closed values which if it is false then that is all we do. As the _closed check doesn't require the mutex then move all the checks out side of the mutex lock.

Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=637986&r1=637985&r2=637986&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Mar 17 10:40:14 2008
@@ -656,71 +656,71 @@
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetchHigh, final int prefetchLow) throws JMSException
     {
-        synchronized(_sessionCreationLock)
+        synchronized (_sessionCreationLock)
         {
-        checkNotClosed();
+            checkNotClosed();
 
-        if (channelLimitReached())
-        {
-            throw new ChannelLimitReachedException(_maximumChannelCount);
-        }
+            if (channelLimitReached())
+            {
+                throw new ChannelLimitReachedException(_maximumChannelCount);
+            }
 
-        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
-                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
-                {
-                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+            return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+                    new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
                     {
-                        int channelId = _idFactory.incrementAndGet();
-
-                        if (_logger.isDebugEnabled())
+                        public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
                         {
-                            _logger.debug("Write channel open frame for channel id " + channelId);
-                        }
-
-                        // We must create the session and register it before actually sending the frame to the server to
-                        // open it, so that there is no window where we could receive data on the channel and not be set
-                        // up to handle it appropriately.
-                        AMQSession session =
-                                new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
-                                               prefetchLow);
-                        // _protocolHandler.addSessionByChannel(channelId, session);
-                        registerSession(channelId, session);
+                            int channelId = _idFactory.incrementAndGet();
 
-                        boolean success = false;
-                        try
-                        {
-                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
-                            success = true;
-                        }
-                        catch (AMQException e)
-                        {
-                            JMSException jmse = new JMSException("Error creating session: " + e);
-                            jmse.setLinkedException(e);
-                            throw jmse;
-                        }
-                        finally
-                        {
-                            if (!success)
+                            if (_logger.isDebugEnabled())
                             {
-                                deregisterSession(channelId);
+                                _logger.debug("Write channel open frame for channel id " + channelId);
                             }
-                        }
 
-                        if (_started)
-                        {
+                            // We must create the session and register it before actually sending the frame to the server to
+                            // open it, so that there is no window where we could receive data on the channel and not be set
+                            // up to handle it appropriately.
+                            AMQSession session =
+                                    new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+                                                   prefetchLow);
+                            // _protocolHandler.addSessionByChannel(channelId, session);
+                            registerSession(channelId, session);
+
+                            boolean success = false;
                             try
                             {
-                                session.start();
+                                createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+                                success = true;
                             }
                             catch (AMQException e)
                             {
-                                throw new JMSAMQException(e);
+                                JMSException jmse = new JMSException("Error creating session: " + e);
+                                jmse.setLinkedException(e);
+                                throw jmse;
+                            }
+                            finally
+                            {
+                                if (!success)
+                                {
+                                    deregisterSession(channelId);
+                                }
                             }
-                        }
 
-                        return session;
-                    }
-                }, this).execute();
+                            if (_started)
+                            {
+                                try
+                                {
+                                    session.start();
+                                }
+                                catch (AMQException e)
+                                {
+                                    throw new JMSAMQException(e);
+                                }
+                            }
+
+                            return session;
+                        }
+                    }, this).execute();
         }
     }
 
@@ -732,13 +732,13 @@
 
         // TODO: Be aware of possible changes to parameter order as versions change.
 
-        _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
+        _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
 
-        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
 
         // todo send low water mark when protocol allows.
         // todo Be aware of possible changes to parameter order as versions change.
-        _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+        _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
 
         if (transacted)
         {
@@ -926,13 +926,14 @@
             }
             else
             {
-            synchronized (getFailoverMutex())
-            {
                 if (!_closed.getAndSet(true))
                 {
-                    try
+
+                    synchronized (getFailoverMutex())
                     {
-                        long startCloseTime = System.currentTimeMillis();
+                        try
+                        {
+                            long startCloseTime = System.currentTimeMillis();
 
                         closeAllSessions(null, timeout, startCloseTime);
 

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=637986&r1=637985&r2=637986&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Mar 17 10:40:14 2008
@@ -594,14 +594,14 @@
                          + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
-        synchronized (_connection.getFailoverMutex())
+        // Ensure we only try and close an open session.
+        if (!_closed.getAndSet(true))
         {
-            // We must close down all producers and consumers in an orderly fashion. This is the only method
-            // that can be called from a different thread of control from the one controlling the session.
-            synchronized (_messageDeliveryLock)
+            synchronized (_connection.getFailoverMutex())
             {
-                // Ensure we only try and close an open session.
-                if (!_closed.getAndSet(true))
+                // We must close down all producers and consumers in an orderly fashion. This is the only method
+                // that can be called from a different thread of control from the one controlling the session.
+                synchronized (_messageDeliveryLock)
                 {
                     // we pass null since this is not an error case
                     closeProducersAndConsumers(null);
@@ -655,33 +655,37 @@
         // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
         // We need to determin here if the connection should be
 
-        synchronized (_connection.getFailoverMutex())
+        if (e instanceof AMQDisconnectedException)
         {
-            if (e instanceof AMQDisconnectedException)
+            if (_dispatcher != null)
             {
-                if (_dispatcher != null)
-                {
-                    // Failover failed and ain't coming back. Knife the dispatcher.
-                    _dispatcher.interrupt();
-                }
+                // Failover failed and ain't coming back. Knife the dispatcher.
+                _dispatcher.interrupt();
             }
-            synchronized (_messageDeliveryLock)
+        }
+
+        if (!_closed.getAndSet(true))
+        {
+            synchronized (_connection.getFailoverMutex())
             {
-                // An AMQException has an error code and message already and will be passed in when closure occurs as a
-                // result of a channel close request
-                _closed.set(true);
-                AMQException amqe;
-                if (e instanceof AMQException)
-                {
-                    amqe = (AMQException) e;
-                }
-                else
+                synchronized (_messageDeliveryLock)
                 {
-                    amqe = new AMQException("Closing session forcibly", e);
-                }
+                    // An AMQException has an error code and message already and will be passed in when closure occurs as a
+                    // result of a channel close request
+                    AMQException amqe;
+                    if (e instanceof AMQException)
+                    {
+                        amqe = (AMQException) e;
+                    }
+                    else
+                    {
+                        amqe = new AMQException("Closing session forcibly", e);
+                    }
+
 
-                _connection.deregisterSession(_channelId);
-                closeProducersAndConsumers(amqe);
+                    _connection.deregisterSession(_channelId);
+                    closeProducersAndConsumers(amqe);
+                }
             }
         }
     }