You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/19 15:05:49 UTC
svn commit: r497820 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/handler/
common/src/main/java/org/apache/qpid/framing/
Author: kpvdr
Date: Fri Jan 19 06:05:48 2007
New Revision: 497820
URL: http://svn.apache.org/viewvc?view=rev&rev=497820
Log:
Solved multithreading issue in RequestManager where responses would arrive before the request made it into the requestMap. Fixed server's ChannelCloseHandler and ConnectionCloseMethodHandler to work with new write methods.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Fri Jan 19 06:05:48 2007
@@ -54,10 +54,10 @@
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- protocolSession.closeChannel(evt.getChannelId());
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeResponse(evt, ChannelCloseOkBody.createMethodBody((byte)0, (byte)9));
+ protocolSession.closeChannel(evt.getChannelId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Fri Jan 19 06:05:48 2007
@@ -54,6 +54,10 @@
final ConnectionCloseBody body = evt.getMethod();
_logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
body.replyText + " for " + protocolSession);
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
try
{
protocolSession.closeSession();
@@ -62,9 +66,5 @@
{
_logger.error("Error closing protocol session: " + e, e);
}
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody((byte)0, (byte)9));
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Jan 19 06:05:48 2007
@@ -82,9 +82,11 @@
public String toString()
{
- StringBuffer buf = new StringBuffer(getClass().toString());
- buf.append(" Class: ").append(getClazz());
- buf.append(" Method: ").append(getMethod());
+ String className = getClass().getName();
+ StringBuffer buf = new StringBuffer(className.substring(className.lastIndexOf(".") + 1));
+ buf.append(" [C").append(getClazz());
+ buf.append(" M").append(getMethod());
+ buf.append("]");
return buf.toString();
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Fri Jan 19 06:05:48 2007
@@ -62,8 +62,9 @@
long requestId = getNextRequestId(); // Get new request ID
AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
lastProcessedResponseId, requestMethodBody);
- protocolWriter.writeFrame(requestFrame);
requestSentMap.put(requestId, methodListener);
+ protocolWriter.writeFrame(requestFrame);
+ // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody);
return requestId;
}
@@ -72,6 +73,7 @@
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
+ // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload());
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=497820&r1=497819&r2=497820
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Fri Jan 19 06:05:48 2007
@@ -106,6 +106,7 @@
public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
+ // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
@@ -118,6 +119,7 @@
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
+ // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody);
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,