You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/09/24 12:22:39 UTC

svn commit: r578745 - in /incubator/qpid/branches/M2.1: ./ java/client/src/main/java/org/apache/qpid/client/ java/client/src/main/java/org/apache/qpid/client/util/ java/client/src/test/java/org/apache/qpid/test/unit/client/connection/

Author: rupertlssmith
Date: Mon Sep 24 03:22:32 2007
New Revision: 578745

URL: http://svn.apache.org/viewvc?rev=578745&view=rev
Log:
Merged revisions 575663-575687,575689-576860,576862-577192,577194-577315,577317-577659,577661-578047,578049-578060,578062-578604 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r575663 | rgodfrey | 2007-09-14 13:43:13 +0100 (Fri, 14 Sep 2007) | 1 line
  
  QPID-600 : Deadlock on connection.close
........
  r577931 | rgreig | 2007-09-20 22:26:37 +0100 (Thu, 20 Sep 2007) | 1 line
  
  Adding timeouts to two wait() calls to prevent hanging
........
  r578258 | rgreig | 2007-09-21 21:31:18 +0100 (Fri, 21 Sep 2007) | 1 line
  
  QPID-607: dispatcher threads now poll so that they can die when the connection is closed.
........
  r578475 | rgreig | 2007-09-22 20:01:59 +0100 (Sat, 22 Sep 2007) | 1 line
  
  QPID-608 Fix the test by adding in creation of the VM broker
........
  r578509 | rgreig | 2007-09-22 23:05:30 +0100 (Sat, 22 Sep 2007) | 1 line
  
  QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.
........
  r578604 | rgreig | 2007-09-23 22:29:33 +0100 (Sun, 23 Sep 2007) | 4 lines
  
  QPID-589: avoid the deadlock between the session close and the BasicCancelOkHandler by
  not sending a BasicCancel when the session is being closed.
........

Modified:
    incubator/qpid/branches/M2.1/   (props changed)
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java

Propchange: incubator/qpid/branches/M2.1/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Sep 24 03:22:32 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2:1-568187,574873,574876,575253,575688,576861,577193,577316,577660,578048,578061
+/incubator/qpid/branches/M2:1-568187,574873,574876,575253,575663-578604

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep 24 03:22:32 2007
@@ -70,11 +70,7 @@
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -92,6 +88,8 @@
      */
     private final Object _failoverMutex = new Object();
 
+    private final Object _sessionCreationLock = new Object();
+
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
      * and we must prevent the client from opening too many. Zero means unlimited.
@@ -509,6 +507,8 @@
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetchHigh, final int prefetchLow) throws JMSException
     {
+        synchronized(_sessionCreationLock)
+        {
         checkNotClosed();
 
         if (channelLimitReached())
@@ -572,6 +572,7 @@
                         return session;
                     }
                 }, this).execute();
+        }
     }
 
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
@@ -760,44 +761,63 @@
 
     public void close(long timeout) throws JMSException
     {
-        synchronized (getFailoverMutex())
+        close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+    }
+
+    public void close(List<AMQSession> sessions, long timeout) throws JMSException
+    {
+        synchronized(_sessionCreationLock)
         {
-            if (!_closed.getAndSet(true))
+            if(!sessions.isEmpty())
             {
-                try
+                AMQSession session = sessions.remove(0);
+                synchronized(session.getMessageDeliveryLock())
                 {
-                    long startCloseTime = System.currentTimeMillis();
+                    close(sessions, timeout);
+                }
+            }
+            else
+            {
+            synchronized (getFailoverMutex())
+            {
+                if (!_closed.getAndSet(true))
+                {
+                    try
+                    {
+                        long startCloseTime = System.currentTimeMillis();
 
-                    _taskPool.shutdown();
-                    closeAllSessions(null, timeout, startCloseTime);
+                        _taskPool.shutdown();
+                        closeAllSessions(null, timeout, startCloseTime);
 
-                    if (!_taskPool.isTerminated())
-                    {
-                        try
+                        if (!_taskPool.isTerminated())
                         {
-                            // adjust timeout
-                            long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+                            try
+                            {
+                                // adjust timeout
+                                long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
 
-                            _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            _logger.info("Interrupted while shutting down connection thread pool.");
+                                _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                _logger.info("Interrupted while shutting down connection thread pool.");
+                            }
                         }
-                    }
 
-                    // adjust timeout
-                    timeout = adjustTimeout(timeout, startCloseTime);
+                        // adjust timeout
+                        timeout = adjustTimeout(timeout, startCloseTime);
 
-                    _protocolHandler.closeConnection(timeout);
+                        _protocolHandler.closeConnection(timeout);
 
+                    }
+                    catch (AMQException e)
+                    {
+                        JMSException jmse = new JMSException("Error closing connection: " + e);
+                        jmse.setLinkedException(e);
+                        throw jmse;
+                    }
                 }
-                catch (AMQException e)
-                {
-                    JMSException jmse = new JMSException("Error closing connection: " + e);
-                    jmse.setLinkedException(e);
-                    throw jmse;
-                }
+            }
             }
         }
     }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Sep 24 03:22:32 2007
@@ -106,6 +106,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +520,6 @@
 
         synchronized (_messageDeliveryLock)
         {
-
             // We must close down all producers and consumers in an orderly fashion. This is the only method
             // that can be called from a different thread of control from the one controlling the session.
             synchronized (_connection.getFailoverMutex())
@@ -666,11 +666,10 @@
                 else
                 {
                     _logger.info("Dispatcher is null so created stopped dispatcher");
-
                     startDistpatcherIfNecessary(true);
                 }
 
-                _dispatcher.rejectPending(consumer);
+                _dispatcher.rejectPending(consumer);                
             }
             else
             {
@@ -1954,11 +1953,6 @@
      */
     private void closeConsumers(Throwable error) throws JMSException
     {
-        if (_dispatcher != null)
-        {
-            _dispatcher.close();
-            _dispatcher = null;
-        }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
         final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1973,10 +1967,15 @@
             }
             else
             {
-                con.close();
+                con.close(false);
             }
         }
         // at this point the _consumers map will be empty
+        if (_dispatcher != null)
+        {
+            _dispatcher.close();
+            _dispatcher = null;
+        }
     }
 
     /**
@@ -2557,12 +2556,17 @@
         }
     }
 
+    Object getMessageDeliveryLock()
+    {
+        return _messageDeliveryLock;
+    }
+
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     private class Dispatcher extends Thread
     {
 
         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
-        private final AtomicBoolean _closed = new AtomicBoolean(false);
+        private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2578,7 +2582,7 @@
 
         public void close()
         {
-            _closed.set(true);
+            _dispatcherClosed.set(true);
             interrupt();
 
             // fixme awaitTermination
@@ -2673,30 +2677,33 @@
 
             try
             {
-                while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+                while (!_dispatcherClosed.get())
                 {
-                    synchronized (_lock)
+                    message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
+                    if (message != null)
                     {
-
-                        while (connectionStopped())
+                        synchronized (_lock)
                         {
-                            _lock.wait(2000);
-                        }
 
-                        if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
-                        {
-                            rejectMessage(message, true);
-                        }
-                        else
-                        {
-                            synchronized (_messageDeliveryLock)
+                            while (connectionStopped())
                             {
-                                dispatchMessage(message);
+                                _lock.wait(2000);
                             }
-                        }
 
-                    }
+                            if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+                            {
+                                rejectMessage(message, true);
+                            }
+                            else
+                            {
+                                synchronized (_messageDeliveryLock)
+                                {
+                                    dispatchMessage(message);
+                                }
+                            }
 
+                        }
+                    }
                 }
             }
             catch (InterruptedException e)
@@ -2760,7 +2767,7 @@
                         }
                     }
                     // Don't reject if we're already closing
-                    if (!_closed.get())
+                    if (!_dispatcherClosed.get())
                     {
                         rejectMessage(message, true);
                     }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Mon Sep 24 03:22:32 2007
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -69,10 +70,10 @@
         _listener = listener;
     }
 
-    public Object take() throws InterruptedException
+    public Object poll(long time, TimeUnit unit) throws InterruptedException
     {
-        Object o = _queue.take();
-        if (_listener != null)
+        Object o = _queue.poll(time, unit);
+        if (o != null && _listener != null)
         {
             synchronized (_listener)
             {

Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=578745&r1=578744&r2=578745&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Mon Sep 24 03:22:32 2007
@@ -47,12 +47,12 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-//        TransportConnection.createVMBroker(1);
+        TransportConnection.createVMBroker(1);
     }
 
     protected void tearDown() throws Exception
     {
-//        TransportConnection.killVMBroker(1);
+        TransportConnection.killVMBroker(1);
     }
 
     public void testSimpleConnection()