You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/01/19 19:52:09 UTC

svn commit: r497906 - /incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java

Author: rajith
Date: Fri Jan 19 10:52:08 2007
New Revision: 497906

URL: http://svn.apache.org/viewvc?view=rev&rev=497906
Log:
added logic to handle the Reference case when transferring large messages

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497906&r1=497905&r2=497906
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Jan 19 10:52:08 2007
@@ -32,6 +32,9 @@
 import javax.jms.*;
 import java.io.UnsupportedEncodingException;
 import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -559,49 +562,43 @@
         }
 
         int size = (payload != null) ? payload.limit() : 0;
-        Content[] content = createContent(payload);
-        if (content.length > 0 && _logger.isDebugEnabled())
-        {
-            _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
-        }
-        for (int i = 0; i < content.length; i++)
-        {
-            try
-            {
-                AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
-                    (byte)0, (byte)9,               // AMQP version (major, minor)
-                    messageHeaders.getAppId(),      // String appId
-                    messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
-                    content[i],                     // Content body
-                    messageHeaders.getEncoding(),   // String contentEncoding
-                    messageHeaders.getContentType(), // String contentType
-                    messageHeaders.getCorrelationId(), // String correlationId
-                    (short)deliveryMode,            // short deliveryMode
-                    messageHeaders.getDestination(), // String destination
-                    destination.getExchangeName(),  // String exchange
-                    messageHeaders.getExpiration(), // long expiration
-                    immediate,                      // boolean immediate
-                    messageHeaders.getMessageId(),  // String messageId
-                    (short)priority,                // short priority
-                    message.getJMSRedelivered(),    // boolean redelivered
-                    messageHeaders.getReplyTo(),    // String replyTo
-                    destination.getRoutingKey(),    // String routingKey
-                    new String("abc123").getBytes(), // byte[] securityToken
-                    0,                              // int ticket
-                    messageHeaders.getTimestamp(),  // long timestamp
-                    messageHeaders.getTransactionId(), // String transactionId
-                    timeToLive,                     // long ttl
-                    messageHeaders.getUserId());    // String userId
+        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
         
-                _protocolHandler.writeRequest(_channelId, methodBody);
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException(e.toString());
-            }
+        if (size < framePayloadMax){
+        	// Inline message case
+        	_logger.debug("Inline case, sending data inline with the transfer method");
+        	Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE,payload.array()); 
+        	doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+        } else {
+        	// Reference message case
+            // Sequence is as follows 
+        	// 1. Message.open
+        	// 2. Message.Transfer
+        	// 3. "n" of Message.append
+        	// 4. Message.close
+        	List content = createContent(payload);
+        	if(_logger.isDebugEnabled())
+        	{
+        		_logger.debug("Reference case, sending data as chunks");
+        		_logger.debug("Sending " + content.size() + " Message.Transfer frames to " + destination);
+        	}
+        	// Message.Open
+        	String referenceId = generateReferenceId();
+        	doMessageOpen(referenceId);
+        	
+        	// Message.Transfer
+        	Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_REFERENCE,referenceId.getBytes()); 
+        	doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+        	
+        	//Message.Append
+        	for(Iterator it = content.iterator(); it.hasNext();){
+        		doMessageAppend(referenceId,(byte[])it.next());
+        	}
+        	
+        	//Message.Close
+        	doMessageClose(referenceId);
         }
-
-
+                
         if (message != origMessage)
         {
             _logger.warn("Updating original message");
@@ -612,6 +609,59 @@
             origMessage.setJMSMessageID(message.getJMSMessageID());
         }
     }
+    
+    private void doMessageTransfer(MessageHeaders messageHeaders,AMQDestination destination, Content content, AbstractJMSMessage message, int deliveryMode, int priority,
+            long timeToLive, boolean immediate)throws JMSException{
+    	try
+        {
+            AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+                (byte)0, (byte)9,               // AMQP version (major, minor)
+                messageHeaders.getAppId(),      // String appId
+                messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+                content,                     // Content body
+                messageHeaders.getEncoding(),   // String contentEncoding
+                messageHeaders.getContentType(), // String contentType
+                messageHeaders.getCorrelationId(), // String correlationId
+                (short)deliveryMode,            // short deliveryMode
+                messageHeaders.getDestination(), // String destination
+                destination.getExchangeName(),  // String exchange
+                messageHeaders.getExpiration(), // long expiration
+                immediate,                      // boolean immediate
+                messageHeaders.getMessageId(),  // String messageId
+                (short)priority,                // short priority
+                message.getJMSRedelivered(),    // boolean redelivered
+                messageHeaders.getReplyTo(),    // String replyTo
+                destination.getRoutingKey(),    // String routingKey
+                new String("abc123").getBytes(), // byte[] securityToken
+                0,                              // int ticket
+                messageHeaders.getTimestamp(),  // long timestamp
+                messageHeaders.getTransactionId(), // String transactionId
+                timeToLive,                     // long ttl
+                messageHeaders.getUserId());    // String userId
+    
+            _protocolHandler.writeRequest(_channelId, methodBody);
+        }
+        catch (AMQException e)
+        {
+            throw new JMSException(e.toString());
+        }
+    }
+    
+    private void doMessageOpen(String referenceId){
+    	AMQMethodBody methodBody = MessageOpenBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+    }
+    
+    private void doMessageAppend(String referenceId,byte[] data){
+    	AMQMethodBody methodBody = MessageAppendBody.createMethodBody((byte)0, (byte)9, data, referenceId.getBytes());
+    }
+    
+    private void doMessageClose(String referenceId){
+    	AMQMethodBody methodBody = MessageCloseBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+    }
+    
+    private String generateReferenceId(){
+    	return String.valueOf(System.currentTimeMillis());
+    }
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
@@ -631,7 +681,7 @@
             }
         }
     }
-
+       
     /**
      * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
      * maximum frame size.
@@ -639,34 +689,22 @@
      * @param payload
      * @return the array of content bodies
      */
-    private Content[] createContent(ByteBuffer payload)
+    private List createContent(ByteBuffer payload)
     {
-        if (payload == null || payload.remaining() == 0)
-        {
-            return NO_CONTENT;
-        }
-
         int dataLength = payload.remaining();
         final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
         int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
         int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
-        final Content[] bodies = new Content[frameCount];
+        List bodies = new LinkedList();
 
-        if (frameCount == 1)
-        {
-            bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload);
-        }
-        else
+        long remaining = dataLength;
+        for (int i = 0; i < frameCount + lastFrame; i++)
         {
-            long remaining = dataLength;
-            for (int i = 0; i < bodies.length; i++)
-            {
-                payload.position((int) framePayloadMax * i);
-                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
-                payload.limit(payload.position() + length);
-                bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice());
-                remaining -= length;
-            }
+            payload.position((int) framePayloadMax * i);
+            int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+            payload.limit(payload.position() + length);
+            bodies.add(payload.slice().array());
+            remaining -= length;
         }
         return bodies;
     }