You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/22 15:53:44 UTC

svn commit: r498631 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/org/apache/qpid/client/protocol/ common/src/main/java/org/apache/qp...

Author: kpvdr
Date: Mon Jan 22 06:53:43 2007
New Revision: 498631

URL: http://svn.apache.org/viewvc?view=rev&rev=498631
Log:
Improvements to debugging messages from Request/ResponseManager. Added timed wait for Channel.CloseOk message in broker's closeChannelRequest method. Added checks for illegal frames that would open a closed channel.

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan 22 06:53:43 2007
@@ -137,8 +137,8 @@
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
         _exchanges = exchanges;
-        _requestManager = new RequestManager(channelId, protocolWriter);
-    	_responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
+        _requestManager = new RequestManager(channelId, protocolWriter, true);
+    	_responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true);
         _txnBuffer = new TxnBuffer(_messageStore);
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -27,8 +27,10 @@
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.framing.AMQFrame;
@@ -224,7 +226,21 @@
             AMQFrame frame = (AMQFrame) message;
 
             AMQChannel channel = getChannel(frame.channel);
-            if (channel == null) {
+            if (channel == null)
+            {
+                // Perform a check on incoming frames that may result in a new channel
+                // being opened. The frame MUST be:
+                // a. A new request;
+                // b. Have a request id of 1 (i.e. the first request on a new channel);
+                // c. Must be a ConnectionOpenBody method.
+                // Throw an exception for all other incoming frames on an unopened channel
+                if(!(frame.bodyFrame instanceof AMQRequestBody))
+                    throw new AMQException("Incoming frame on unopened channel not a request");
+                AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+                if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
+                    throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+                if (requestBody.getRequestId() != 1)
+                    throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
                 channel = createChannel(frame.channel);
             }
 
@@ -268,6 +284,8 @@
     public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
         throws AMQException
     {
+        if (!checkMethodBodyVersion(methodBody))
+            throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
         AMQChannel channel = getChannel(channelNum);
         RequestManager requestManager = channel.getRequestManager();
         return requestManager.sendRequest(methodBody, methodListener);
@@ -277,14 +295,14 @@
     public long writeRequest(int channelNum, AMQMethodBody methodBody)
         throws AMQException
     {
-        AMQChannel channel = getChannel(channelNum);
-        RequestManager requestManager = channel.getRequestManager();
-        return requestManager.sendRequest(methodBody, _stateManager);
+        return writeRequest(channelNum, methodBody, _stateManager);
     }
 
     public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
         throws AMQException
     {
+        if (!checkMethodBodyVersion(methodBody))
+            throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
         AMQChannel channel = getChannel(channelNum);
         ResponseManager responseManager = channel.getResponseManager();
         responseManager.sendResponse(requestId, methodBody);
@@ -380,6 +398,7 @@
      * </ul>
      *
      * @param channelId id of the channel to close
+     * @param requestId RequestId of received Channel.Close request, used to send Channel.CloseOk response
      * @throws AMQException if an error occurs closing the channel
      * @throws IllegalArgumentException if the channel id is not valid
      */
@@ -396,6 +415,7 @@
             try
             {
                 channel.close(this);
+                // Send the Channel.CloseOk response
                 // Be aware of possible changes to parameter order as versions change.
                 writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor));
             }
@@ -425,6 +445,13 @@
                 replyCode,	// replyCode
                 replyText);	// replyText
             writeRequest(channelId, cf);
+            // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+            // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+            // method handler has not already done so.
+            // TODO - Find a better way of doing this without holding up this thread...
+            try { Thread.currentThread().sleep(2000); } // 2 seconds
+            catch (InterruptedException e) {}
+            _channelMap.remove(channelId); // Returns null if already removed
         }
     }
 
@@ -577,8 +604,13 @@
         return _minor;
     }
 
-    public boolean amqpVersionEquals(byte major, byte minor)
+    public boolean versionEquals(byte major, byte minor)
     {
         return _major == major && _minor == minor;
+    }
+    
+    public boolean checkMethodBodyVersion(AMQMethodBody methodBody)
+    {
+        return versionEquals(methodBody.getMajor(), methodBody.getMinor());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
@@ -136,5 +137,6 @@
     AMQStateManager getStateManager();
     byte getMajor();
     byte getMinor();
-    boolean amqpVersionEquals(byte major, byte minor);
+    boolean versionEquals(byte major, byte minor);
+    boolean checkMethodBodyVersion(AMQMethodBody methodBody);
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -117,8 +117,8 @@
         _stateManager = new AMQStateManager(this);
 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -130,8 +130,8 @@
         _stateManager = new AMQStateManager(this);
 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
     }
  
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -145,8 +145,8 @@
         _stateManager.setProtocolSession(this);
                 
         // Add channel 0 request and response managers, since they will not be added through the usual mechanism
-        _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
-        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+        _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+        _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
     }
 
     public void init()
@@ -377,12 +377,12 @@
         // Add request and response handlers, one per channel, if they do not already exist
         if (_channelId2RequestMgrMap.get(channelId) == null)
         {
-            _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this));
+            _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
         }
         if (_channelId2ResponseMgrMap.get(channelId) == null)
         {
             
-            _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this));
+            _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 22 06:53:43 2007
@@ -29,7 +29,13 @@
 public class RequestManager
 {
     private int channel;
-    AMQProtocolWriter protocolWriter;
+    private AMQProtocolWriter protocolWriter;
+    
+    /**
+     * Used for logging and debugging only - allows the context of this instance
+     * to be known.
+     */
+    private boolean serverFlag;
 
     /**
      * Request and response frames must have a requestID and responseID which
@@ -45,10 +51,11 @@
 
     private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
 
-    public RequestManager(int channel, AMQProtocolWriter protocolWriter)
+    public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
     {
         this.channel = channel;
         this.protocolWriter = protocolWriter;
+        this.serverFlag = serverFlag;
         requestIdCount = 1L;
         lastProcessedResponseId = 0L;
         requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -64,7 +71,7 @@
             lastProcessedResponseId, requestMethodBody);
         requestSentMap.put(requestId, methodListener);
         protocolWriter.writeFrame(requestFrame);
-        // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody);
+        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
         return requestId;
     }
 
@@ -73,7 +80,7 @@
     {
         long requestIdStart = responseBody.getRequestId();
         long requestIdStop = requestIdStart + responseBody.getBatchOffset();
-        // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload());
+        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
         for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
         {
             AMQMethodListener methodListener = requestSentMap.get(requestId);

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 22 06:53:43 2007
@@ -31,8 +31,14 @@
 public class ResponseManager
 {
     private int channel;
-    AMQMethodListener methodListener;
-    AMQProtocolWriter protocolWriter;
+    private AMQMethodListener methodListener;
+    private AMQProtocolWriter protocolWriter;
+    
+    /**
+     * Used for logging and debugging only - allows the context of this instance
+     * to be known.
+     */
+    private boolean serverFlag;
 
     /**
      * Determines the batch behaviour of the manager.
@@ -91,11 +97,12 @@
     private ConcurrentHashMap<Long, ResponseStatus> responseMap;
 
     public ResponseManager(int channel, AMQMethodListener methodListener,
-        AMQProtocolWriter protocolWriter)
+        AMQProtocolWriter protocolWriter, boolean serverFlag)
     {
         this.channel = channel;
         this.methodListener = methodListener;
         this.protocolWriter = protocolWriter;
+        this.serverFlag = serverFlag;
         responseIdCount = 1L;
         lastReceivedRequestId = 0L;
         responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
@@ -106,7 +113,7 @@
     public void requestReceived(AMQRequestBody requestBody) throws Exception
     {
         long requestId = requestBody.getRequestId();
-        // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload());
+        // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
         // TODO: responseMark is used in HA, but until then, ignore...
         long responseMark = requestBody.getResponseMark();
         lastReceivedRequestId = requestId;
@@ -119,7 +126,7 @@
     public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
         throws RequestResponseMappingException
     {
-        // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody);
+        // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
         ResponseStatus responseStatus = responseMap.get(requestId);
         if (responseStatus == null)
             throw new RequestResponseMappingException(requestId,