You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/13 14:35:51 UTC
svn commit: r575289 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
./ message/
Author: arnaudsimon
Date: Thu Sep 13 05:35:48 2007
New Revision: 575289
URL: http://svn.apache.org/viewvc?rev=575289&view=rev
Log:
updated message hierarchy for using 0_10 messages
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Sep 13 05:35:48 2007
@@ -112,7 +112,7 @@
int defaultPrefetchLow)
{
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault010Registry(),
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
defaultPrefetchHigh, defaultPrefetchLow);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Sep 13 05:35:48 2007
@@ -100,7 +100,7 @@
AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
int defaultPrefetchLow)
{
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefault08Registry(), defaultPrefetchHigh,
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
defaultPrefetchLow);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Sep 13 05:35:48 2007
@@ -39,18 +39,18 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BasicMessageConsumer<H,B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
- /** The connection being used by this consumer */
+ /**
+ * The connection being used by this consumer
+ */
private AMQConnection _connection;
private String _messageSelector;
@@ -59,15 +59,23 @@
private AMQDestination _destination;
- /** When true indicates that a blocking receive call is in progress */
+ /**
+ * When true indicates that a blocking receive call is in progress
+ */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
- /** Holds an atomic reference to the listener installed. */
+ /**
+ * Holds an atomic reference to the listener installed.
+ */
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
+ /**
+ * The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker
+ */
protected AMQShortString _consumerTag;
- /** We need to know the channel id when constructing frames */
+ /**
+ * We need to know the channel id when constructing frames
+ */
protected int _channelId;
/**
@@ -82,7 +90,9 @@
protected AMQProtocolHandler _protocolHandler;
- /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
+ /**
+ * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
+ */
private FieldTable _rawSelectorFieldTable;
/**
@@ -97,7 +107,9 @@
*/
private int _prefetchLow;
- /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
private boolean _exclusive;
/**
@@ -107,7 +119,9 @@
*/
private int _acknowledgeMode;
- /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
+ /**
+ * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ */
private int _outstanding;
/**
@@ -118,7 +132,9 @@
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+ /**
+ * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
+ */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
@@ -138,9 +154,10 @@
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+ AMQSession session, AMQProtocolHandler protocolHandler,
+ FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -216,8 +233,8 @@
if (_logger.isDebugEnabled())
{
- _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
- + _destination);
+ _logger.debug(
+ "Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
}
}
else
@@ -418,9 +435,7 @@
* it by throwing it (if an exception) or returning it (in any other case).
*
* @param o
- *
* @return a message only if o is a Message
- *
* @throws JMSException if the argument is a throwable. If it is a JMSException it is rethrown as is, but if not a
* JMSException is created with the linked exception set appropriately
*/
@@ -465,8 +480,8 @@
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace())
+ .subList(3, 6));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -518,8 +533,8 @@
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " markClosed():" + Arrays
+ .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously:" + _closedStack.toString());
}
else
@@ -588,14 +603,9 @@
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
- {
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+ throws Exception;
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
-
- }
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
@@ -643,77 +653,79 @@
switch (_acknowledgeMode)
{
- case Session.PRE_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- break;
+ case Session.PRE_ACKNOWLEDGE:
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ break;
- case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
- break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ break;
}
}
- void postDeliver(AbstractJMSMessage msg) throws JMSException
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
{
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.DUPS_OK_ACKNOWLEDGE:
- if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
- if (_outstanding <= _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
+ if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
- }
- break;
+ break;
- case Session.AUTO_ACKNOWLEDGE:
- // we do not auto ack a message if the application code called recover()
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
+ case Session.AUTO_ACKNOWLEDGE:
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
- break;
+ break;
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
- break;
+ break;
}
}
- /** Acknowledge up to last message delivered (if any). Used when commiting. */
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ */
void acknowledgeLastDelivered()
{
if (!_receivedDeliveryTags.isEmpty())
@@ -740,8 +752,8 @@
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " notifyError():" + Arrays
+ .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
_logger.trace(_consumerTag + " previously" + _closedStack.toString());
}
else
@@ -819,7 +831,9 @@
}
}
- /** Called on recovery to reset the list of delivery tags */
+ /**
+ * Called on recovery to reset the list of delivery tags
+ */
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
@@ -860,8 +874,8 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags
+ .size() + ") in _receivedDTs (RQ)" + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -890,8 +904,8 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue
+ .size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -914,8 +928,8 @@
}
else
{
- _logger.error("Queue contained a :" + o.getClass()
- + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ _logger.error("Queue contained a :" + o
+ .getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
iterator.remove();
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Sep 13 05:35:48 2007
@@ -32,11 +32,12 @@
import javax.jms.JMSException;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.client.util.MessageListener
{
/**
@@ -108,5 +109,12 @@
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
super.postDeliver(msg);
}
-
+
+
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ {
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Sep 13 05:35:48 2007
@@ -75,4 +75,13 @@
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
+
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ {
+
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Sep 13 05:35:48 2007
@@ -21,19 +21,19 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpidity.jms.message.MessageImpl;
-import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.jms.ExceptionHelper;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.apache.qpidity.ReplyTo;
import javax.jms.Message;
import javax.jms.JMSException;
-import java.util.UUID;
import java.io.IOException;
/**
- *
- * This is a 0_10 message producer.
+ * This is a 0_10 message producer.
*/
public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
@@ -66,85 +66,79 @@
//--- Overwritten methods
-
/**
* Sends a message to a given destination
- * We will always convert the received message
*/
public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
boolean wait) throws JMSException
{
- // Only get current time if required
- long currentTime = Long.MIN_VALUE;
- if (!((timeToLive == 0) && _disableTimestamps))
- {
- currentTime = System.currentTimeMillis();
- }
- // the messae UID
- String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
- MessageImpl qpidMessage;
- // check that the message is not a foreign one
+ message.prepareForSending();
+ org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage();
+ // set the payload
try
{
- qpidMessage = (MessageImpl) origMessage;
+ qpidityMessage.appendData(message.getData().buf());
+ }
+ catch (IOException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- catch (ClassCastException cce)
+ // set the delivery properties
+ if (!_disableTimestamps)
{
- // this is a foreign message
- qpidMessage = MessageHelper.transformMessage(origMessage);
- // set message's properties in case they are queried after send.
- origMessage.setJMSDestination(destination);
- origMessage.setJMSDeliveryMode(deliveryMode);
- origMessage.setJMSPriority(priority);
- origMessage.setJMSMessageID(uid);
- if (timeToLive != 0)
+ final long currentTime = System.currentTimeMillis();
+ qpidityMessage.getDeliveryProperties().setTimestamp(currentTime);
+ if (timeToLive > 0)
{
- origMessage.setJMSExpiration(timeToLive + currentTime);
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive);
}
else
{
- origMessage.setJMSExpiration(timeToLive);
+ qpidityMessage.getDeliveryProperties().setExpiration(0);
}
- origMessage.setJMSTimestamp(currentTime);
}
- // set the message properties
- qpidMessage.setJMSDestination(destination);
- qpidMessage.setJMSMessageID(uid);
- qpidMessage.setJMSDeliveryMode(deliveryMode);
- qpidMessage.setJMSPriority(priority);
- if (timeToLive != 0)
+ qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
+ qpidityMessage.getDeliveryProperties().setPriority((byte) priority);
+ qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString());
+ qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
+ BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ // set the application properties
+ qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString());
+ qpidityMessage.getMessageProperties().setCorrelationId(contentHeaderProperties.getCorrelationId().toString());
+ String replyToURL = contentHeaderProperties.getReplyToAsString();
+ if (replyToURL != null)
{
- qpidMessage.setJMSExpiration(timeToLive + currentTime);
- }
- else
- {
- qpidMessage.setJMSExpiration(timeToLive);
- }
- qpidMessage.setJMSTimestamp(currentTime);
- qpidMessage.setRoutingKey(destination.getDestinationName().toString());
- qpidMessage.setExchangeName(destination.getExchangeName().toString());
- // call beforeMessageDispatch
- try
- {
- qpidMessage.beforeMessageDispatch();
+ AMQBindingURL dest;
+ try
+ {
+ dest = new AMQBindingURL(replyToURL);
+ }
+ catch (URLSyntaxException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ qpidityMessage.getMessageProperties()
+ .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
}
- catch (QpidException e)
+ if (contentHeaderProperties.getHeaders() != null)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ // todo use the new fieldTable
+ qpidityMessage.getMessageProperties().setApplicationHeaders(null);
}
+ // send the message
try
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
- qpidMessage.getQpidityMessage(),
+ ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
+ qpidityMessage,
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
}
catch (IOException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ }
+
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Sep 13 05:35:48 2007
@@ -72,11 +72,11 @@
_data.setAutoExpand(true);
}
- AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Thu Sep 13 05:35:48 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
@@ -86,7 +87,7 @@
}
- AbstractBytesTypedMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,6 +26,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +44,12 @@
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
- AMQShortString routingKey, ContentHeaderBody contentHeader) throws AMQException;
+ AMQShortString routingKey,
+ BasicContentHeaderProperties contentHeader) throws AMQException;
- protected AbstractJMSMessage createMessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies) throws AMQException
+ protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -62,8 +68,8 @@
{
if (debug)
{
- _logger.debug("Fragmented message body (" + bodies.size() + " frames, bodySize=" + contentHeader.bodySize
- + ")");
+ _logger.debug("Fragmented message body (" + bodies
+ .size() + " frames, bodySize=" + contentHeader.bodySize + ")");
}
data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
@@ -84,17 +90,71 @@
if (debug)
{
- _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining="
- + data.remaining());
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+ .remaining());
}
- return createMessage(messageNbr, data, exchange, routingKey, contentHeader);
+ return createMessage(messageNbr, data, exchange, routingKey,
+ (BasicContentHeaderProperties) contentHeader.properties);
}
+ protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies) throws AMQException
+ {
+ ByteBuffer data;
+ final boolean debug = _logger.isDebugEnabled();
+
+ // we optimise the non-fragmented case to avoid copying
+ if ((bodies != null))
+ {
+ data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+ }
+ else // bodies == null
+ {
+ data = ByteBuffer.allocate(0);
+ }
+
+ if (debug)
+ {
+ _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
+ .remaining());
+ }
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ // set the properties of this message
+ MessageProperties mprop = (MessageProperties) contentHeader[0];
+ DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
+ props.setContentType(mprop.getContentType());
+ props.setCorrelationId(mprop.getCorrelationId());
+ props.setEncoding(mprop.getContentEncoding());
+ props.setExpiration(devprop.getExpiration());
+ // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
+ props.setMessageId(mprop.getMessageId());
+ props.setPriority((byte) devprop.getPriority());
+ // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+ props.setTimestamp(devprop.getTimestamp());
+ props.setType(mprop.getType());
+ props.setUserId(mprop.getUserId());
+ return createMessage(messageNbr, data, exchange, routingKey, props);
+ }
+
+
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies) throws JMSException, AMQException
+ AMQShortString exchange, AMQShortString routingKey, List bodies)
+ throws JMSException, AMQException
+ {
+ final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ msg.setJMSRedelivered(redelivered);
+
+ return msg;
+ }
+
+ public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
+ AMQShortString exchange, AMQShortString routingKey, List bodies)
+ throws JMSException, AMQException
{
- final AbstractJMSMessage msg = createMessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ final AbstractJMSMessage msg =
+ create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
return msg;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Sep 13 05:35:48 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
@@ -57,7 +58,7 @@
super(data); // this instanties a content header
}
- JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,12 +26,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
@@ -40,4 +41,7 @@
{
return new JMSBytesMessage();
}
+
+ // 0_10 specific
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Sep 13 05:35:48 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@
populateMapFromData();
}
- JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
@@ -36,8 +37,9 @@
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Sep 13 05:35:48 2007
@@ -69,10 +69,10 @@
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+ JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
ByteBuffer data) throws AMQException
{
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+ super(messageNbr, contentHeader, exchange, routingKey, data);
}
public void clearBodyImpl() throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,12 +26,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Sep 13 05:35:48 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
@@ -60,7 +61,7 @@
}
- JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+ JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey, ByteBuffer data) throws AMQException
{
super(messageNbr, contentHeader, exchange, routingKey, data);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -26,16 +26,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
}
-
public AbstractJMSMessage createMessage() throws JMSException
{
return new JMSStreamMessage();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Thu Sep 13 05:35:48 2007
@@ -38,9 +38,9 @@
protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
AMQShortString exchange, AMQShortString routingKey,
- ContentHeaderBody contentHeader) throws AMQException
+ BasicContentHeaderProperties contentHeader) throws AMQException
{
- return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties,
+ return new JMSTextMessage(deliveryTag, contentHeader,
exchange, routingKey, data);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Sep 13 05:35:48 2007
@@ -27,12 +27,19 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
public interface MessageFactory
{
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
ContentHeaderBody contentHeader,
+ AMQShortString exchange, AMQShortString routingKey,
+ List bodies)
+ throws JMSException, AMQException;
+
+ AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ Struct[] contentHeader,
AMQShortString exchange, AMQShortString routingKey,
List bodies)
throws JMSException, AMQException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=575289&r1=575288&r2=575289&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Sep 13 05:35:48 2007
@@ -30,18 +30,29 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageFactoryRegistry
{
+ /**
+ * This class logger
+ */
+ protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
- new HashMap<AMQShortString, MessageFactory>();
+ new HashMap<AMQShortString, MessageFactory>();
/**
* Construct a new registry with the default message factories registered
+ *
* @return a message factory registry
*/
- public static MessageFactoryRegistry newDefault08Registry()
+ public static MessageFactoryRegistry newDefaultRegistry()
{
MessageFactoryRegistry mf = new MessageFactoryRegistry();
mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
@@ -55,24 +66,6 @@
return mf;
}
- /**
- * Construct a new 010 registry with the default message factories registered
- * @return a message factory registry
- */
- public static MessageFactoryRegistry newDefault010Registry()
- {
- // TODO use 0.10 classes
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
- mf.registerFactory("text/plain", new JMSTextMessageFactory());
- mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
- mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
- mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
- mf.registerFactory(null, new JMSBytesMessageFactory());
-
- return mf;
- }
public void registerFactory(String mimeType, MessageFactory mf)
@@ -96,25 +89,26 @@
/**
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
- * @param deliveryTag the AMQ message id
- * @param redelivered true if redelivered
+ *
+ * @param deliveryTag the AMQ message id
+ * @param redelivered true if redelivered
* @param contentHeader the content header that was received
- * @param bodies a list of ContentBody instances
+ * @param bodies a list of ContentBody instances
* @return the message.
* @throws AMQException
* @throws JMSException
*/
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
- throws AMQException, JMSException
+ throws AMQException, JMSException
{
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
AMQShortString contentTypeShortString = properties.getContentType();
- contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
- : contentTypeShortString;
+ contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(
+ JMSBytesMessage.MIME_TYPE) : contentTypeShortString;
MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
@@ -126,6 +120,29 @@
return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
}
}
+
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+ AMQShortString routingKey, Struct[] contentHeader, List bodies)
+ throws AMQException, JMSException
+ {
+ MessageProperties mprop = (MessageProperties) contentHeader[0];
+ String messageType = mprop.getContentType();
+ if (messageType == null)
+ {
+ _logger.debug("no message type specified, building a byte message");
+ messageType = JMSBytesMessage.MIME_TYPE;
+ }
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+ if (mf == null)
+ {
+ throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
+ }
+ else
+ {
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ }
+ }
+
public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
{