You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/17 20:49:41 UTC
svn commit: r497138 - in /incubator/qpid/branches/qpid.0-9/java:
client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
common/src/main/java/org/apache/qpid/framing/Content.java
Author: kpvdr
Date: Wed Jan 17 11:49:40 2007
New Revision: 497138
URL: http://svn.apache.org/viewvc?view=rev&rev=497138
Log:
Conversion of client/BasicMessageProducer to new Message class
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497138&r1=497137&r2=497138
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jan 17 11:49:40 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
@@ -100,7 +101,7 @@
private final boolean _mandatory;
private final boolean _waitUntilSent;
- private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+ private static final Content[] NO_CONTENT = new Content[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -515,24 +516,14 @@
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate, boolean wait)
+ throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
-
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getMessageHeaders().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -542,53 +533,61 @@
}
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getMessageHeaders();
+ MessageHeaders messageHeaders = message.getMessageHeaders();
if (timeToLive > 0)
{
if (!_disableTimestamps)
{
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
+ messageHeaders.setExpiration(currentTime + timeToLive);
}
}
else
{
if (!_disableTimestamps)
{
- contentHeaderProperties.setExpiration(0);
+ messageHeaders.setExpiration(0);
}
}
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
+// messageHeaders.setDeliveryMode((byte) deliveryMode);
+// messageHeaders.setPriority((byte) priority);
int size = (payload != null) ? payload.limit() : 0;
- ContentBody[] contentBodies = createContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
- for (int i = 0; i < contentBodies.length; i++)
+ Content[] content = createContent(payload);
+ if (content.length > 0 && _logger.isDebugEnabled())
{
- frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
+ _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
}
- if (contentBodies.length > 0 && _logger.isDebugEnabled())
+ for (int i = 0; i < content.length; i++)
{
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)0, (byte)9), 0,
- contentHeaderProperties,
- size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content[i], // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ false, // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
+
+ _protocolHandler.writeRequest(_channelId, methodBody);
}
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
-
if (message != origMessage)
{
@@ -627,36 +626,32 @@
* @param payload
* @return the array of content bodies
*/
- private ContentBody[] createContentBodies(ByteBuffer payload)
+ private Content[] createContent(ByteBuffer payload)
{
if (payload == null || payload.remaining() == 0)
{
- return NO_CONTENT_BODIES;
+ return NO_CONTENT;
}
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final ContentBody[] bodies = new ContentBody[frameCount];
+ final Content[] bodies = new Content[frameCount];
if (frameCount == 1)
{
- bodies[0] = new ContentBody();
- bodies[0].payload = payload;
+ bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array());
}
else
{
long remaining = dataLength;
for (int i = 0; i < bodies.length; i++)
{
- bodies[i] = new ContentBody();
payload.position((int) framePayloadMax * i);
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- bodies[i].payload = payload.slice();
+ bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array());
remaining -= length;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=497138&r1=497137&r2=497138
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java Wed Jan 17 11:49:40 2007
@@ -24,7 +24,7 @@
public class Content
{
- enum ContentTypeEnum
+ public enum ContentTypeEnum
{
CONTENT_TYPE_INLINE((byte)0), CONTENT_TYPE_REFERENCE((byte)1);
private byte type;
@@ -77,6 +77,19 @@
}
this.contentType = contentType;
this.content = content.getBytes();
+ }
+
+ public Content(ContentTypeEnum contentType, ByteBuffer content)
+ {
+ if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE)
+ {
+ if (content == null)
+ throw new IllegalArgumentException("Content cannot be null for a ref type.");
+ if (content.array().length == 0)
+ throw new IllegalArgumentException("Content cannot be empty for a ref type.");
+ }
+ this.contentType = contentType;
+ this.content = content.array();
}
// Get functions