You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/17 20:49:41 UTC

svn commit: r497138 - in /incubator/qpid/branches/qpid.0-9/java: client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java common/src/main/java/org/apache/qpid/framing/Content.java

Author: kpvdr
Date: Wed Jan 17 11:49:40 2007
New Revision: 497138

URL: http://svn.apache.org/viewvc?view=rev&rev=497138
Log:
Conversion of client/BasicMessageProducer to new Message class

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497138&r1=497137&r2=497138
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jan 17 11:49:40 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.MessageHeaders;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 
@@ -100,7 +101,7 @@
     private final boolean _mandatory;
 
     private final boolean _waitUntilSent;
-    private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+    private static final Content[] NO_CONTENT = new Content[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
                                    int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -515,24 +516,14 @@
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
-                            long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+                            long timeToLive, boolean mandatory, boolean immediate, boolean wait)
+                            throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
-
         
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
         message.getMessageHeaders().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
-            (byte)0, (byte)9,	// AMQP version (major, minor)
-            destination.getExchangeName(),	// exchange
-            immediate,	// immediate
-            mandatory,	// mandatory
-            destination.getRoutingKey(),	// routingKey
-            0);	// ticket
 
         long currentTime = 0;
         if (!_disableTimestamps)
@@ -542,53 +533,61 @@
         }
         message.prepareForSending();
         ByteBuffer payload = message.getData();
-        BasicContentHeaderProperties contentHeaderProperties = message.getMessageHeaders();
+        MessageHeaders messageHeaders = message.getMessageHeaders();
 
         if (timeToLive > 0)
         {
             if (!_disableTimestamps)
             {
-                contentHeaderProperties.setExpiration(currentTime + timeToLive);
+                messageHeaders.setExpiration(currentTime + timeToLive);
             }
         }
         else
         {
             if (!_disableTimestamps)
             {
-                contentHeaderProperties.setExpiration(0);
+                messageHeaders.setExpiration(0);
             }
         }
-        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
-        contentHeaderProperties.setPriority((byte) priority);
+//        messageHeaders.setDeliveryMode((byte) deliveryMode);
+//        messageHeaders.setPriority((byte) priority);
 
         int size = (payload != null) ? payload.limit() : 0;
-        ContentBody[] contentBodies = createContentBodies(payload);
-        AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
-        for (int i = 0; i < contentBodies.length; i++)
+        Content[] content = createContent(payload);
+        if (content.length > 0 && _logger.isDebugEnabled())
         {
-            frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
+            _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
         }
-        if (contentBodies.length > 0 && _logger.isDebugEnabled())
+        for (int i = 0; i < content.length; i++)
         {
-            _logger.debug("Sending content body frames to " + destination);
-        }
-
-        // weight argument of zero indicates no child content headers, just bodies
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)0, (byte)9), 0,
-                                                                       contentHeaderProperties,
-                                                                       size);
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content header frame to " + destination);
+            AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+                (byte)0, (byte)9,               // AMQP version (major, minor)
+                messageHeaders.getAppId(),      // String appId
+                messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+                content[i],                     // Content body
+                messageHeaders.getEncoding(),   // String contentEncoding
+                messageHeaders.getContentType(), // String contentType
+                messageHeaders.getCorrelationId(), // String correlationId
+                (short)deliveryMode,            // short deliveryMode
+                messageHeaders.getDestination(), // String destination
+                destination.getExchangeName(),  // String exchange
+                messageHeaders.getExpiration(), // long expiration
+                immediate,                      // boolean immediate
+                messageHeaders.getMessageId(),  // String messageId
+                (short)priority,                // short priority
+                false,                          // boolean redelivered
+                messageHeaders.getReplyTo(),    // String replyTo
+                destination.getRoutingKey(),    // String routingKey
+                new String("abc123").getBytes(), // byte[] securityToken
+                0,                              // int ticket
+                messageHeaders.getTimestamp(),  // long timestamp
+                messageHeaders.getTransactionId(), // String transactionId
+                timeToLive,                     // long ttl
+                messageHeaders.getUserId());    // String userId
+        
+            _protocolHandler.writeRequest(_channelId, methodBody);
         }
 
-        frames[0] = publishFrame;
-        frames[1] = contentHeaderFrame;
-        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-        _protocolHandler.writeFrame(compositeFrame, wait);
-
 
         if (message != origMessage)
         {
@@ -627,36 +626,32 @@
      * @param payload
      * @return the array of content bodies
      */
-    private ContentBody[] createContentBodies(ByteBuffer payload)
+    private Content[] createContent(ByteBuffer payload)
     {
         if (payload == null || payload.remaining() == 0)
         {
-            return NO_CONTENT_BODIES;
+            return NO_CONTENT;
         }
 
-        // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
-        // (0xCE byte).
         int dataLength = payload.remaining();
-        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
         int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
         int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
-        final ContentBody[] bodies = new ContentBody[frameCount];
+        final Content[] bodies = new Content[frameCount];
 
         if (frameCount == 1)
         {
-            bodies[0] = new ContentBody();
-            bodies[0].payload = payload;
+            bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array());
         }
         else
         {
             long remaining = dataLength;
             for (int i = 0; i < bodies.length; i++)
             {
-                bodies[i] = new ContentBody();
                 payload.position((int) framePayloadMax * i);
                 int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
-                bodies[i].payload = payload.slice();
+                bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array());
                 remaining -= length;
             }
         }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=497138&r1=497137&r2=497138
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java Wed Jan 17 11:49:40 2007
@@ -24,7 +24,7 @@
 
 public class Content
 {
-	enum ContentTypeEnum
+	public enum ContentTypeEnum
     {
     	CONTENT_TYPE_INLINE((byte)0), CONTENT_TYPE_REFERENCE((byte)1);
         private byte type;
@@ -77,6 +77,19 @@
         }
     	this.contentType = contentType;
         this.content = content.getBytes();
+    }
+    
+    public Content(ContentTypeEnum contentType, ByteBuffer content)
+    {
+    	if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE)
+        {
+        	if (content == null)
+            	throw new IllegalArgumentException("Content cannot be null for a ref type.");
+        	if (content.array().length == 0)
+            	throw new IllegalArgumentException("Content cannot be empty for a ref type.");
+        }
+    	this.contentType = contentType;
+        this.content = content.array();
     }
     
     // Get functions