You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/02/25 05:56:45 UTC
svn commit: r630733 -
/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Author: ritchiem
Date: Sun Feb 24 20:56:42 2008
New Revision: 630733
URL: http://svn.apache.org/viewvc?rev=630733&view=rev
Log:
QPID-810 : Moved check for closingChannels higher in stack and close channel on any AMQException being thrown from the body.handle methods.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=630733&r1=630732&r2=630733&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Sun Feb 24 20:56:42 2008
@@ -109,7 +109,7 @@
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new ArrayList<Integer>();
+ private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
@@ -208,9 +208,39 @@
{
_logger.debug("Frame Received: " + frame);
}
+
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
+ {
+ if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
+
+ return;
+ }
+ }
- body.handle(channelId, this);
+
+ try
+ {
+ body.handle(channelId, this);
+ }
+ catch (AMQException e)
+ {
+ closeChannel(channelId);
+ throw e;
+ }
}
@@ -259,27 +289,6 @@
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((evt.getMethod() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
- }
-
- return;
- }
- }
-
try
{
try
@@ -341,6 +350,7 @@
_logger.info("Closing connection due to: " + e.getMessage());
}
+ markChannelawaitingCloseOk(channelId);
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));