You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/15 17:26:01 UTC
svn commit: r496389 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/protocol/
common/src/main/java/org/apache/qpid/framing/
Author: kpvdr
Date: Mon Jan 15 08:26:00 2007
New Revision: 496389
URL: http://svn.apache.org/viewvc?view=rev&rev=496389
Log:
Request and Response managers now use the new common AMQMethodListener class
Removed:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 08:26:00 2007
@@ -36,9 +36,7 @@
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
-//import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.HeartbeatBody;
-//import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -221,24 +219,6 @@
{
_logger.error("Received invalid frame: " + frame.toString());
}
-// else if (frame.bodyFrame instanceof AMQMethodBody)
-// {
-// methodFrameReceived(frame);
-// }
-// else
-// {
-// try
-// {
-// contentFrameReceived(frame);
-// }
-// catch (RequiredDeliveryException e)
-// {
-// //need to return the message:
-// _logger.info("Returning message to " + this + " channel " + frame.channel
-// + ": " + e.getMessage());
-// writeFrame(e.getReturnMessage(frame.channel));
-// }
-// }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java Mon Jan 15 08:26:00 2007
@@ -28,21 +28,21 @@
{
private int channel;
AMQProtocolWriter protocolSession;
-
+
/**
* Request and response frames must have a requestID and responseID which
* independently increment from 0 on a per-channel basis. These are the
* counters, and contain the value of the next (not yet used) frame.
*/
private long requestIdCount;
-
+
/**
* These keep track of the last requestId and responseId to be received.
*/
private long lastProcessedResponseId;
-
+
private Hashtable<Long, AMQResponseCallback> requestSentMap;
-
+
public RequestManager(int channel, AMQProtocolWriter protocolSession)
{
this.channel = channel;
@@ -51,9 +51,9 @@
lastProcessedResponseId = 0L;
requestSentMap = new Hashtable<Long, AMQResponseCallback>();
}
-
+
// *** Functions to originate a request ***
-
+
public long sendRequest(AMQMethodBody requestMethodBody,
AMQResponseCallback responseCallback)
{
@@ -64,7 +64,7 @@
requestSentMap.put(requestId, responseCallback);
return requestId;
}
-
+
public void responseReceived(AMQResponseBody responseBody)
throws RequestResponseMappingException
{
@@ -81,16 +81,16 @@
}
lastProcessedResponseId = responseBody.getResponseId();
}
-
+
// *** Management functions ***
-
+
public int requestsMapSize()
{
return requestSentMap.size();
}
-
+
// *** Private helper functions ***
-
+
private long getNextRequestId()
{
return requestIdCount++;
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=496389&r1=496388&r2=496389
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Mon Jan 15 08:26:00 2007
@@ -24,12 +24,14 @@
import java.util.Hashtable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
private int channel;
- RequestHandler requestHandler;
+ AMQMethodListener methodListener;
AMQProtocolWriter protocolSession;
/**
@@ -51,66 +53,68 @@
*/
public enum batchResponseModeEnum { NONE }
private batchResponseModeEnum batchResponseMode;
-
+
/**
* Request and response frames must have a requestID and responseID which
* independently increment from 0 on a per-channel basis. These are the
* counters, and contain the value of the next (not yet used) frame.
*/
private long responseIdCount;
-
+
/**
* These keep track of the last requestId and responseId to be received.
*/
private long lastReceivedRequestId;
-
+
/**
* Last requestID sent in a response (for batching)
*/
private long lastSentRequestId;
-
+
private class ResponseStatus implements Comparable<ResponseStatus>
{
- public long requestId;
- public AMQMethodBody responseMethodBody;
-
+ private long requestId;
+ private AMQMethodBody responseMethodBody;
+
public ResponseStatus(long requestId)
{
- this.requestId = requestId;
+ this.requestId = requestId;
responseMethodBody = null;
}
-
+
public int compareTo(ResponseStatus o)
{
return (int)(requestId - o.requestId);
}
}
-
+
private Hashtable<Long, ResponseStatus> responseMap;
-
- public ResponseManager(int channel, RequestHandler requestHandler,
+
+ public ResponseManager(int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolSession)
{
this.channel = channel;
- this.requestHandler = requestHandler;
+ this.methodListener = methodListener;
this.protocolSession = protocolSession;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new Hashtable<Long, ResponseStatus>();
}
-
+
// *** Functions to handle an incoming request ***
-
- public void requestReceived(AMQRequestBody requestBody)
+
+ public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
- requestHandler.requestReceived(requestBody);
+ // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
+ AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload());
+ methodListener.methodReceived(methodEvent);
}
-
+
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
@@ -124,14 +128,14 @@
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
}
-
+
// *** Management functions ***
public batchResponseModeEnum getBatchResponseMode()
{
return batchResponseMode;
}
-
+
public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
{
if (this.batchResponseMode != batchResponseMode)
@@ -140,12 +144,12 @@
doBatches();
}
}
-
+
public int responsesMapSize()
{
return responseMap.size();
}
-
+
/**
* As the responseMap may contain both outstanding responses (those with
* ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -162,7 +166,7 @@
}
return cnt;
}
-
+
/**
* As the responseMap may contain both outstanding responses (those with
* ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -179,14 +183,14 @@
}
return cnt;
}
-
+
// *** Private helper functions ***
-
+
private long getNextResponseId()
{
return responseIdCount++;
}
-
+
private void doBatches()
{
switch (batchResponseMode)
@@ -204,13 +208,13 @@
}
}
break;
-
+
// TODO: Add additional batch mode handlers here...
// case DELAY_FIXED:
// case MANUAL:
}
}
-
+
private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
AMQMethodBody responseMethodBody)
{