You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/22 21:58:03 UTC
svn commit: r498797 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/
common/src/main/java/org/apache/qpid/framing/
Author: kpvdr
Date: Mon Jan 22 12:58:01 2007
New Revision: 498797
URL: http://svn.apache.org/viewvc?view=rev&rev=498797
Log:
Added session close convenience methods to broker ProtocolSession, modified handlers that need to close a session to use new methods. Added logger to RequestManager and ResponseManager.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Mon Jan 22 12:58:01 2007
@@ -51,8 +51,8 @@
AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " +
+ body.classId + " and method " + body.methodId);
protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -51,19 +51,8 @@
AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + protocolSession);
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
- try
- {
- protocolSession.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode +
+ "/" + body.replyText + " for " + protocolSession);
+ protocolSession.closeSessionResponse(evt.getRequestId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -49,17 +49,7 @@
public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
- //todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
-
- try
- {
- protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- protocolSession.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ protocolSession.closeSession();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -76,16 +76,9 @@
// Can't do this as we violate protocol. Need to send Close
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody close = ConnectionCloseBody.createMethodBody(
- major, minor, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz(major, minor), // classId
- ConnectionCloseBody.getMethod(major, minor), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- protocolSession.writeResponse(evt, close);
disposeSaslServer(protocolSession);
+ protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod());
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Mon Jan 22 12:58:01 2007
@@ -23,20 +23,15 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
@@ -77,14 +72,11 @@
{
session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
"No such queue, '" + body.queue + "'");
-// channelClose(session, channelId, stateManager,
-// "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
}
else
{
- connectionClose(session, channelId, session.getStateManager(),
- "No queue name provided, no default queue defined.",
- AMQConstant.NOT_ALLOWED);
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod());
}
}
else
@@ -103,54 +95,18 @@
}
catch (AMQInvalidSelectorException ise)
{
- _log.info("Closing connection due to invalid selector");
+ _log.info("Closing connection due to invalid selector: " + ise.getMessage());
session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
ise.getMessage());
-// channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
}
catch (ConsumerTagNotUniqueException e)
{
- connectionClose(session, channelId, session.getStateManager(),
- "Non-unique consumer tag, '" + body.destination + "'",
- AMQConstant.NOT_ALLOWED);
+ _log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
}
}
}
}
-
-// private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
-// String message, AMQConstant code)
-// throws AMQException
-// {
-// /*AMQShort*/String msg = new /*AMQShort*/String(message);
-// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-// // Be aware of possible changes to parameter order as versions change.
-// session.writeRequest(channelId, ChannelCloseBody.createMethodBody
-// ((byte)0, (byte)9, // AMQP version (major, minor)
-// MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
-// MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
-// code.getCode(), // replyCode
-// msg), // replyText
-// listener);
-// }
-
- private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
- String message, AMQConstant code)
- throws AMQException
- {
- byte major = session.getMajor();
- byte minor = session.getMinor();
- /*AMQShort*/String msg = new /*AMQShort*/String(message);
- // Be aware of possible changes to parameter order as versions change.
- session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
- major, minor, // AMQP version (major, minor)
- MessageConsumeBody.getClazz(major, minor), // classId
- MessageConsumeBody.getMethod(major, minor), // methodId
- code.getCode(), // replyCode
- msg), // replyText
- listener);
- }
-
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 22 12:58:01 2007
@@ -33,6 +33,9 @@
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
@@ -59,6 +62,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -100,6 +104,7 @@
private Object _lastSent;
+ private boolean _closePending;
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
@@ -128,6 +133,8 @@
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
+ _closePending = false;
+ _closed = false;
}
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -143,6 +150,8 @@
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
+ _closePending = false;
+ _closed = false;
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -168,7 +177,8 @@
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- private AMQChannel createChannel(int id) throws AMQException {
+ private AMQChannel createChannel(int id) throws AMQException
+ {
IApplicationRegistry registry = ApplicationRegistry.getInstance();
AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
_exchangeRegistry, this, _stateManager);
@@ -221,12 +231,22 @@
}
}
- else
+ else if(!_closed)
{
AMQFrame frame = (AMQFrame) message;
-
AMQChannel channel = getChannel(frame.channel);
- if (channel == null)
+
+ if (_closePending)
+ {
+ // If a close is pending (i.e. Connection.Close has been sent, but no Connection.CloseOk received), then
+ // all methods except Connection.CloseOk must be rejected. (AMQP spec)
+ if((frame.bodyFrame instanceof AMQRequestBody))
+ throw new AMQException("Incoming request frame on connection which is pending close.");
+ AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
+ throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method.");
+ }
+ else if (channel == null)
{
// Perform a check on incoming frames that may result in a new channel
// being opened. The frame MUST be:
@@ -235,12 +255,12 @@
// c. Must be a ConnectionOpenBody method.
// Throw an exception for all other incoming frames on an unopened channel
if(!(frame.bodyFrame instanceof AMQRequestBody))
- throw new AMQException("Incoming frame on unopened channel not a request");
+ throw new AMQException("Incoming frame on unopened channel is not a request.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
- if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
- throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+ if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody))
+ throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method.");
if (requestBody.getRequestId() != 1)
- throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
+ throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
channel = createChannel(frame.channel);
}
@@ -391,17 +411,36 @@
channel.rollback();
}
}
+
+ // Used to initiate a channel close from the server side and inform the client
+ public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ {
+ final AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Unknown channel id " + channelId);
+ }
+ else
+ {
+ channel.close(this);
+ // Be aware of possible changes to parameter order as versions change.
+ AMQMethodBody cf = ChannelCloseBody.createMethodBody
+ (_major, _minor, // AMQP version (major, minor)
+ MessageTransferBody.getClazz((byte)0, (byte)9), // classId
+ MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
+ replyCode, // replyCode
+ replyText); // replyText
+ writeRequest(channelId, cf);
+ // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ _channelMap.remove(channelId); // Returns null if already removed (by closeOk handler
+ }
+ }
- /**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
- *
- * @param channelId id of the channel to close
- * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response
- * @throws AMQException if an error occurs closing the channel
- * @throws IllegalArgumentException if the channel id is not valid
- */
// Used to close a channel as a response to a client close request
public void closeChannelResponse(int channelId, long requestId) throws AMQException
{
@@ -425,33 +464,52 @@
}
}
}
+
+ // Used to initiate a connection close from the server side and inform the client
+ public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException
+ {
+ _closePending = true; // This prevents all methods except Close-Ok from being accepted
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+ _major, _minor, // AMQP version (major, minor)
+ classId, // classId
+ methodId, // methodId
+ replyCode, // replyCode
+ replyText); // replyText
+ writeRequest(0, close);
+ // Wait a bit for the Connection.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to close the connection if the ConnectionCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ closeSession();
+ }
- // Used to close a channel from the server side and inform the client
- public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ public void closeSessionRequest(int replyCode, String replyText) throws AMQException
{
- final AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new IllegalArgumentException("Unknown channel id");
- }
- else
+ closeSessionRequest(replyCode, replyText, 0, 0);
+ }
+
+ // Used to close a connection as a response to a client close request
+ public void closeSessionResponse(long requestId) throws AMQException
+ {
+ // Be aware of possible changes to parameter order as versions change.
+ writeResponse(0, requestId, ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version
+ closeSession();
+ }
+
+ public void closeSession() throws AMQException
+ {
+ if (!_closed)
{
- channel.close(this);
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody cf = ChannelCloseBody.createMethodBody
- (_major, _minor, // AMQP version (major, minor)
- MessageTransferBody.getClazz((byte)0, (byte)9), // classId
- MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
- replyCode, // replyCode
- replyText); // replyText
- writeRequest(channelId, cf);
- // Wait a bit for the Channel.CloseOk to come in from the client, but don't
- // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
- // method handler has not already done so.
- // TODO - Find a better way of doing this without holding up this thread...
- try { Thread.currentThread().sleep(2000); } // 2 seconds
- catch (InterruptedException e) {}
- _channelMap.remove(channelId); // Returns null if already removed
+ _closed = true;
+ closeAllChannels();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ }
}
}
@@ -492,23 +550,6 @@
channel.close(this);
}
_channelMap.clear();
- }
-
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
- public void closeSession() throws AMQException
- {
- if (!_closed)
- {
- _closed = true;
- closeAllChannels();
- if (_managedObject != null)
- {
- _managedObject.unregister();
- }
- }
}
public String toString()
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Jan 22 12:58:01 2007
@@ -174,18 +174,9 @@
}
else
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- 200, // replyCode
- throwable.getMessage()); // replyText
- session.writeRequest(0, closeBody, methodListener);
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
- protocolSession.close();
+ // TODO: Closing with code 200 ("reply-success") ??? This cannot be right!
+ session.closeSessionRequest(200, throwable.getMessage());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 22 12:58:01 2007
@@ -72,19 +72,18 @@
*/
void addChannel(AMQChannel channel) throws AMQException;
- /**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
- * @param channelId id of the channel to close
- * @param requestId id of the request that initiated the close, used in response
- * @throws org.apache.qpid.AMQException if an error occurs closing the channel
- * @throws IllegalArgumentException if the channel id is not valid
- */
+ void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+
void closeChannelResponse(int channelId, long requestId) throws AMQException;
- void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+ void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException;
+ void closeSessionRequest(int replyCode, String replyText) throws AMQException;
+
+ void closeSessionResponse(long requestId) throws AMQException;
+
+ void closeSession() throws AMQException;
+
/**
* Remove a channel from the session but do not close it.
* @param channelId
@@ -96,12 +95,6 @@
* @param delay delay in seconds (not ms)
*/
void initHeartbeats(int delay);
-
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
- void closeSession() throws AMQException;
/**
* @return a key that uniquely identifies this session
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 22 12:58:01 2007
@@ -22,12 +22,16 @@
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
+ private static final Logger logger = Logger.getLogger(RequestManager.class);
+
private int channel;
private AMQProtocolWriter protocolWriter;
@@ -71,7 +75,11 @@
lastProcessedResponseId, requestMethodBody);
requestSentMap.put(requestId, methodListener);
protocolWriter.writeFrame(requestFrame);
- // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
+ " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ }
return requestId;
}
@@ -80,7 +88,11 @@
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
- // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
+ " " + responseBody + "; " + responseBody.getMethodPayload());
+ }
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 22 12:58:01 2007
@@ -23,6 +23,8 @@
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -30,6 +32,8 @@
public class ResponseManager
{
+ private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
private int channel;
private AMQMethodListener methodListener;
private AMQProtocolWriter protocolWriter;
@@ -113,12 +117,15 @@
public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
- // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+ " " + requestBody + "; " + requestBody.getMethodPayload());
+ }
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
- // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId);
methodListener.methodReceived(methodEvent);
}
@@ -126,7 +133,11 @@
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
- // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
+ " Res[# " + requestId + "]; " + responseMethodBody);
+ }
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,