You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/09/14 14:43:16 UTC

svn commit: r575663 - in /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQSession.java

Author: rgodfrey
Date: Fri Sep 14 05:43:13 2007
New Revision: 575663

URL: http://svn.apache.org/viewvc?rev=575663&view=rev
Log:
QPID-600 : Deadlock on connection.close

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=575663&r1=575662&r2=575663&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Sep 14 05:43:13 2007
@@ -69,11 +69,7 @@
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -91,6 +87,8 @@
      */
     private final Object _failoverMutex = new Object();
 
+    private final Object _sessionCreationLock = new Object();
+
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
      * and we must prevent the client from opening too many. Zero means unlimited.
@@ -503,6 +501,8 @@
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetchHigh, final int prefetchLow) throws JMSException
     {
+        synchronized(_sessionCreationLock)
+        {
         checkNotClosed();
 
         if (channelLimitReached())
@@ -566,6 +566,7 @@
                         return session;
                     }
                 }, this).execute();
+        }
     }
 
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
@@ -754,44 +755,63 @@
 
     public void close(long timeout) throws JMSException
     {
-        synchronized (getFailoverMutex())
+        close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+    }
+
+    public void close(List<AMQSession> sessions, long timeout) throws JMSException
+    {
+        synchronized(_sessionCreationLock)
         {
-            if (!_closed.getAndSet(true))
+            if(!sessions.isEmpty())
             {
-                try
+                AMQSession session = sessions.remove(0);
+                synchronized(session.getMessageDeliveryLock())
                 {
-                    long startCloseTime = System.currentTimeMillis();
+                    close(sessions, timeout);
+                }
+            }
+            else
+            {
+            synchronized (getFailoverMutex())
+            {
+                if (!_closed.getAndSet(true))
+                {
+                    try
+                    {
+                        long startCloseTime = System.currentTimeMillis();
 
-                    _taskPool.shutdown();
-                    closeAllSessions(null, timeout, startCloseTime);
+                        _taskPool.shutdown();
+                        closeAllSessions(null, timeout, startCloseTime);
 
-                    if (!_taskPool.isTerminated())
-                    {
-                        try
+                        if (!_taskPool.isTerminated())
                         {
-                            // adjust timeout
-                            long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+                            try
+                            {
+                                // adjust timeout
+                                long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
 
-                            _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            _logger.info("Interrupted while shutting down connection thread pool.");
+                                _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                _logger.info("Interrupted while shutting down connection thread pool.");
+                            }
                         }
-                    }
 
-                    // adjust timeout
-                    timeout = adjustTimeout(timeout, startCloseTime);
+                        // adjust timeout
+                        timeout = adjustTimeout(timeout, startCloseTime);
 
-                    _protocolHandler.closeConnection(timeout);
+                        _protocolHandler.closeConnection(timeout);
 
+                    }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error closing connection: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
                 }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing connection: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
+            }
             }
         }
     }

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=575663&r1=575662&r2=575663&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Sep 14 05:43:13 2007
@@ -513,13 +513,16 @@
                 + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
+
+
         synchronized(_messageDeliveryLock)
         {
-
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session.
         synchronized (_connection.getFailoverMutex())
         {
+
+
             // Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
             {
@@ -572,6 +575,7 @@
      */
     public void closed(Throwable e) throws JMSException
     {
+
         synchronized(_messageDeliveryLock)
         {
         synchronized (_connection.getFailoverMutex())
@@ -2543,6 +2547,11 @@
                 throw new AMQException("Fail-over interrupted suspend/unsuspend channel.", e);
             }
         }
+    }
+
+    Object getMessageDeliveryLock()
+    {
+        return _messageDeliveryLock;
     }
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */