You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/24 19:42:38 UTC
svn commit: r499525 - in /incubator/qpid/branches/qpid.0-9/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/client/message/
common/src/main/java/org/apache/qpid...
Author: kpvdr
Date: Wed Jan 24 10:42:37 2007
New Revision: 499525
URL: http://svn.apache.org/viewvc?view=rev&rev=499525
Log:
Fixed problem with missing type field in Message.transfer. Used JMSHeaders instead.
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/CustomJMXProperty.java Wed Jan 24 10:42:37 2007
@@ -26,7 +26,8 @@
{
JMSX_QPID_JMSDESTINATIONURL,
JMSXGroupID,
- JMSXGroupSeq;
+ JMSXGroupSeq,
+ JMSXType;
private static Enumeration _names;
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 24 10:42:37 2007
@@ -75,7 +75,7 @@
msg.contentHeader = messageHeaders;
- if(transferBody.getBody().contentType == Content.TypeEnum.INLINE_T)
+ if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
{
msg.addContent(transferBody.getBody().getContentAsByteArray());
protocolSession.deliverMessageToAMQSession(evt.getChannelId(), msg);
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Jan 24 10:42:37 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMXProperty;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -205,12 +206,18 @@
public String getJMSType() throws JMSException
{
- return getMessageHeaders().getType();
+ // Since the type field is not a part of message.transfer and is used only for
+ // JMS messages, the type is carried as a JMS string property instead.
+ // return getMessageHeaders().getType();
+ return getStringProperty(CustomJMXProperty.JMSXType.toString());
}
public void setJMSType(String string) throws JMSException
{
- getMessageHeaders().setType(string);
+ // Since the type field is not a part of message.transfer and is used only for
+ // JMS messages, the type is carried as a JMS string property instead.
+ // getMessageHeaders().setType(string);
+ setStringProperty(CustomJMXProperty.JMSXType.toString(), string);
}
public long getJMSExpiration() throws JMSException
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java Wed Jan 24 10:42:37 2007
@@ -42,8 +42,8 @@
}
}
- public TypeEnum contentType;
- public ByteBuffer content;
+ private TypeEnum contentType;
+ private ByteBuffer content;
// Constructors
@@ -126,7 +126,7 @@
{
EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
- buffer.put(content);
+ buffer.put(content.duplicate());
}
public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java?view=diff&rev=499525&r1=499524&r2=499525
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java Wed Jan 24 10:42:37 2007
@@ -44,25 +44,8 @@
*/
private boolean serverFlag;
- /**
- * Determines the batch behaviour of the manager.
- *
- * Responses are sent to the RequestResponseManager through sendResponse().
- * These may be internally stored/accumulated for batching purposes, depending
- * on the batching strategy/mode of the RequestResponseManager.
- *
- * The following modes are possible:
- *
- * NONE: Each request results in an immediate single response, no batching
- * takes place.
- * DELAY_FIXED: Waits until a fixed period has passed to batch
- * accumulated responses. An optional fixed threshold may be set, which
- * if reached or exceeded within the delay period will trigger the batch. (TODO)
- * MANUAL: No response is sent until it is explicitly released by calling
- * function xxxx(). (TODO)
- */
- public enum batchResponseModeEnum { NONE }
- private batchResponseModeEnum batchResponseMode = batchResponseModeEnum.NONE;
+ private int maxAccumulatedResponses = 20; // Default
+// private Class currentResponseMethodBodyClass;
/**
* Request and response frames must have a requestID and responseID which
@@ -109,6 +92,7 @@
this.serverFlag = serverFlag;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
+// currentResponseMethodBodyClass = null;
responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
}
@@ -122,7 +106,7 @@
logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
" " + requestBody + "; " + requestBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+ //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
// " " + requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
@@ -150,26 +134,79 @@
if (responseStatus.responseMethodBody != null)
throw new RequestResponseMappingException(requestId, "RequestId " +
requestId + " already has a response in responseMap.");
+
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
+
+// if (currentResponseMethodBodyClass == null)
+// {
+// currentResponseMethodBodyClass = responseMethodBody.getClass();
+// responseStatus.responseMethodBody = responseMethodBody;
+// }
+// else if (currentResponseMethodBodyClass.equals(responseMethodBody.getClass()))
+// {
+// doBatches();
+// currentResponseMethodBodyClass = responseMethodBody.getClass();
+// responseStatus.responseMethodBody = responseMethodBody;
+// }
+// else
+// {
+// responseStatus.responseMethodBody = responseMethodBody;
+// if (batchedResponses() >= maxAccumulatedResponses)
+// doBatches();
+// }
}
// *** Management functions ***
- public batchResponseModeEnum getBatchResponseMode()
- {
- return batchResponseMode;
- }
-
- public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
+ /**
+ * Sends batched responses - i.e. all those members of responseMap that have
+ * received a response.
+ */
+ public synchronized void doBatches()
{
- if (this.batchResponseMode != batchResponseMode)
- {
- this.batchResponseMode = batchResponseMode;
- doBatches();
+ long startRequestId = 0;
+ int numAdditionalRequestIds = 0;
+ Class responseMethodBodyClass = null;
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
+ {
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
+ {
+// if (startRequestId == 0 || responseMethodBodyClass == null)
+// {
+// startRequestId = requestId;
+// responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+// lItr.remove();
+// }
+// else if (responseMethodBodyClass.equals(responseStatus.responseMethodBody.getClass()))
+// {
+// numAdditionalRequestIds++;
+// lItr.remove();
+// }
+// else
+// {
+// sendResponseBatchFrame(startRequestId, numAdditionalRequestIds,
+// responseStatus.responseMethodBody);
+// numAdditionalRequestIds = 0;
+// startRequestId = requestId;
+// responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+// lItr.remove();
+// }
+ sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody);
+ lItr.remove();
+ }
}
}
+ /**
+ * Total number of entries in the responseMap - including both those that
+ * are outstanding (i.e. no response has been received) and those that are
+ * batched (those for which responses have been received but have not yet
+ * been collected together and sent).
+ */
public int responsesMapSize()
{
return responseMap.size();
@@ -216,31 +253,7 @@
return responseIdCount++;
}
- private synchronized void doBatches()
- {
- switch (batchResponseMode)
- {
- case NONE:
- Iterator<Long> lItr = responseMap.keySet().iterator();
- while (lItr.hasNext())
- {
- long requestId = lItr.next();
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus.responseMethodBody != null)
- {
- sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
- lItr.remove();
- }
- }
- break;
-
- // TODO: Add additional batch mode handlers here...
- // case DELAY_FIXED:
- // case MANUAL:
- }
- }
-
- private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
+ private void sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
AMQMethodBody responseMethodBody)
{
long responseId = getNextResponseId(); // Get new response ID