You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/19 15:05:49 UTC

svn commit: r497820 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/handler/ common/src/main/java/org/apache/qpid/framing/

Author: kpvdr
Date: Fri Jan 19 06:05:48 2007
New Revision: 497820

URL: http://svn.apache.org/viewvc?view=rev&rev=497820
Log:
Solved multithreading issue in RequestManager where responses would arrive before the request made it into the requestMap. Fixed server's ChannelCloseHandler and ConnectionCloseMethodHandler to work with new write methods.

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Jan 19 06:05:48 2007
@@ -54,10 +54,10 @@
         ChannelCloseBody body = evt.getMethod();
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
-        protocolSession.closeChannel(evt.getChannelId());
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9));
+        protocolSession.closeChannel(evt.getChannelId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Jan 19 06:05:48 2007
@@ -54,6 +54,10 @@
         final ConnectionCloseBody body = evt.getMethod();
         _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
                      body.replyText +  " for " + protocolSession);
+        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
         try
         {
             protocolSession.closeSession();
@@ -62,9 +66,5 @@
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Jan 19 06:05:48 2007
@@ -82,9 +82,11 @@
 
     public String toString()
     {
-        StringBuffer buf = new StringBuffer(getClass().toString());
-        buf.append(" Class: ").append(getClazz());
-        buf.append(" Method: ").append(getMethod());
+        String className = getClass().getName();
+        StringBuffer buf = new StringBuffer(className.substring(className.lastIndexOf(".") + 1));
+        buf.append(" [C").append(getClazz());
+        buf.append(" M").append(getMethod());
+        buf.append("]");
         return buf.toString();
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Fri Jan 19 06:05:48 2007
@@ -62,8 +62,9 @@
         long requestId = getNextRequestId(); // Get new request ID
         AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
             lastProcessedResponseId, requestMethodBody);
-        protocolWriter.writeFrame(requestFrame);
         requestSentMap.put(requestId, methodListener);
+        protocolWriter.writeFrame(requestFrame);
+        // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody);
         return requestId;
     }
 
@@ -72,6 +73,7 @@
     {
         long requestIdStart = responseBody.getRequestId();
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
+        // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload());
         for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
         {
             AMQMethodListener methodListener = requestSentMap.get(requestId);

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Fri Jan 19 06:05:48 2007
@@ -106,6 +106,7 @@
     public void requestReceived(AMQRequestBody requestBody) throws Exception
     {
         long requestId = requestBody.getRequestId();
+        // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload());
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
@@ -118,6 +119,7 @@
     public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
         throws RequestResponseMappingException
     {
+        // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody);
         ResponseStatus responseStatus = responseMap.get(requestId);
         if (responseStatus == null)
             throw new RequestResponseMappingException(requestId,