You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/12/07 09:50:24 UTC

svn commit: r1418220 - in /qpid/branches/0.20/qpid/java/amqp-1-0-client-jms: ./ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/

Author: rgodfrey
Date: Fri Dec  7 08:50:21 2012
New Revision: 1418220

URL: http://svn.apache.org/viewvc?rev=1418220&view=rev
Log:
QPID-4454 : Fixes to allow to/reply-to JMS type to be carried as message annotations in AMQP 1.0 JMS client
Merged from trunk r1417368

Added:
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
      - copied unchanged from r1417368, qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
Modified:
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/   (props changed)
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java

Propchange: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/amqp-1-0-client-jms:r1417368

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Fri Dec  7 08:50:21 2012
@@ -43,6 +43,8 @@ public class ConnectionFactoryImpl imple
     private String _remoteHost;
     private boolean _ssl;
 
+    private String _queuePrefix;
+    private String _topicPrefix;
 
     public ConnectionFactoryImpl(final String host,
                                  final int port,
@@ -90,12 +92,15 @@ public class ConnectionFactoryImpl imple
 
     public ConnectionImpl createConnection() throws JMSException
     {
-        return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl);
+        return createConnection(_username, _password);
     }
 
     public ConnectionImpl createConnection(final String username, final String password) throws JMSException
     {
-        return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+        ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+        connection.setQueuePrefix(_queuePrefix);
+        connection.setTopicPrefix(_topicPrefix);
+        return connection;
     }
 
     public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
@@ -211,4 +216,23 @@ public class ConnectionFactoryImpl imple
         return connection;
     }
 
+    public String getTopicPrefix()
+    {
+        return _topicPrefix;
+    }
+
+    public void setTopicPrefix(String topicPrefix)
+    {
+        _topicPrefix = topicPrefix;
+    }
+
+    public String getQueuePrefix()
+    {
+        return _queuePrefix;
+    }
+
+    public void setQueuePrefix(String queuePrefix)
+    {
+        _queuePrefix = queuePrefix;
+    }
 }

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Fri Dec  7 08:50:21 2012
@@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transpor
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
 
 public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
 {
@@ -50,6 +49,8 @@ public class ConnectionImpl implements C
     private final String _remoteHost;
     private final boolean _ssl;
     private String _clientId;
+    private String _queuePrefix;
+    private String _topicPrefix;
 
 
     private static enum State
@@ -379,4 +380,78 @@ public class ConnectionImpl implements C
     {
         _isTopicConnection = topicConnection;
     }
+
+    public String getTopicPrefix()
+    {
+        return _topicPrefix;
+    }
+
+    public void setTopicPrefix(String topicPrefix)
+    {
+        _topicPrefix = topicPrefix;
+    }
+
+    public String getQueuePrefix()
+    {
+        return _queuePrefix;
+    }
+
+    public void setQueuePrefix(String queueprefix)
+    {
+        _queuePrefix = queueprefix;
+    }
+
+    DecodedDestination toDecodedDestination(DestinationImpl dest)
+    {
+        String address = dest.getAddress();
+        Set<String> kind = null;
+        Class clazz = dest.getClass();
+        if( clazz==QueueImpl.class )
+        {
+            kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+            if( _queuePrefix!=null )
+            {
+                // Avoid double prefixing..
+                if( !address.startsWith(_queuePrefix) )
+                {
+                    address = _queuePrefix+address;
+                }
+            }
+        }
+        else if( clazz==TopicImpl.class )
+        {
+            kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+            if( _topicPrefix!=null )
+            {
+                // Avoid double prefixing..
+                if( !address.startsWith(_topicPrefix) )
+                {
+                    address = _topicPrefix+address;
+                }
+            }
+        }
+        else if( clazz==TemporaryQueueImpl.class )
+        {
+            kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+        }
+        else if( clazz==TemporaryTopicImpl.class )
+        {
+            kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+        }
+        return new DecodedDestination(address, kind);
+    }
+
+    DecodedDestination toDecodedDestination(String address, Set<String> kind)
+    {
+        if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+        {
+            return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+        }
+        if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+        {
+            return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+        }
+        return new DecodedDestination(address, kind);
+    }
+
 }

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Fri Dec  7 08:50:21 2012
@@ -127,7 +127,7 @@ public class MessageConsumerImpl impleme
     {
         try
         {
-            return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+            return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
                                                                _linkName, _durable, getFilters(), null);
         }
         catch (AmqpErrorException e)

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Fri Dec  7 08:50:21 2012
@@ -50,14 +50,24 @@ public abstract class MessageImpl implem
     static final Set<Class> _supportedClasses =
                 new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
                                                  Float.class, Double.class, Character.class, String.class, byte[].class));
-    private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+    static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+    static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type");
+    static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type");
+
+    static final String QUEUE_ATTRIBUTE = "queue";
+    static final String TOPIC_ATTRIBUTE = "topic";
+    static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+    static final Set<String> JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE);
+    static final Set<String> JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE);
+    static final Set<String> JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
+    static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
 
     private Header _header;
     private Properties _properties;
     private ApplicationProperties _applicationProperties;
     private Footer _footer;
-    public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-    private SessionImpl _sessionImpl;
+    private final SessionImpl _sessionImpl;
     private boolean _readOnly;
     private MessageAnnotations _messageAnnotations;
 
@@ -171,45 +181,53 @@ public abstract class MessageImpl implem
 
     public DestinationImpl getJMSReplyTo() throws JMSException
     {
-        return DestinationImpl.valueOf(getReplyTo());
+        return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
     }
 
     public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
     {
-        if(destination == null)
+        if( destination==null )
         {
             setReplyTo(null);
-        }
-        else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
-        {
-            setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+            messageAnnotationMap().remove(REPLY_TO_TYPE);
         }
         else
         {
-            throw new NonAMQPDestinationException(destination);
+            DecodedDestination dd = toDecodedDestination(destination);
+            setReplyTo(dd.getAddress());
+            messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
         }
     }
 
     public DestinationImpl getJMSDestination() throws JMSException
     {
-        return _isFromQueue ? QueueImpl.valueOf(getTo())
-                            : _isFromTopic ? TopicImpl.valueOf(getTo())
-                                           : DestinationImpl.valueOf(getTo());
+        Set<String> type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE));
+        if( type==null )
+        {
+            if( _isFromQueue )
+            {
+                type = JMS_QUEUE_ATTRIBUTES;
+            }
+            else if( _isFromTopic )
+            {
+                type = JMS_TOPIC_ATTRIBUTES;
+            }
+        }
+        return toDestination(getTo(), type);
     }
 
     public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
     {
-        if(destination == null)
+        if( destination==null )
         {
             setTo(null);
-        }
-        else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
-        {
-            setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+            messageAnnotationMap().remove(TO_TYPE);
         }
         else
         {
-            throw new NonAMQPDestinationException(destination);
+            DecodedDestination dd = toDecodedDestination(destination);
+            setTo(dd.getAddress());
+            messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes()));
         }
     }
 
@@ -264,22 +282,13 @@ public abstract class MessageImpl implem
 
     public String getJMSType() throws JMSException
     {
-        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
-        final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
-
+        final Object attrValue = getMessageAnnotation(JMS_TYPE);
         return attrValue instanceof String ? attrValue.toString() : null;
     }
 
     public void setJMSType(String s) throws JMSException
     {
-        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
-        if(messageAttrs == null)
-        {
-            messageAttrs = new HashMap();
-            _messageAnnotations = new MessageAnnotations(messageAttrs);
-        }
-
-        messageAttrs.put(JMS_TYPE, s);
+        messageAnnotationMap().put(JMS_TYPE, s);
     }
 
     public long getJMSExpiration() throws JMSException
@@ -1206,4 +1215,118 @@ public abstract class MessageImpl implem
     }
 
     abstract Collection<Section> getSections();
+
+    DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException
+    {
+        if(destination == null)
+        {
+            return null;
+        }
+        if (destination instanceof DestinationImpl)
+        {
+            return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination);
+        }
+        throw new NonAMQPDestinationException(destination);
+    }
+
+    DestinationImpl toDestination(String address, Set<String> kind)
+    {
+        if( address == null )
+        {
+            return null;
+        }
+
+        // If destination prefixes are in play, we have to strip the the prefix, and we might
+        // be able to infer the kind, if we don't know it yet.
+        DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind);
+        address = decoded.getAddress();
+        kind = decoded.getAttributes();
+
+        if( kind == null )
+        {
+            return DestinationImpl.valueOf(address);
+        }
+        if( kind.contains(QUEUE_ATTRIBUTE) )
+        {
+            if( kind.contains(TEMPORARY_ATTRIBUTE) )
+            {
+                return new TemporaryQueueImpl(address, null, _sessionImpl);
+            }
+            else
+            {
+                return QueueImpl.valueOf(address);
+            }
+        }
+        else if ( kind.contains(TOPIC_ATTRIBUTE) )
+        {
+            if( kind.contains(TEMPORARY_ATTRIBUTE) )
+            {
+                return new TemporaryTopicImpl(address, null, _sessionImpl);
+            }
+            else
+            {
+                return TopicImpl.valueOf(address);
+            }
+        }
+
+        return DestinationImpl.valueOf(address);
+    }
+
+    private Object getMessageAnnotation(Symbol key)
+    {
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+        return messageAttrs == null ? null : messageAttrs.get(key);
+    }
+
+    private Map messageAnnotationMap()
+    {
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+        if(messageAttrs == null)
+        {
+            messageAttrs = new HashMap();
+            _messageAnnotations = new MessageAnnotations(messageAttrs);
+        }
+        return messageAttrs;
+    }
+
+    Set<String> splitCommaSeparateSet(String value)
+    {
+        if( value == null )
+        {
+            return null;
+        }
+        HashSet<String> rc = new HashSet<String>();
+        for( String x: value.split("\\s*,\\s*") )
+        {
+            rc.add(x);
+        }
+        return rc;
+    }
+
+    private static Set<String> set(String ...args)
+    {
+        HashSet<String> s = new HashSet<String>();
+        for (String arg : args)
+        {
+            s.add(arg);
+        }
+        return Collections.unmodifiableSet(s);
+    }
+
+    static final String join(String sep, Iterable items)
+    {
+        StringBuilder result = new StringBuilder();
+
+        for (Object o : items)
+        {
+            if (result.length() > 0)
+            {
+                result.append(sep);
+            }
+            result.append(o.toString());
+        }
+
+        return result.toString();
+    }
+
 }

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Fri Dec  7 08:50:21 2012
@@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.imp
 
 import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
 import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
 import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
@@ -61,7 +60,7 @@ public class MessageProducerImpl impleme
         {
             try
             {
-                _sender = _session.getClientSession().createSender(_destination.getAddress());
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
             }
             catch (Sender.SenderCreationException e)
             {
@@ -297,7 +296,7 @@ public class MessageProducerImpl impleme
             try
             {
                 _destination = (DestinationImpl) destination;
-                _sender = _session.getClientSession().createSender(_destination.getAddress());
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
 
                 send(message, deliveryMode, priority, ttl);
 

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java Fri Dec  7 08:50:21 2012
@@ -100,7 +100,7 @@ public class QueueBrowserImpl implements
         {
             try
             {
-                _receiver = _session.getClientSession().createReceiver(_queue.getAddress(),
+                _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
                         StdDistMode.COPY,
                         AcknowledgeMode.AMO, null,
                         false,

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Fri Dec  7 08:50:21 2012
@@ -41,7 +41,7 @@ public class QueueReceiverImpl extends M
     {
         try
         {
-            return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+            return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
         }
         catch (AmqpErrorException e)
         {

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Fri Dec  7 08:50:21 2012
@@ -899,4 +899,10 @@ public class SessionImpl implements Sess
     {
         _isTopicSession = topicSession;
     }
+
+    String toAddress(DestinationImpl dest)
+    {
+        return _connection.toDecodedDestination(dest).getAddress();
+    }
+
 }

Modified: qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1418220&r1=1418219&r2=1418220&view=diff
==============================================================================
--- qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/0.20/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Fri Dec  7 08:50:21 2012
@@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends
     {
         try
         {
-            String address = getDestination().getAddress();
+            String address = getSession().toAddress(getDestination());
             Receiver receiver = getSession().getClientSession().createReceiver(address,
                                                                                StdDistMode.COPY, AcknowledgeMode.ALO,
                                                                                getLinkName(), isDurable(), getFilters(),



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org