You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/02/25 05:56:45 UTC

svn commit: r630733 - /incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java

Author: ritchiem
Date: Sun Feb 24 20:56:42 2008
New Revision: 630733

URL: http://svn.apache.org/viewvc?rev=630733&view=rev
Log:
QPID-810 : Moved the check for closingChannels higher in the stack, and now close the channel on any AMQException thrown from the body.handle methods.

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=630733&r1=630732&r2=630733&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Sun Feb 24 20:56:42 2008
@@ -109,7 +109,7 @@
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
-    private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+    private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
@@ -208,9 +208,39 @@
         {
             _logger.debug("Frame Received: " + frame);
         }
+
+        // Check that this channel is not closing
+        if (channelAwaitingClosure(channelId))
+        {
+            if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+                }
+            }
+            else
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+                }
+
+                return;
+            }
+        }
                 
 
-        body.handle(channelId, this);
+
+        try
+        {
+            body.handle(channelId, this);
+        }
+        catch (AMQException e)
+        {
+            closeChannel(channelId);
+            throw e;
+        }
 
     }
 
@@ -259,27 +289,6 @@
 
         final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
 
-        // Check that this channel is not closing
-        if (channelAwaitingClosure(channelId))
-        {
-            if ((evt.getMethod() instanceof ChannelCloseOkBody))
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
-                }
-            }
-            else
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
-                }
-
-                return;
-            }
-        }
-
         try
         {
             try
@@ -341,6 +350,7 @@
                     _logger.info("Closing connection due to: " + e.getMessage());
                 }
 
+                markChannelawaitingCloseOk(channelId);
                 closeSession();
                 _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                 writeFrame(e.getCloseFrame(channelId));