You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/19 19:52:09 UTC
svn commit: r497906 -
/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Author: rajith
Date: Fri Jan 19 10:52:08 2007
New Revision: 497906
URL: http://svn.apache.org/viewvc?view=rev&rev=497906
Log:
added logic to handle the Reference case when transferring large messages
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497906&r1=497905&r2=497906
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Jan 19 10:52:08 2007
@@ -32,6 +32,9 @@
import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -559,49 +562,43 @@
}
int size = (payload != null) ? payload.limit() : 0;
- Content[] content = createContent(payload);
- if (content.length > 0 && _logger.isDebugEnabled())
- {
- _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
- }
- for (int i = 0; i < content.length; i++)
- {
- try
- {
- AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content[i], // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)deliveryMode, // short deliveryMode
- messageHeaders.getDestination(), // String destination
- destination.getExchangeName(), // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- messageHeaders.getMessageId(), // String messageId
- (short)priority, // short priority
- message.getJMSRedelivered(), // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- destination.getRoutingKey(), // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- timeToLive, // long ttl
- messageHeaders.getUserId()); // String userId
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
- _protocolHandler.writeRequest(_channelId, methodBody);
- }
- catch (AMQException e)
- {
- throw new JMSException(e.toString());
- }
+ if (size < framePayloadMax){
+ // Inline message case
+ _logger.debug("Inline case, sending data inline with the transfer method");
+ Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE,payload.array());
+ doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+ } else {
+ // Reference message case
+ // Sequence is as follows
+ // 1. Message.open
+ // 2. Message.Transfer
+ // 3. "n" of Message.append
+ // 4. Message.close
+ List content = createContent(payload);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Reference case, sending data as chunks");
+ _logger.debug("Sending " + content.size() + " Message.Transfer frames to " + destination);
+ }
+ // Message.Open
+ String referenceId = generateReferenceId();
+ doMessageOpen(referenceId);
+
+ // Message.Transfer
+ Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_REFERENCE,referenceId.getBytes());
+ doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+
+ //Message.Append
+ for(Iterator it = content.iterator(); it.hasNext();){
+ doMessageAppend(referenceId,(byte[])it.next());
+ }
+
+ //Message.Close
+ doMessageClose(referenceId);
}
-
-
+
if (message != origMessage)
{
_logger.warn("Updating original message");
@@ -612,6 +609,59 @@
origMessage.setJMSMessageID(message.getJMSMessageID());
}
}
+
+ private void doMessageTransfer(MessageHeaders messageHeaders,AMQDestination destination, Content content, AbstractJMSMessage message, int deliveryMode, int priority,
+ long timeToLive, boolean immediate)throws JMSException{
+ try
+ {
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content, // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ message.getJMSRedelivered(), // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
+
+ _protocolHandler.writeRequest(_channelId, methodBody);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
+ }
+
+ private void doMessageOpen(String referenceId){
+ AMQMethodBody methodBody = MessageOpenBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+ }
+
+ private void doMessageAppend(String referenceId,byte[] data){
+ AMQMethodBody methodBody = MessageAppendBody.createMethodBody((byte)0, (byte)9, data, referenceId.getBytes());
+ }
+
+ private void doMessageClose(String referenceId){
+ AMQMethodBody methodBody = MessageCloseBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+ }
+
+ private String generateReferenceId(){
+ return String.valueOf(System.currentTimeMillis());
+ }
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -631,7 +681,7 @@
}
}
}
-
+
/**
* Create content bodies. This will split a large message into numerous bodies depending on the negotiated
* maximum frame size.
@@ -639,34 +689,22 @@
* @param payload
* @return the array of content bodies
*/
- private Content[] createContent(ByteBuffer payload)
+ private List createContent(ByteBuffer payload)
{
- if (payload == null || payload.remaining() == 0)
- {
- return NO_CONTENT;
- }
-
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final Content[] bodies = new Content[frameCount];
+ List bodies = new LinkedList();
- if (frameCount == 1)
- {
- bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload);
- }
- else
+ long remaining = dataLength;
+ for (int i = 0; i < frameCount + lastFrame; i++)
{
- long remaining = dataLength;
- for (int i = 0; i < bodies.length; i++)
- {
- payload.position((int) framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice());
- remaining -= length;
- }
+ payload.position((int) framePayloadMax * i);
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ bodies.add(payload.slice().array());
+ remaining -= length;
}
return bodies;
}