You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 13:34:31 UTC

svn commit: r576853 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ message/

Author: arnaudsimon
Date: Tue Sep 18 04:34:30 2007
New Revision: 576853

URL: http://svn.apache.org/viewvc?rev=576853&view=rev
Log:
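Added 0_10 replyTo support: the consumer now queries the replyTo exchange and passes a replyTo URL through the message factories.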

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 04:34:30 2007
@@ -29,6 +29,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.ExchangeQueryResult;
+import org.apache.qpidity.transport.Future;
 
 import javax.jms.JMSException;
 import java.io.IOException;
@@ -76,6 +78,18 @@
             getSession().getAMQConnection().exceptionReceived(e);
         }
         Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+        // if there is a replyTo destination, query its exchange: the exchange type is needed to build the replyTo URL
+        if (message.getMessageProperties().getReplyTo() != null)
+        {
+            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+                    .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+            ExchangeQueryResult res = future.get();
+            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+            String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+                    .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+                    .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+            newMessage.setReplyToURL(replyToUrl);
+        }
         newMessage.setContentHeader(headers);
         getSession().messageReceived(newMessage);
     }
@@ -111,10 +125,11 @@
     }
 
 
-    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<Struct[], ByteBuffer>  messageFrame) throws Exception
+    public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
+            UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
     {
-        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
-            messageFrame.isRedelivered(), messageFrame.getExchange(),
-            messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+        return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
+                                             messageFrame.getExchange(), messageFrame.getRoutingKey(),
+                                             messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Tue Sep 18 04:34:30 2007
@@ -100,7 +100,7 @@
 
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
                                                           AMQShortString exchange, AMQShortString routingKey,
-                                                          List bodies) throws AMQException
+                                                          List bodies, String replyToURL) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
@@ -135,7 +135,7 @@
         // todo update when fieldtable is used props.setHeaders(mprop.getApplicationHeaders());
         props.setMessageId(mprop.getMessageId());
         props.setPriority((byte) devprop.getPriority());
-        // todo we need to match the reply to props.setReplyTo(new AMQShortString(mprop.getReplyTo()));
+        props.setReplyTo(replyToURL);
         props.setTimestamp(devprop.getTimestamp());
         props.setType(mprop.getType());
         props.setUserId(mprop.getUserId());
@@ -154,11 +154,12 @@
     }
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
-                                            AMQShortString exchange, AMQShortString routingKey, List bodies)
+                                            AMQShortString exchange, AMQShortString routingKey, List bodies,
+                                            String replyToURL)
             throws JMSException, AMQException
     {
         final AbstractJMSMessage msg =
-                create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
+                create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
         msg.setJMSRedelivered(redelivered);
 
         return msg;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Tue Sep 18 04:34:30 2007
@@ -41,7 +41,7 @@
      AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                      Struct[] contentHeader,
                                      AMQShortString exchange, AMQShortString routingKey,
-                                     List bodies)
+                                     List bodies, String replyToURL)
         throws JMSException, AMQException;
 
     AbstractJMSMessage createMessage() throws JMSException;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Tue Sep 18 04:34:30 2007
@@ -67,7 +67,6 @@
     }
 
 
-
     public void registerFactory(String mimeType, MessageFactory mf)
     {
         if (mf == null)
@@ -122,11 +121,11 @@
     }
 
     public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
-                                            AMQShortString routingKey, Struct[] contentHeader, List bodies)
-            throws AMQException, JMSException
+                                            AMQShortString routingKey, Struct[] contentHeader, List bodies,
+                                            String replyTo) throws AMQException, JMSException
     {
         MessageProperties mprop = (MessageProperties) contentHeader[0];
-        String messageType =  mprop.getContentType();
+        String messageType = mprop.getContentType();
         if (messageType == null)
         {
             _logger.debug("no message type specified, building a byte message");
@@ -139,7 +138,7 @@
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Sep 18 04:34:30 2007
@@ -88,4 +88,10 @@
     public abstract List<B> getBodies();
 
     public abstract H getContentHeader();
+
+    // specific to 0_10: this base implementation returns an empty string; UnprocessedMessage_0_10 overrides it
+    public String getReplyToURL()
+    {
+        return "";
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=576853&r1=576852&r2=576853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Tue Sep 18 04:34:30 2007
@@ -38,6 +38,7 @@
 public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
 {
     private Struct[] _headers;
+    private String _replyToURL;
 
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
@@ -78,4 +79,14 @@
         return _bodies;
     }
 
+    // additional 0_10 method
+    public String getReplyToURL()
+    {
+        return _replyToURL;
+    }
+
+    public void setReplyToURL(String url)
+    {
+        _replyToURL = url;
+    }
 }