You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 13:34:31 UTC
svn commit: r576853 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ message/
Author: arnaudsimon
Date: Tue Sep 18 04:34:30 2007
New Revision: 576853
URL: http://svn.apache.org/viewvc?rev=576853&view=rev
Log:
added 0_10 replyTo support
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 04:34:30 2007
@@ -29,6 +29,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.ExchangeQueryResult;
+import org.apache.qpidity.transport.Future;
import javax.jms.JMSException;
import java.io.IOException;
@@ -76,6 +78,18 @@
getSession().getAMQConnection().exceptionReceived(e);
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (message.getMessageProperties().getReplyTo() != null)
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
+ }
newMessage.setContentHeader(headers);
getSession().messageReceived(newMessage);
}
@@ -111,10 +125,11 @@
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
+ UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
+ messageFrame.getExchange(), messageFrame.getRoutingKey(),
+ messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Tue Sep 18 04:34:30 2007
@@ -100,7 +100,7 @@
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies) throws AMQException
+ List bodies, String replyToURL) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
@@ -135,7 +135,7 @@
// todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
props.setMessageId(mprop.getMessageId());
props.setPriority((byte) devprop.getPriority());
- // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+ props.setReplyTo(replyToURL);
props.setTimestamp(devprop.getTimestamp());
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
@@ -154,11 +154,12 @@
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey, List bodies)
+ AMQShortString exchange, AMQShortString routingKey, List bodies,
+ String replyToURL)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+ create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
msg.setJMSRedelivered(redelivered);
return msg;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Tue Sep 18 04:34:30 2007
@@ -41,7 +41,7 @@
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
Struct[] contentHeader,
AMQShortString exchange, AMQShortString routingKey,
- List bodies)
+ List bodies, String replyToURL)
throws JMSException, AMQException;
AbstractJMSMessage createMessage() throws JMSException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Tue Sep 18 04:34:30 2007
@@ -67,7 +67,6 @@
}
-
public void registerFactory(String mimeType, MessageFactory mf)
{
if (mf == null)
@@ -122,11 +121,11 @@
}
public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, Struct[] contentHeader, List bodies)
- throws AMQException, JMSException
+ AMQShortString routingKey, Struct[] contentHeader, List bodies,
+ String replyTo) throws AMQException, JMSException
{
MessageProperties mprop = (MessageProperties) contentHeader[0];
- String messageType = mprop.getContentType();
+ String messageType = mprop.getContentType();
if (messageType == null)
{
_logger.debug("no message type specified, building a byte message");
@@ -139,7 +138,7 @@
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Sep 18 04:34:30 2007
@@ -88,4 +88,10 @@
public abstract List<B> getBodies();
public abstract H getContentHeader();
+
+ // specific to 0_10
+ public String getReplyToURL()
+ {
+ return "";
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Tue Sep 18 04:34:30 2007
@@ -38,6 +38,7 @@
public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
{
private Struct[] _headers;
+ private String _replyToURL;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
@@ -78,4 +79,14 @@
return _bodies;
}
+ // additional 0_10 method
+ public String getReplyToURL()
+ {
+ return _replyToURL;
+ }
+
+ public void setReplyToURL(String url)
+ {
+ _replyToURL = url;
+ }
}