You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/22 21:58:03 UTC

svn commit: r498797 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ common/src/main/java/org/apache/qpid/framing/

Author: kpvdr
Date: Mon Jan 22 12:58:01 2007
New Revision: 498797

URL: http://svn.apache.org/viewvc?view=rev&rev=498797
Log:
Added session close convinience methods to broker ProtocolSession, modified handlers that need to close a session to use new methods. Added logger to RequestManager and ResponseManager.

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Mon Jan 22 12:58:01 2007
@@ -51,8 +51,8 @@
                                AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
     {
         ChannelCloseBody body = evt.getMethod();
-        _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
-                     " and method " + body.methodId);
+        _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " +
+            body.classId + " and method " + body.methodId);
         protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -51,19 +51,8 @@
                                AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
     {
         final ConnectionCloseBody body = evt.getMethod();
-        _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
-                     body.replyText +  " for " + protocolSession);
-        // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody(
-            protocolSession.getMajor(),  // AMQP major version
-            protocolSession.getMinor())); // AMQP minor version
-        try
-        {
-            protocolSession.closeSession();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
+        _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode +
+            "/" + body.replyText +  " for " + protocolSession);
+        protocolSession.closeSessionResponse(evt.getRequestId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -49,17 +49,7 @@
     public void methodReceived(AMQProtocolSession protocolSession,
                                AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
     {
-        //todo should this not do more than just log the method?
         _logger.info("Received Connection-close-ok");
-
-        try
-        {
-            protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-            protocolSession.closeSession();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Error closing protocol session: " + e, e);
-        }
+        protocolSession.closeSession();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Mon Jan 22 12:58:01 2007
@@ -76,16 +76,9 @@
                 // Can't do this as we violate protocol. Need to send Close
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
-                stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                // Be aware of possible changes to parameter order as versions change.
-                AMQMethodBody close = ConnectionCloseBody.createMethodBody(
-                    major, minor,	// AMQP version (major, minor)
-                    ConnectionCloseBody.getClazz(major, minor),		// classId
-                    ConnectionCloseBody.getMethod(major, minor),	// methodId
-                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                    AMQConstant.NOT_ALLOWED.getName());	// replyText
-                protocolSession.writeResponse(evt, close);
                 disposeSaslServer(protocolSession);
+                protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                    AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod());
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Mon Jan 22 12:58:01 2007
@@ -23,20 +23,15 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
 public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
@@ -77,14 +72,11 @@
                 {
                     session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
                         "No such queue, '" + body.queue + "'");
-//                     channelClose(session, channelId, stateManager,
-//                                  "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
                 }
                 else
                 {
-                    connectionClose(session, channelId, session.getStateManager(),
-                                    "No queue name provided, no default queue defined.",
-                                    AMQConstant.NOT_ALLOWED);
+                    session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                        "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod());
                 }
             }
             else
@@ -103,54 +95,18 @@
                 }
                 catch (AMQInvalidSelectorException ise)
                 {
-                    _log.info("Closing connection due to invalid selector");
+                    _log.info("Closing connection due to invalid selector: " + ise.getMessage());
                     session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
                         ise.getMessage());
-//                    channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
-                    connectionClose(session, channelId, session.getStateManager(),
-                                    "Non-unique consumer tag, '" + body.destination + "'",
-                                    AMQConstant.NOT_ALLOWED);
+                    _log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
+                    session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                        "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
                 }
             }
         }
     }
-
-//     private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
-//                               String message, AMQConstant code)
-//         throws AMQException
-//     {
-//         /*AMQShort*/String msg = new /*AMQShort*/String(message);
-//         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-//         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-//         // Be aware of possible changes to parameter order as versions change.
-//         session.writeRequest(channelId, ChannelCloseBody.createMethodBody
-//                              ((byte)0, (byte)9,	// AMQP version (major, minor)
-//                               MessageConsumeBody.getClazz((byte)0, (byte)9),	// classId
-//                               MessageConsumeBody.getMethod((byte)0, (byte)9),	// methodId
-//                               code.getCode(),	// replyCode
-//                               msg),	// replyText
-//                              listener);
-//     }
-
-    private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
-                                 String message, AMQConstant code)
-        throws AMQException
-    {
-        byte major = session.getMajor();
-        byte minor = session.getMinor();
-        /*AMQShort*/String msg = new /*AMQShort*/String(message);
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
-                                major, minor,	// AMQP version (major, minor)
-                                MessageConsumeBody.getClazz(major, minor),	// classId
-                                MessageConsumeBody.getMethod(major, minor),	// methodId
-                                code.getCode(),	// replyCode
-                                msg),	// replyText
-                             listener);
-    }
-
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 22 12:58:01 2007
@@ -33,6 +33,9 @@
 import org.apache.qpid.framing.ConnectionOpenBody;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
@@ -59,6 +62,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -100,6 +104,7 @@
 
     private Object _lastSent;
 
+    private boolean _closePending;
     private boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
@@ -128,6 +133,8 @@
         _codecFactory = codecFactory;
         _managedObject = createMBean();
         _managedObject.register();
+        _closePending = false;
+        _closed = false;
     }
 
     public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -143,6 +150,8 @@
         _codecFactory = codecFactory;
         _managedObject = createMBean();
         _managedObject.register();
+        _closePending = false;
+        _closed = false;
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -168,7 +177,8 @@
         return (AMQProtocolSession) minaProtocolSession.getAttachment();
     }
 
-    private AMQChannel createChannel(int id) throws AMQException {
+    private AMQChannel createChannel(int id) throws AMQException
+    {
         IApplicationRegistry registry = ApplicationRegistry.getInstance();
         AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
                                             _exchangeRegistry, this, _stateManager);
@@ -221,12 +231,22 @@
 
             }
         }
-        else
+        else if(!_closed)
         {
             AMQFrame frame = (AMQFrame) message;
-
             AMQChannel channel = getChannel(frame.channel);
-            if (channel == null)
+
+            if (_closePending)
+            {
+                // If a close is pending (ie ChannelClose has been sent, but no ChannelCloseOk received), then
+                // all methods except ChannelCloseOk must be rejected. (AMQP spec)
+                if((frame.bodyFrame instanceof AMQRequestBody))
+                    throw new AMQException("Incoming request frame on connection which is pending close.");
+                AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+                if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
+                    throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method.");         
+            }
+            else if (channel == null)
             {
                 // Perform a check on incoming frames that may result in a new channel
                 // being opened. The frame MUST be:
@@ -235,12 +255,12 @@
                 // c. Must be a ConnectionOpenBody method.
                 // Throw an exception for all other incoming frames on an unopened channel
                 if(!(frame.bodyFrame instanceof AMQRequestBody))
-                    throw new AMQException("Incoming frame on unopened channel not a request");
+                    throw new AMQException("Incoming frame on unopened channel is not a request.");
                 AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
-                if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
-                    throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+                if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody))
+                    throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method.");
                 if (requestBody.getRequestId() != 1)
-                    throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
+                    throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
                 channel = createChannel(frame.channel);
             }
 
@@ -391,17 +411,36 @@
             channel.rollback();
         }
     }
+    
+    // Used to initiate a channel close from the server side and inform the client
+    public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+    {
+        final AMQChannel channel = _channelMap.get(channelId);
+        if (channel == null)
+        {
+            throw new IllegalArgumentException("Unknown channel id " + channelId);
+        }
+        else
+        {
+            channel.close(this);
+            // Be aware of possible changes to parameter order as versions change.
+            AMQMethodBody cf = ChannelCloseBody.createMethodBody
+                (_major, _minor,	// AMQP version (major, minor)
+                MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
+                MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
+                replyCode,	// replyCode
+                replyText);	// replyText
+            writeRequest(channelId, cf);
+            // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+            // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+            // method handler has not already done so.
+            // TODO - Find a better way of doing this without holding up this thread...
+            try { Thread.currentThread().sleep(2000); } // 2 seconds
+            catch (InterruptedException e) {}
+            _channelMap.remove(channelId); // Returns null if already removed (by closeOk handler
+        }
+    }
 
-    /**
-     * Close a specific channel. This will remove any resources used by the channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
-     * </ul>
-     *
-     * @param channelId id of the channel to close
-     * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response
-     * @throws AMQException if an error occurs closing the channel
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
     // Used to close a channel as a response to a client close request
     public void closeChannelResponse(int channelId, long requestId) throws AMQException
     {
@@ -425,33 +464,52 @@
             }
         }
     }
+
+    // Used to initiate a connection close from the server side and inform the client
+    public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException
+    {
+        _closePending = true; // This prevents all methods except Close-Ok from being accepted
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+            _major, _minor,	// AMQP version (major, minor)
+            classId,		// classId
+            methodId,	// methodId
+            replyCode,	// replyCode
+            replyText);	// replyText
+        writeRequest(0, close);        
+        // Wait a bit for the Connection.CloseOk to come in from the client, but don't
+        // rely on it. Attempt to close the connection if the ConnectionCloseOk
+        // method handler has not already done so.
+        // TODO - Find a better way of doing this without holding up this thread...
+        try { Thread.currentThread().sleep(2000); } // 2 seconds
+        catch (InterruptedException e) {}
+        closeSession();
+    }
     
-    // Used to close a channel from the server side and inform the client
-    public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+    public void closeSessionRequest(int replyCode, String replyText) throws AMQException
     {
-        final AMQChannel channel = _channelMap.get(channelId);
-        if (channel == null)
-        {
-            throw new IllegalArgumentException("Unknown channel id");
-        }
-        else
+        closeSessionRequest(replyCode, replyText, 0, 0);
+    }
+    
+    // Used to close a connection as a response to a client close request
+    public void closeSessionResponse(long requestId) throws AMQException
+    {
+        // Be aware of possible changes to parameter order as versions change.
+        writeResponse(0, requestId, ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version
+        closeSession();
+    }
+    
+    public void closeSession() throws AMQException
+    {
+        if (!_closed)
         {
-            channel.close(this);
-            // Be aware of possible changes to parameter order as versions change.
-            AMQMethodBody cf = ChannelCloseBody.createMethodBody
-                (_major, _minor,	// AMQP version (major, minor)
-                MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
-                MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
-                replyCode,	// replyCode
-                replyText);	// replyText
-            writeRequest(channelId, cf);
-            // Wait a bit for the Channel.CloseOk to come in from the client, but don't
-            // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
-            // method handler has not already done so.
-            // TODO - Find a better way of doing this without holding up this thread...
-            try { Thread.currentThread().sleep(2000); } // 2 seconds
-            catch (InterruptedException e) {}
-            _channelMap.remove(channelId); // Returns null if already removed
+            _closed = true;
+            closeAllChannels();
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            if (_managedObject != null)
+            {
+                _managedObject.unregister();
+            }        
         }
     }
 
@@ -492,23 +550,6 @@
             channel.close(this);
         }
         _channelMap.clear();
-    }
-
-    /**
-     * This must be called when the session is _closed in order to free up any resources
-     * managed by the session.
-     */
-    public void closeSession() throws AMQException
-    {
-        if (!_closed)
-        {
-            _closed = true;
-            closeAllChannels();
-            if (_managedObject != null)
-            {
-                _managedObject.unregister();
-            }
-        }
     }
 
     public String toString()

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Jan 22 12:58:01 2007
@@ -174,18 +174,9 @@
         }
         else
         {
-            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
-            	(byte)0, (byte)9,	// AMQP version (major, minor)
-            	0,	// classId
-                0,	// methodId
-                200,	// replyCode
-                throwable.getMessage());	// replyText
-            session.writeRequest(0, closeBody, methodListener);
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
-            protocolSession.close();
+            // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right!
+            session.closeSessionRequest(200, throwable.getMessage());
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 22 12:58:01 2007
@@ -72,19 +72,18 @@
      */
     void addChannel(AMQChannel channel) throws AMQException;
 
-    /**
-     * Close a specific channel. This will remove any resources used by the channel, including:
-     * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
-     * </ul>
-     * @param channelId id of the channel to close
-     * @param requestId id of the request that initiated the close, used in response
-     * @throws org.apache.qpid.AMQException if an error occurs closing the channel
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
+    void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+    
     void closeChannelResponse(int channelId, long requestId) throws AMQException;
     
-    void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+    void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException;
 
+    void closeSessionRequest(int replyCode, String replyText) throws AMQException;
+    
+    void closeSessionResponse(long requestId) throws AMQException;
+    
+    void closeSession() throws AMQException;
+    
     /**
      * Remove a channel from the session but do not close it.
      * @param channelId
@@ -96,12 +95,6 @@
      * @param delay delay in seconds (not ms)
      */
     void initHeartbeats(int delay);
-
-    /**
-     * This must be called when the session is _closed in order to free up any resources
-     * managed by the session.
-     */
-    void closeSession() throws AMQException;
 
     /**
      * @return a key that uniquely identifies this session

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 22 12:58:01 2007
@@ -22,12 +22,16 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
 
 public class RequestManager
 {
+    private static final Logger logger = Logger.getLogger(RequestManager.class);
+
     private int channel;
     private AMQProtocolWriter protocolWriter;
     
@@ -71,7 +75,11 @@
             lastProcessedResponseId, requestMethodBody);
         requestSentMap.put(requestId, methodListener);
         protocolWriter.writeFrame(requestFrame);
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
+                " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+        }
         return requestId;
     }
 
@@ -80,7 +88,11 @@
     {
         long requestIdStart = responseBody.getRequestId();
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
+                " " + responseBody + "; " + responseBody.getMethodPayload());
+        }
         for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
         {
             AMQMethodListener methodListener = requestSentMap.get(requestId);

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=498797&r1=498796&r2=498797
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 22 12:58:01 2007
@@ -23,6 +23,8 @@
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -30,6 +32,8 @@
 
 public class ResponseManager
 {
+    private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
     private int channel;
     private AMQMethodListener methodListener;
     private AMQProtocolWriter protocolWriter;
@@ -113,12 +117,15 @@
     public void requestReceived(AMQRequestBody requestBody) throws Exception
     {
         long requestId = requestBody.getRequestId();
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+                " " + requestBody + "; " + requestBody.getMethodPayload());
+        }
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
         responseMap.put(requestId, new ResponseStatus(requestId));
-        // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
         AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId);
         methodListener.methodReceived(methodEvent);
     }
@@ -126,7 +133,11 @@
     public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
         throws RequestResponseMappingException
     {
-        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
+        if (logger.isDebugEnabled())
+        {
+            logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
+                " Res[# " + requestId + "]; " + responseMethodBody);
+        }
         ResponseStatus responseStatus = responseMap.get(requestId);
         if (responseStatus == null)
             throw new RequestResponseMappingException(requestId,