You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/02/05 11:13:22 UTC

svn commit: r906890 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: ritchiem
Date: Fri Feb  5 10:13:21 2010
New Revision: 906890

URL: http://svn.apache.org/viewvc?rev=906890&view=rev
Log:
QPID-2370 : Committing patch to improve broker logging. This will not cleanly apply to trunk due to IO changes.
QPID-1084 : Committed change to prevent flow control threads being created/sent if the channel/session is closed or the state has acutally changed in the mean time.
Wrapped .debug statements as per review feedback

Merged and adapted these changes from 0.5.x r905592,905596,905605

The AMQMinaProtocolSession Changes were moved to the AMQProtocolEngine

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=906890&r1=906889&r2=906890&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Feb  5 10:13:21 2010
@@ -407,11 +407,13 @@
                             evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
                                                                    AMQConstant.CHANNEL_ERROR.getName().toString());
 
+                    _logger.info(e.getMessage() + " whilst processing:" + methodBody);
                     closeConnection(channelId, ce, false);
                 }
             }
             catch (AMQConnectionException e)
             {
+                _logger.info(e.getMessage() + " whilst processing:" + methodBody);
                 closeConnection(channelId, e, false);
             }
         }
@@ -744,7 +746,7 @@
     {
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Closing connection due to: " + e.getMessage());
+            _logger.info("Closing connection due to: " + e);
         }
 
         markChannelAwaitingCloseOk(channelId);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=906890&r1=906889&r2=906890&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb  5 10:13:21 2010
@@ -196,7 +196,7 @@
                   || (body instanceof ChannelCloseOkBody)
                   || (body instanceof ChannelCloseBody)))
             {
-                throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+                throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed won't process:" + body);
             }
 
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=906890&r1=906889&r2=906890&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb  5 10:13:21 2010
@@ -170,6 +170,8 @@
         }
     }
 
+    final AMQSession _thisSession = this;
+    
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -438,22 +440,45 @@
 
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                             _logger.debug(
-                                                                     "Above threshold(" + _prefetchHighMark
-                                                                     + ") so suspending channel. Current value is " + currentValue);
-                                                             _suspendState.set(true);
-                                                             new Thread(new SuspenderRunner(_suspendState)).start();
-
+                                                             // If the session has been closed don't waste time creating a thread to do
+                                                             // flow control
+                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             {   
+                                                                 // Only executute change if previous state
+                                                                 // was False
+                                                                 if (!_suspendState.getAndSet(true))
+                                                                 {
+                                                                     if (_logger.isDebugEnabled())
+                                                                     {
+                                                                         _logger.debug(
+                                                                                 "Above threshold(" + _prefetchHighMark
+                                                                                 + ") so suspending channel. Current value is " + currentValue);
+                                                                     }
+                                                                     new Thread(new SuspenderRunner(_suspendState)).start();
+                                                                 }
+                                                             }
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                             _logger.debug(
-                                                                     "Below threshold(" + _prefetchLowMark
-                                                                     + ") so unsuspending channel. Current value is " + currentValue);
-                                                             _suspendState.set(false);
-                                                             new Thread(new SuspenderRunner(_suspendState)).start();
-
+                                                             // If the session has been closed don't waste time creating a thread to do
+                                                             // flow control
+                                                             if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                                                             {
+                                                                 // Only executute change if previous state
+                                                                 // was true
+                                                                 if (_suspendState.getAndSet(false))
+                                                                 {
+                                                                     if (_logger.isDebugEnabled())
+                                                                     {
+
+                                                                         _logger.debug(
+                                                                                 "Below threshold(" + _prefetchLowMark
+                                                                                 + ") so unsuspending channel. Current value is " + currentValue);
+                                                                     }
+                                                                    new Thread(new SuspenderRunner(_suspendState)).start();
+                                                                 }
+                                                             }
                                                          }
                                                      });
         }
@@ -2947,8 +2972,9 @@
 
             if (_dispatcherLogger.isInfoEnabled())
             {
-                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+                _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
             }
+
         }
 
         // only call while holding lock
@@ -3110,12 +3136,22 @@
             {
                 synchronized (_suspensionLock)
                 {
-                    suspendChannel(_suspend.get());
+                    // If the session has closed by the time we get here
+                    // then we should not attempt to write to the sesion/channel.
+                    if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+                    {
+                        suspendChannel(_suspend.get());
+                    }
                 }
             }
             catch (AMQException e)
             {
-                _logger.warn("Unable to suspend channel");
+                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Is the _queue empty?" + _queue.isEmpty());
+                    _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+                }
             }
         }
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org