You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/02/02 13:05:36 UTC

svn commit: r905596 - /qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: ritchiem
Date: Tue Feb  2 12:05:35 2010
New Revision: 905596

URL: http://svn.apache.org/viewvc?rev=905596&view=rev
Log:
QPID-2370,QPID-1084 : Committed change to prevent flow control threads being created/sent if the channel/session is closed or the state has acutally changed in the mean time.

Modified:
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=905596&r1=905595&r2=905596&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb  2 12:05:35 2010
@@ -190,6 +190,8 @@
         }
     }
 
+    final AMQSession _thisSession = this;
+    
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -456,22 +458,38 @@
 
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                             _logger.debug(
-                                                                     "Above threshold(" + _prefetchHighMark
-                                                                     + ") so suspending channel. Current value is " + currentValue);
-                                                             _suspendState.set(true);
-                                                             new Thread(new SuspenderRunner(_suspendState)).start();
-
+                                                             // If the session has been closed don't waste time creating a thread to do
+                                                             // flow control
+                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             {   
+                                                                 // Only executute change if previous state
+                                                                 // was False
+                                                                 if (!_suspendState.getAndSet(true))
+                                                                 {
+                                                                     _logger.debug(
+                                                                             "Above threshold(" + _prefetchHighMark
+                                                                             + ") so suspending channel. Current value is " + currentValue);
+                                                                     new Thread(new SuspenderRunner(_suspendState)).start();
+                                                                 }
+                                                             }
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                             _logger.debug(
-                                                                     "Below threshold(" + _prefetchLowMark
-                                                                     + ") so unsuspending channel. Current value is " + currentValue);
-                                                             _suspendState.set(false);
-                                                             new Thread(new SuspenderRunner(_suspendState)).start();
-
+                                                             // If the session has been closed don't waste time creating a thread to do
+                                                             // flow control
+                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             {
+                                                                 // Only executute change if previous state
+                                                                 // was true
+                                                                 if (_suspendState.getAndSet(false))
+                                                                 {
+                                                                     _logger.debug(
+                                                                             "Below threshold(" + _prefetchLowMark
+                                                                             + ") so unsuspending channel. Current value is " + currentValue);
+                                                                    new Thread(new SuspenderRunner(_suspendState)).start();
+                                                                 }
+                                                             }
                                                          }
                                                      });
         }
@@ -2946,8 +2964,9 @@
 
             if (_dispatcherLogger.isInfoEnabled())
             {
-                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
             }
+
         }
 
         // only call while holding lock
@@ -3109,12 +3128,22 @@
             {
                 synchronized (_suspensionLock)
                 {
-                    suspendChannel(_suspend.get());
+                    // If the session has closed by the time we get here
+                    // then we should not attempt to write to the sesion/channel.
+                    if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                    {
+                        suspendChannel(_suspend.get());
+                    }
                 }
             }
             catch (AMQException e)
             {
-                _logger.warn("Unable to suspend channel");
+                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Is the _queue empty?" + _queue.isEmpty());
+                    _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+                }
             }
         }
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org