You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/28 16:31:52 UTC

svn commit: r1621143 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid: client/ client/message/ client/messaging/address/ jndi/

Author: rgodfrey
Date: Thu Aug 28 14:31:52 2014
New Revision: 1621143

URL: http://svn.apache.org/r1621143
Log:
QPID-6052 : Use ADDR addresses for JMSDestination on incoming messages in 0-9-1 when the address mode is ADDR

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAnyDestination.java Thu Aug 28 14:31:52 2014
@@ -58,7 +58,7 @@ public class AMQAnyDestination extends A
         super(str);
     }
     
-    public AMQAnyDestination(Address addr) throws Exception
+    public AMQAnyDestination(Address addr)
     {
         super(addr);
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Aug 28 14:31:52 2014
@@ -211,7 +211,7 @@ public abstract class AMQDestination imp
     {
     }
 
-    protected AMQDestination(Address address) throws Exception
+    protected AMQDestination(Address address)
     {
         this._address = address;
         getInfoFromAddress();
@@ -749,7 +749,8 @@ public abstract class AMQDestination imp
         }
     }
 
-    public static Destination createDestination(String str) throws Exception
+    public static Destination createDestination(String str, final boolean useNodeTypeForDestinationType)
+            throws URISyntaxException
     {
          DestSyntax syntax = getDestType(str);
          str = stripSyntaxPrefix(str);
@@ -760,7 +761,24 @@ public abstract class AMQDestination imp
          else
          {
              Address address = createAddressFromString(str);
-             return new AMQAnyDestination(address);
+             if(useNodeTypeForDestinationType)
+             {
+                 AddressHelper helper = new AddressHelper(address);
+                 switch(helper.getNodeType())
+                 {
+                     case AMQDestination.QUEUE_TYPE:
+                         return new AMQQueue(address);
+                     case AMQDestination.TOPIC_TYPE:
+                         return new AMQTopic(address);
+                     default:
+                         return new AMQAnyDestination(address);
+                 }
+
+             }
+             else
+             {
+                 return new AMQAnyDestination(address);
+             }
          }
     }
 
@@ -912,7 +930,7 @@ public abstract class AMQDestination imp
         return Address.parse(str);
     }
 
-    private void getInfoFromAddress() throws Exception
+    private void getInfoFromAddress()
     {
         _name = _address.getName();
         _subject = _address.getSubject();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java Thu Aug 28 14:31:52 2014
@@ -20,13 +20,15 @@
  */
 package org.apache.qpid.client;
 
+import java.net.URISyntaxException;
+
+import javax.jms.Queue;
+
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.BindingURL;
 
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-
 public class AMQQueue extends AMQDestination implements Queue
 {
     private static final long serialVersionUID = -1283142598932655606L;
@@ -36,6 +38,11 @@ public class AMQQueue extends AMQDestina
         super();
     }
 
+    public AMQQueue(Address address)
+    {
+        super(address);
+    }
+
     public AMQQueue(String address) throws URISyntaxException
     {
         super(address);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Aug 28 14:31:52 2014
@@ -1424,7 +1424,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
         Queue dest = validateQueue(destination);
         C consumer = (C) createConsumer(dest);
-
+        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
         return new QueueReceiverAdaptor(dest, consumer);
     }
 
@@ -1443,7 +1443,7 @@ public abstract class AMQSession<C exten
         checkValidDestination(destination);
         Queue dest = validateQueue(destination);
         C consumer = (C) createConsumer(dest, messageSelector);
-
+        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
         return new QueueReceiverAdaptor(dest, consumer);
     }
 
@@ -1461,7 +1461,7 @@ public abstract class AMQSession<C exten
         checkNotClosed();
         Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest);
-
+        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
         return new QueueReceiverAdaptor(dest, consumer);
     }
 
@@ -1480,7 +1480,7 @@ public abstract class AMQSession<C exten
         checkNotClosed();
         Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest, messageSelector);
-
+        consumer.setAddressType(AMQDestination.QUEUE_TYPE);
         return new QueueReceiverAdaptor(dest, consumer);
     }
 
@@ -2590,7 +2590,7 @@ public abstract class AMQSession<C exten
                 ("Cannot create a durable subscription with a temporary topic: " + topic);
         }
 
-        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
+        if (!(topic instanceof AMQDestination))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Aug 28 14:31:52 2014
@@ -774,7 +774,8 @@ public class AMQSession_0_8 extends AMQS
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
                             getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
-                                    msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
+                                    msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
+                                    _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
                     AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
                     AMQShortString reason = msg.getReplyText();
                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Aug 28 14:31:52 2014
@@ -20,16 +20,16 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.url.BindingURL;
+import java.net.URISyntaxException;
 
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Topic;
-import java.net.URISyntaxException;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
+import org.apache.qpid.url.BindingURL;
 
 public class AMQTopic extends AMQDestination implements Topic
 {
@@ -40,7 +40,7 @@ public class AMQTopic extends AMQDestina
         super(address);
     }
 
-    public AMQTopic(Address address) throws Exception
+    public AMQTopic(Address address)
     {
         super(address);
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Aug 28 14:31:52 2014
@@ -136,6 +136,7 @@ public abstract class BasicMessageConsum
     private List<StackTraceElement> _closedStack = null;
 
     private boolean _isDurableSubscriber = false;
+    private int _addressType = AMQDestination.UNKNOWN_TYPE;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -203,7 +204,7 @@ public abstract class BasicMessageConsum
         }
 
         _arguments = ft;
-
+        _addressType = _destination.getAddressType();
     }
 
     public AMQDestination getDestination()
@@ -1066,4 +1067,14 @@ public abstract class BasicMessageConsum
     {
         _isDurableSubscriber = true;
     }
+
+    void setAddressType(final int addressType)
+    {
+        _addressType = addressType;
+    }
+
+    int getAddressType()
+    {
+        return _addressType;
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Aug 28 14:31:52 2014
@@ -151,7 +151,7 @@ public class BasicMessageConsumer_0_8 ex
         return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
                 messageFrame.isRedelivered(), messageFrame.getExchange() == null ? AMQShortString.EMPTY_STRING : messageFrame.getExchange(),
                 messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
-                _queueDestinationCache, _topicDestinationCache);
+                _queueDestinationCache, _topicDestinationCache, getAddressType());
 
     }
 
@@ -164,4 +164,6 @@ public class BasicMessageConsumer_0_8 ex
     {
         return _rejectBehaviour;
     }
+
+
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Aug 28 14:31:52 2014
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.AMQException;
-
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
+import org.apache.qpid.AMQException;
+
 /**
  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
  *
@@ -43,6 +43,7 @@ class TopicSubscriberAdaptor<C extends B
         _topic = topic;
         _consumer = consumer;
         _noLocal = noLocal;
+        consumer.setAddressType(AMQDestination.TOPIC_TYPE);
     }
     
     TopicSubscriberAdaptor(Topic topic, C consumer)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Aug 28 14:31:52 2014
@@ -133,7 +133,7 @@ public class AMQMessageDelegate_0_10 ext
                 }
             }
             dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
-                    _deliveryProps.getRoutingKey(), subject);
+                    _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
         }
         
         setJMSDestination(dest);        
@@ -280,7 +280,8 @@ public class AMQMessageDelegate_0_10 ext
                 }
                 else
                 {
-                    dest = convertToAddressBasedDestination(exchange,routingKey,null);
+                    dest = convertToAddressBasedDestination(exchange,routingKey,null, false,
+                                                            AMQDestination.UNKNOWN_TYPE);
                 }
                 _destinationCache.put(replyTo, dest);
             }
@@ -288,49 +289,6 @@ public class AMQMessageDelegate_0_10 ext
             return dest;
         }
     }
-    
-    private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
-    {
-        String addr;
-        boolean isQueue = true;
-        if ("".equals(exchange)) // type Queue
-        {
-            subject = (subject == null) ? "" : "/" + subject;
-            addr = routingKey + subject;
-        }
-        else
-        {
-            addr = exchange + "/" + routingKey;
-            isQueue = false;
-        }
-        
-        try
-        {
-            AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr);
-            if (isQueue)
-            {
-                dest.setQueueName(new AMQShortString(routingKey));
-                dest.setRoutingKey(new AMQShortString(routingKey));
-                dest.setExchangeName(new AMQShortString(""));
-            }
-            else
-            {
-                dest.setRoutingKey(new AMQShortString(routingKey));
-                dest.setExchangeName(new AMQShortString(exchange));
-            }
-            return dest;
-        }
-        catch(Exception e)
-        {
-            // An exception is only thrown here if the address syntax is invalid.
-            // Logging the exception, but not throwing as this is only important to Qpid developers.
-            // An exception here means a bug in the code.
-            _logger.error("Exception when constructing an address string from the ReplyTo struct");
-            
-            // falling back to the old way of doing it to ensure the application continues.
-            return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
-        } 
-    }
 
     public void setJMSReplyTo(Destination destination) throws JMSException
     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Thu Aug 28 14:31:52 2014
@@ -21,6 +21,18 @@
 
 package org.apache.qpid.client.message;
 
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
+
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
@@ -32,17 +44,6 @@ import org.apache.qpid.framing.BasicCont
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.Queue;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
 
 public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
 {
@@ -63,6 +64,7 @@ public class AMQMessageDelegate_0_8 exte
     });
 
     public static final String JMS_TYPE = "x-jms-type";
+    public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms");
 
 
     private boolean _readableProperties = false;
@@ -96,7 +98,8 @@ public class AMQMessageDelegate_0_8 exte
     // Used when generating a received message object
     protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
                                      AMQShortString routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
-                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                    int addressType)
     {
         this(contentHeader, deliveryTag);
 
@@ -104,28 +107,53 @@ public class AMQMessageDelegate_0_8 exte
 
         AMQDestination dest = null;
 
-        // If we have a type set the attempt to use that.
-        if (type != null)
+        if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
         {
-            switch (type.intValue())
+            // If we have a type set then attempt to use that.
+            if (type != null)
             {
-                case AMQDestination.QUEUE_TYPE:
-                    dest = queueDestinationCache.getDestination(exchange, routingKey);
-                    break;
-                case AMQDestination.TOPIC_TYPE:
-                    dest = topicDestinationCache.getDestination(exchange, routingKey);
-                    break;
-                default:
-                    // Use the generateDestination method
-                    dest = null;
+                switch (type.intValue())
+                {
+                    case AMQDestination.QUEUE_TYPE:
+                        dest = queueDestinationCache.getDestination(exchange, routingKey);
+                        break;
+                    case AMQDestination.TOPIC_TYPE:
+                        dest = topicDestinationCache.getDestination(exchange, routingKey);
+                        break;
+                    default:
+                        // Use the generateDestination method
+                        dest = null;
+                }
             }
-        }
 
-        if (dest == null)
+            if (dest == null)
+            {
+                dest = generateDestination(exchange, routingKey);
+            }
+        }
+        else
         {
-            dest = generateDestination(exchange, routingKey);
+            String subject = null;
+            if (contentHeader.getHeaders() != null
+                && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT)
+                    && STRICT_JMS)
+            {
+                subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT);
+                if (subject != null)
+                {
+                    contentHeader.getHeaders().remove(QpidMessageProperties.QPID_SUBJECT);
+                    contentHeader.getHeaders().setString("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER,
+                                                         subject);
+                }
+            }
+            if(type == null)
+            {
+                type = addressType;
+            }
+            dest = (AMQDestination) convertToAddressBasedDestination(AMQShortString.toString(exchange),
+                                                                     AMQShortString.toString(routingKey), subject,
+                                                                     true, type);
         }
-
         setJMSDestination(dest);
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,16 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
@@ -28,11 +38,6 @@ import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * This abstract class provides exchange lookup functionality that is shared
  * between all MessageDelegates. Update facilities are provided so that the 0-10
@@ -45,6 +50,7 @@ import java.util.concurrent.ConcurrentHa
  */
 public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
 {
+    private static final Logger _logger = LoggerFactory.getLogger(AbstractAMQMessageDelegate.class);
 
     private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
     private static Map<String,ExchangeInfo> _exchangeMap = new  ConcurrentHashMap<String, ExchangeInfo>();
@@ -222,6 +228,76 @@ public abstract class AbstractAMQMessage
     {
         return _session;
     }
+
+    /**
+     * Builds an ADDR-syntax destination from an exchange / routing key pair.
+     * <p>
+     * If the address cannot be turned into a destination the failure is logged and the
+     * result of {@code generateDestination(exchange, routingKey)} is returned instead.
+     *
+     * @param exchange   exchange name; the empty string selects the queue form of the address
+     * @param routingKey routing key, also used as the queue name when {@code exchange} is empty
+     * @param subject    subject appended to the address only when {@code exchange} is empty; may be null
+     * @param useNodeTypeForDestinationType when true a node type option is appended to the address
+     *                   and the flag is passed on to {@code AMQDestination.createDestination}
+     * @param type       destination type hint; {@code UNKNOWN_TYPE} combined with an empty exchange
+     *                   is treated as {@code QUEUE_TYPE}
+     * @return the destination created from the address, or the fallback destination on failure
+     */
+    protected Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject,
+                                                           boolean useNodeTypeForDestinationType, int type)
+    {
+        final boolean isQueue = "".equals(exchange);
+        String addr = isQueue
+                      ? routingKey + ((subject == null) ? "" : "/" + subject)
+                      : exchange + "/" + routingKey;
+
+        if(useNodeTypeForDestinationType)
+        {
+            if(type == AMQDestination.UNKNOWN_TYPE && isQueue)
+            {
+                type = AMQDestination.QUEUE_TYPE;
+            }
+
+            switch(type)
+            {
+                case AMQDestination.QUEUE_TYPE:
+                    addr = addr + " ; { node: { type: queue } } ";
+                    break;
+                case AMQDestination.TOPIC_TYPE:
+                    addr = addr + " ; { node: { type: topic } } ";
+                    break;
+                default:
+                    // any other type: leave the address without a node type option
+            }
+        }
+
+        try
+        {
+            AMQDestination dest = (AMQDestination)AMQDestination.createDestination("ADDR:" + addr,
+                                                                                   useNodeTypeForDestinationType);
+            if (isQueue)
+            {
+                dest.setQueueName(new AMQShortString(routingKey));
+                dest.setRoutingKey(new AMQShortString(routingKey));
+                dest.setExchangeName(new AMQShortString(""));
+            }
+            else
+            {
+                dest.setRoutingKey(new AMQShortString(routingKey));
+                dest.setExchangeName(new AMQShortString(exchange));
+            }
+            return dest;
+        }
+        catch(Exception e)
+        {
+            // Only expected if the generated address string is invalid, i.e. a bug in this code.
+            // Log the address together with the cause, but do not propagate: fall back to the
+            // exchange / routing key based destination so that the application continues.
+            _logger.error("Exception when constructing a destination from address '" + addr + "'", e);
+            return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+        }
+    }
 }
 
 class ExchangeInfo

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Aug 28 14:31:52 2014
@@ -47,11 +47,14 @@ public abstract class AbstractJMSMessage
 {
     private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
 
-    protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
-                                                         AMQShortString exchange, AMQShortString routingKey,
+    protected AbstractJMSMessage create08MessageWithBody(long messageNbr,
+                                                         ContentHeaderBody contentHeader,
+                                                         AMQShortString exchange,
+                                                         AMQShortString routingKey,
                                                          List bodies,
                                                          AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
-                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException
+                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                                         final int addressType) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
@@ -117,7 +120,8 @@ public abstract class AbstractJMSMessage
 
         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
                                                                  contentHeader.getProperties(),
-                                                                 exchange, routingKey, queueDestinationCache, topicDestinationCache);
+                                                                 exchange, routingKey, queueDestinationCache,
+                                                                 topicDestinationCache, addressType);
 
         return createMessage(delegate, data);
     }
@@ -162,13 +166,15 @@ public abstract class AbstractJMSMessage
         return message;
     }
 
+    @Override
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, ContentHeaderBody contentHeader,
                                             AMQShortString exchange, AMQShortString routingKey, List bodies,
                                                          AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
-                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                                         int addressType)
             throws JMSException, AMQException
     {
-        final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+        final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
         msg.setJMSRedelivered(redelivered);
         msg.setReceivedFromServer();
         return msg;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.List;
+
+import javax.jms.JMSException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession_0_8;
@@ -29,16 +33,18 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.MessageProperties;
 
-import javax.jms.JMSException;
-import java.util.List;
-
 
 public interface MessageFactory
 {
-    AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+    AbstractJMSMessage createMessage(long deliveryTag,
+                                     boolean redelivered,
                                      ContentHeaderBody contentHeader,
-                                     AMQShortString exchange, AMQShortString routingKey,
-                                     List bodies, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+                                     AMQShortString exchange,
+                                     AMQShortString routingKey,
+                                     List bodies,
+                                     AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+                                     AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                     final int addressType)
         throws JMSException, AMQException;
 
      AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Aug 28 14:31:52 2014
@@ -20,6 +20,12 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,11 +40,6 @@ import org.apache.qpid.transport.Deliver
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.MessageTransfer;
 
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class MessageFactoryRegistry
 {
     /**
@@ -95,19 +96,24 @@ public class MessageFactoryRegistry
      * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
      * concrete message type.
      *
-     *
-     * @param deliveryTag   the AMQ message id
+     *  @param deliveryTag   the AMQ message id
      * @param redelivered   true if redelivered
      * @param contentHeader the content header that was received
      * @param bodies        a list of ContentBody instances @return the message.
      * @param queueDestinationCache
-     *@param topicDestinationCache @throws AMQException
+     * @param topicDestinationCache @throws AMQException
+     * @param addressType
      * @throws JMSException
      */
-    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
-                                            AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies,
+    public AbstractJMSMessage createMessage(long deliveryTag,
+                                            boolean redelivered,
+                                            AMQShortString exchange,
+                                            AMQShortString routingKey,
+                                            ContentHeaderBody contentHeader,
+                                            List bodies,
                                             AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
-                                            AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
+                                            AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                            final int addressType)
             throws AMQException, JMSException
     {
         BasicContentHeaderProperties properties = contentHeader.getProperties();
@@ -124,7 +130,7 @@ public class MessageFactoryRegistry
             mf = _default;
         }
 
-        return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache);
+        return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
     }
 
     public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Aug 28 14:31:52 2014
@@ -159,7 +159,7 @@ public class AddressHelper
         }
     }
 
-    public int getNodeType() throws Exception
+    public int getNodeType()
     {
         if (_nodePropAccess == null || _nodePropAccess.getString(TYPE) == null)
         {
@@ -176,7 +176,7 @@ public class AddressHelper
         }
         else
         {
-            throw new Exception("unkown exchange type");
+            throw new IllegalArgumentException("unknown exchange type");
         }
     }
 
@@ -212,7 +212,7 @@ public class AddressHelper
         return (result == null) ? defaultValue : result.booleanValue();
     }
 
-    public Link getLink() throws Exception
+    public Link getLink()
     {
         Link link = new Link();
         link.setSubscription(new Subscription());
@@ -235,7 +235,7 @@ public class AddressHelper
                 }
                 else
                 {
-                    throw new Exception("The reliability mode '" + 
+                    throw new IllegalArgumentException("The reliability mode '" +
                             reliability + "' is not yet supported");
                 }
             }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1621143&r1=1621142&r2=1621143&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Aug 28 14:31:52 2014
@@ -255,7 +255,7 @@ public class PropertiesFileInitialContex
     {
         try
         {
-            return AMQDestination.createDestination(str);
+            return AMQDestination.createDestination(str, false);
         }
         catch (Exception e)
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org