You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/01/25 19:24:48 UTC

svn commit: r499880 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/ common/src/main/java/org/apache/qpid/framing/

Author: gsim
Date: Thu Jan 25 10:24:48 2007
New Revision: 499880

URL: http://svn.apache.org/viewvc?view=rev&rev=499880
Log:
Improved channel/connection exception handling.


Added:
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Thu Jan 25 10:24:48 2007
@@ -25,6 +25,7 @@
 import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQProtocolVersionException;
@@ -246,7 +247,7 @@
                     throw new AMQException("Incoming request frame on connection which is pending close.");
                 AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
                 if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
-                    throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method.");         
+                    throw new AMQException("Incoming frame on closing connection is not a Connection.CloseOk method.");
             }
             else if (channel == null)
             {
@@ -259,8 +260,13 @@
                 if(!(frame.bodyFrame instanceof AMQRequestBody))
                     throw new AMQException("Incoming frame on unopened channel is not a request.");
                 AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
-                if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody))
-                    throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method.");
+                if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) {
+                    closeSessionRequest(
+                        requestBody.getMethodPayload().getConnectionException(
+                            504, "Incoming frame on unopened channel is not a Connection.Open method."
+                        )
+                    ); 
+                }
                 if (requestBody.getRequestId() != 1)
                     throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
                 channel = createChannel(frame.channel);
@@ -283,13 +289,29 @@
     
     private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception
     {
-        if (_logger.isDebugEnabled())
+        try{
+            if (_logger.isDebugEnabled())
+            {
+                    _logger.debug("Request frame received: " + requestBody);
+            }
+            AMQChannel channel = getChannel(channelNum);
+            ResponseManager responseManager = channel.getResponseManager();
+            responseManager.requestReceived(requestBody);
+        }
+        catch (AMQChannelException e)
+        {
+            _logger.error("Closing channel due to: " + e.getMessage());
+            writeRequest(channelNum, e.getCloseMethodBody());
+            AMQChannel channel = _channelMap.remove(channelNum);
+            if (channel != null) {
+                channel.close(this);
+            }
+        }
+        catch (AMQConnectionException e)
         {
-            _logger.debug("Request frame received: " + requestBody);
+            _logger.error("Closing connection due to: " + e.getMessage());
+            closeSessionRequest(e);
         }
-        AMQChannel channel = getChannel(channelNum);
-        ResponseManager responseManager = channel.getResponseManager();
-        responseManager.requestReceived(requestBody);
     }
     
     private void responseFrameReceived(int channelNum, AMQResponseBody responseBody) throws Exception
@@ -490,6 +512,13 @@
     {
         closeSessionRequest(replyCode, replyText, 0, 0);
     }
+
+
+    public void closeSessionRequest(AMQConnectionException e) throws AMQException
+    {
+        closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(), e.getMethodId());
+    }
+
     
     // Used to close a connection as a response to a client close request
     public void closeSessionResponse(long requestId) throws AMQException

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jan 25 10:24:48 2007
@@ -52,6 +52,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.AMQUnresolvedAddressException;
@@ -288,7 +289,7 @@
                 message = "Unable to Connect";
             }
 
-            AMQException e = new AMQConnectionException(message);
+            AMQException e = new AMQConnectionFailureException(message);
 
             if (lastException != null)
             {

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Thu Jan 25 10:24:48 2007
@@ -21,10 +21,46 @@
 
 package org.apache.qpid;
 
+import org.apache.qpid.framing.ConnectionCloseBody;
+
 public class AMQConnectionException extends AMQException
 {
-    public AMQConnectionException(String message)
+    private final int _classId;
+    private final int _methodId;
+    /* AMQP version for which exception ocurred */
+    private final byte major;
+    private final byte minor;
+
+    public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
     {
-        super(message);
+        super(errorCode, msg, t);
+        _classId = classId;
+        _methodId = methodId;
+        this.major = major;
+        this.minor = minor;
     }
+
+    public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor)
+    {
+        super(errorCode, msg);
+        _classId = classId;
+        _methodId = methodId;
+        this.major = major;
+        this.minor = minor;
+    }
+
+    public ConnectionCloseBody getCloseMethodBody()
+    {
+        return ConnectionCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
+    }
+
+    public int getClassId()
+    {
+        return _classId;
+    }
+
+    public int getMethodId(){
+        return _methodId;
+    }
+
 }

Added: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?view=auto&rev=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Jan 25 10:24:48 2007
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQConnectionFailureException extends AMQException
+{
+    public AMQConnectionFailureException(String message)
+    {
+        super(message);
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=499880&r1=499879&r2=499880
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Thu Jan 25 10:24:48 2007
@@ -22,6 +22,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
 
 public abstract class AMQMethodBody extends AMQBody
 {
@@ -102,5 +103,17 @@
     public AMQChannelException getChannelException(int code, String message, Throwable cause)
     {
         return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
+    }
+
+    public AMQConnectionException getConnectionException(int code, String message)
+    {
+        return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+    }
+
+
+
+    public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+    {
+        return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
     }
 }