You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/22 15:53:44 UTC
svn commit: r498631 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/org/apache/qpid/client/protocol/
common/src/main/java/org/apache/qp...
Author: kpvdr
Date: Mon Jan 22 06:53:43 2007
New Revision: 498631
URL: http://svn.apache.org/viewvc?view=rev&rev=498631
Log:
Improvements to debugging messages from Request/ResponseManager. Added timed wait for Channel.CloseOk message in broker's closeChannelRequest method. Added checks for illegal frames that would open a closed channel.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan 22 06:53:43 2007
@@ -137,8 +137,8 @@
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, protocolWriter);
- _responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
+ _requestManager = new RequestManager(channelId, protocolWriter, true);
+ _responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true);
_txnBuffer = new TxnBuffer(_messageStore);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -27,8 +27,10 @@
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.AMQFrame;
@@ -224,7 +226,21 @@
AMQFrame frame = (AMQFrame) message;
AMQChannel channel = getChannel(frame.channel);
- if (channel == null) {
+ if (channel == null)
+ {
+ // Perform a check on incoming frames that may result in a new channel
+ // being opened. The frame MUST be:
+ // a. A new request;
+ // b. Have a request id of 1 (i.e. the first request on a new channel);
+ // c. Must be a ConnectionOpenBody method.
+ // Throw an exception for all other incoming frames on an unopened channel
+ if(!(frame.bodyFrame instanceof AMQRequestBody))
+ throw new AMQException("Incoming frame on unopened channel not a request");
+ AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
+ throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+ if (requestBody.getRequestId() != 1)
+ throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
channel = createChannel(frame.channel);
}
@@ -268,6 +284,8 @@
public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
throws AMQException
{
+ if (!checkMethodBodyVersion(methodBody))
+ throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, methodListener);
@@ -277,14 +295,14 @@
public long writeRequest(int channelNum, AMQMethodBody methodBody)
throws AMQException
{
- AMQChannel channel = getChannel(channelNum);
- RequestManager requestManager = channel.getRequestManager();
- return requestManager.sendRequest(methodBody, _stateManager);
+ return writeRequest(channelNum, methodBody, _stateManager);
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
throws AMQException
{
+ if (!checkMethodBodyVersion(methodBody))
+ throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
responseManager.sendResponse(requestId, methodBody);
@@ -380,6 +398,7 @@
* </ul>
*
* @param channelId id of the channel to close
+ * @param requestId RequestId of received Channel.Close request, used to send Channel.CloseOk response
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
@@ -396,6 +415,7 @@
try
{
channel.close(this);
+ // Send the Channel.CloseOk response
// Be aware of possible changes to parameter order as versions change.
writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor));
}
@@ -425,6 +445,13 @@
replyCode, // replyCode
replyText); // replyText
writeRequest(channelId, cf);
+ // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ _channelMap.remove(channelId); // Returns null if already removed
}
}
@@ -577,8 +604,13 @@
return _minor;
}
- public boolean amqpVersionEquals(byte major, byte minor)
+ public boolean versionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
+ }
+
+ public boolean checkMethodBodyVersion(AMQMethodBody methodBody)
+ {
+ return versionEquals(methodBody.getMajor(), methodBody.getMinor());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -136,5 +137,6 @@
AMQStateManager getStateManager();
byte getMajor();
byte getMinor();
- boolean amqpVersionEquals(byte major, byte minor);
+ boolean versionEquals(byte major, byte minor);
+ boolean checkMethodBodyVersion(AMQMethodBody methodBody);
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 22 06:53:43 2007
@@ -117,8 +117,8 @@
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -130,8 +130,8 @@
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -145,8 +145,8 @@
_stateManager.setProtocolSession(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public void init()
@@ -377,12 +377,12 @@
// Add request and response handlers, one per channel, if they do not already exist
if (_channelId2RequestMgrMap.get(channelId) == null)
{
- _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this));
+ _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
}
if (_channelId2ResponseMgrMap.get(channelId) == null)
{
- _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this));
+ _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 22 06:53:43 2007
@@ -29,7 +29,13 @@
public class RequestManager
{
private int channel;
- AMQProtocolWriter protocolWriter;
+ private AMQProtocolWriter protocolWriter;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
/**
* Request and response frames must have a requestID and responseID which
@@ -45,10 +51,11 @@
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolWriter)
+ public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
+ this.serverFlag = serverFlag;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -64,7 +71,7 @@
lastProcessedResponseId, requestMethodBody);
requestSentMap.put(requestId, methodListener);
protocolWriter.writeFrame(requestFrame);
- // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody);
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
return requestId;
}
@@ -73,7 +80,7 @@
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
- // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload());
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=498631&r1=498630&r2=498631
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 22 06:53:43 2007
@@ -31,8 +31,14 @@
public class ResponseManager
{
private int channel;
- AMQMethodListener methodListener;
- AMQProtocolWriter protocolWriter;
+ private AMQMethodListener methodListener;
+ private AMQProtocolWriter protocolWriter;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
/**
* Determines the batch behaviour of the manager.
@@ -91,11 +97,12 @@
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
public ResponseManager(int channel, AMQMethodListener methodListener,
- AMQProtocolWriter protocolWriter)
+ AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.methodListener = methodListener;
this.protocolWriter = protocolWriter;
+ this.serverFlag = serverFlag;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
@@ -106,7 +113,7 @@
public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
- // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload());
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
@@ -119,7 +126,7 @@
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
- // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody);
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,