You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/04 17:42:42 UTC

svn commit: r1547839 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Wed Dec  4 16:42:41 2013
New Revision: 1547839

URL: http://svn.apache.org/r1547839
Log:
QPIDJMS-9: add support for JMSReplyTo, add use of consumer destination to aid JMSDestination/JMSReplyTo destination type/value discovery when we can't get the type from the type-related annotations or the 'to' field isn't set on the message

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Wed Dec  4 16:42:41 2013
@@ -217,6 +217,11 @@ public abstract class AmqpMessage
 
     //===== Properties ======
 
+    public String getContentType()
+    {
+        return _message.getContentType();
+    }
+
     public void setContentType(String contentType)
     {
         _message.setContentType(contentType);
@@ -242,6 +247,16 @@ public abstract class AmqpMessage
         _message.setCreationTime(timeInMillis);
     }
 
+    public String getReplyTo()
+    {
+        return _message.getReplyTo();
+    }
+
+    public void setReplyTo(String replyTo)
+    {
+        _message.setReplyTo(replyTo);
+    }
+
     //===== Application Properties ======
 
     private void createApplicationProperties()

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -25,6 +25,7 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import javax.jms.BytesMessage;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
@@ -40,15 +41,15 @@ public class BytesMessageImpl extends Me
     //message to be sent
     public BytesMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpBytesMessage(), sessionImpl, connectionImpl);
+        super(new AmqpBytesMessage(), sessionImpl, connectionImpl);
         _bytesOut = new ByteArrayOutputStream();
         _dataAsOutput = new DataOutputStream(_bytesOut);
     }
 
     //message just received
-    public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         _dataIn = new DataInputStream(amqpMessage.getByteArrayInputStream());
     }
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java Wed Dec  4 16:42:41 2013
@@ -33,6 +33,7 @@ import javax.jms.Topic;
 public class DestinationHelper
 {
     public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
+    public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
 
     static final String QUEUE_ATTRIBUTE = "queue";
     static final String TOPIC_ATTRIBUTE = "topic";
@@ -57,7 +58,23 @@ public class DestinationHelper
         return new TopicImpl(address);
     }
 
-    public Destination decodeDestination(String address, String typeString)
+    private Destination createBaseDestination(String address)
+    {
+        return new DestinationImpl(address);
+    }
+
+    /**
+     * Decode the provided address, type description, and consumer destination information such that
+     * an appropriate Destination object can be returned.
+     *
+     * If an address and type description are provided then these will be used to create the Destination. If
+     * the type information is missing, it will be derived from the consumer destination if present, or
+     * default to a generic destination if not.
+     *
+     * If the address is null then the consumer destination is returned, unless the
+     * useConsumerDestForTypeOnly flag is true, in which case null will be returned.
+     */
+    public Destination decodeDestination(String address, String typeString, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
     {
         Set<String> typeSet = null;
 
@@ -66,21 +83,17 @@ public class DestinationHelper
             typeSet = splitAttributes(typeString);
         }
 
-        return createDestination(address, typeSet);
+        return createDestination(address, typeSet, consumerDestination, useConsumerDestForTypeOnly);
     }
 
-    private Destination createDestination(String address, Set<String> typeSet)
+    private Destination createDestination(String address, Set<String> typeSet, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
     {
         if(address == null)
         {
-            return null;
+            return useConsumerDestForTypeOnly ? null : consumerDestination;
         }
 
-        if(typeSet == null || typeSet.isEmpty())
-        {
-            //TODO: characterise Destination used to create the receiver, and create that type
-        }
-        else
+        if(typeSet != null && !typeSet.isEmpty())
         {
             if(typeSet.contains(QUEUE_ATTRIBUTE))
             {
@@ -108,8 +121,27 @@ public class DestinationHelper
             }
         }
 
+        if(consumerDestination instanceof TemporaryQueue)
+        {
+            //TODO
+            throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
+        }
+        else if(consumerDestination instanceof TemporaryTopic)
+        {
+            //TODO
+            throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
+        }
+        else if(consumerDestination instanceof Queue)
+        {
+            return createQueue(address);
+        }
+        else if(consumerDestination instanceof Topic)
+        {
+            return createTopic(address);
+        }
+
         //fall back to a straight Destination
-        return new DestinationImpl(address);
+        return createBaseDestination(address);
     }
 
     public Destination convertToQpidDestination(Destination dest) throws JMSException

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -28,13 +29,13 @@ public class GenericAmqpMessageImpl exte
     //message to be sent
     public GenericAmqpMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpGenericMessage(), sessionImpl, connectionImpl);
+        super(new AmqpGenericMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
-    public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -20,6 +20,7 @@ package org.apache.qpid.jms.impl;
 
 import java.util.Enumeration;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 
@@ -30,13 +31,13 @@ public class MapMessageImpl extends Mess
     //message to be sent
     public MapMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpMapMessage(), sessionImpl, connectionImpl);
+        super(new AmqpMapMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
-    public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java Wed Dec  4 16:42:41 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -33,31 +34,31 @@ import org.apache.qpid.jms.engine.AmqpTe
 
 public class MessageFactoryImpl
 {
-    Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
         if(amqpMessage instanceof AmqpTextMessage)
         {
-            return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else if(amqpMessage instanceof AmqpBytesMessage)
         {
-            return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else if(amqpMessage instanceof AmqpObjectMessage)
         {
-            return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else if(amqpMessage instanceof AmqpListMessage)
         {
-            return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else if(amqpMessage instanceof AmqpMapMessage)
         {
-            return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else if(amqpMessage instanceof AmqpGenericMessage)
         {
-            return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl);
+            return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
         }
         else
         {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Wed Dec  4 16:42:41 2013
@@ -34,26 +34,28 @@ public abstract class MessageImpl<T exte
     private final T _amqpMessage;
     private final SessionImpl _sessionImpl;
     private Destination _destination;
+    private Destination _replyTo;
 
+    //message to be sent
     public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
     {
         _amqpMessage = amqpMessage;
         _sessionImpl = sessionImpl;
+    }
+
+    //message just received
+    public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination)
+    {
+        _amqpMessage = amqpMessage;
+        _sessionImpl = sessionImpl;
 
         String to = _amqpMessage.getTo();
-        if(to != null)
-        {
-            String typeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
-            _destination = sessionImpl.getDestinationHelper().decodeDestination(to, typeString);
-        }
-        else
-        {
-            //TODO:
-            //Message doesn't have a To. If this message was received via a
-            //consumer (i.e we aren't creating this message to send), as a fallback
-            //we could set the Destination used to create the consumer itself. That
-            //responsibility might fall to the consumer though.
-        }
+        String toTypeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        _destination = sessionImpl.getDestinationHelper().decodeDestination(to, toTypeString, consumerDestination, false);
+
+        String replyTo = _amqpMessage.getReplyTo();
+        String replyToTypeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        _replyTo = sessionImpl.getDestinationHelper().decodeDestination(replyTo, replyToTypeString, consumerDestination, true);
     }
 
     T getUnderlyingAmqpMessage(boolean prepareForSending)
@@ -175,15 +177,27 @@ public abstract class MessageImpl<T exte
     @Override
     public Destination getJMSReplyTo() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        return _replyTo;
     }
 
     @Override
     public void setJMSReplyTo(Destination replyTo) throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        _replyTo = replyTo;
+
+        String replyToAddress = _sessionImpl.getDestinationHelper().decodeAddress(_replyTo);
+        String typeString = _sessionImpl.getDestinationHelper().decodeTypeString(_replyTo);
+
+        _amqpMessage.setReplyTo(replyToAddress);
+
+        if(replyToAddress == null || typeString == null)
+        {
+            _amqpMessage.clearMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        }
+        else
+        {
+            _amqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+        }
     }
 
     @Override
@@ -210,8 +224,6 @@ public abstract class MessageImpl<T exte
         {
             _amqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
         }
-
-
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -20,6 +20,7 @@ package org.apache.qpid.jms.impl;
 
 import java.io.Serializable;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 
@@ -30,13 +31,13 @@ public class ObjectMessageImpl extends M
     //message to be sent
     public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpObjectMessage(), sessionImpl, connectionImpl);
+        super(new AmqpObjectMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
-    public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Wed Dec  4 16:42:41 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -33,12 +34,14 @@ public class ReceiverImpl extends LinkIm
 {
     private final AmqpReceiver _amqpReceiver;
     private final SessionImpl _sessionImpl;
+    private final Destination _recieverDestination;
 
-    public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
+    public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver, Destination recieverDestination)
     {
         super(connectionImpl, amqpReceiver);
         _sessionImpl = sessionImpl;
         _amqpReceiver = amqpReceiver;
+        _recieverDestination = recieverDestination;
     }
 
     @Override
@@ -64,7 +67,7 @@ public class ReceiverImpl extends LinkIm
             AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
 
             //TODO: don't create a new factory for every message
-            Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
+            Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _recieverDestination);
 
             //TODO: accepting/settling will be acknowledge-mode dependent
             if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed Dec  4 16:42:41 2013
@@ -98,13 +98,13 @@ public class SessionImpl implements Sess
         }
     }
 
-    private ReceiverImpl createReceiver(String address) throws JMSException
+    private ReceiverImpl createReceiver(String address, Destination recieverDestination) throws JMSException
     {
         _connectionImpl.lock();
         try
         {
             AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
-            ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver);
+            ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
             _connectionImpl.stateChanged();
             receiver.establish();
 
@@ -199,7 +199,7 @@ public class SessionImpl implements Sess
         else if (destination instanceof Queue)
         {
             Queue queue = (Queue) destination;
-            return createReceiver(queue.getQueueName());
+            return createReceiver(queue.getQueueName(), destination);
         }
         else if(destination instanceof Topic)
         {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.StreamMessage;
 
@@ -28,13 +29,13 @@ public class StreamMessageImpl extends M
     //message to be sent
     public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpListMessage(), sessionImpl, connectionImpl);
+        super(new AmqpListMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
-    public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
 
@@ -28,13 +29,13 @@ public class TextMessageImpl extends Mes
     //message to be sent
     public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(new AmqpTextMessage(), sessionImpl, connectionImpl);
+        super(new AmqpTextMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
-    public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
     {
-        super(amqpMessage, sessionImpl, connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Wed Dec  4 16:42:41 2013
@@ -26,16 +26,21 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.assertTrue;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
+import org.apache.qpid.jms.impl.DestinationHelper;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -181,4 +186,198 @@ public class MessageIntegrationTest exte
             assertEquals(DOUBLE_PROP_VALUE, receivedMessage.getDoubleProperty(DOUBLE_PROP), 0.0);
         }
     }
+
+    /**
+     * Tests that the {@link DestinationHelper#TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
+     * indicate its 'to' address represents a Topic results in the JMSDestination object being a
+     * Topic. Consumes from a Queue to ensure the consumer's destination is not used.
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithToTypeAnnotationForTopic() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            String myTopicAddress = "myTopicAddress";
+            props.setTo(myTopicAddress );
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+
+            Destination dest = receivedMessage.getJMSDestination();
+            assertTrue(dest instanceof Topic);
+            assertEquals(myTopicAddress, ((Topic)dest).getTopicName());
+        }
+    }
+
+    /**
+     * Tests that the lack of a 'to' in the Properties section of the incoming message (e.g.
+     * one sent by a non-JMS client) is handled by making the JMSDestination method simply
+     * return the Destination used to create the consumer that received the message.
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestination() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+
+            Destination dest = receivedMessage.getJMSDestination();
+            assertTrue(dest instanceof Queue);
+            assertEquals(queueName, ((Queue)dest).getQueueName());
+        }
+    }
+
+
+    /**
+     * Tests that the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
+     * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
+     * Topic. Consumes from a Queue to ensure the consumer's destination is not used.
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithReplyToTypeAnnotationForTopic() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            String myTopicAddress = "myTopicAddress";
+            props.setReplyTo(myTopicAddress);
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+
+            Destination dest = receivedMessage.getJMSReplyTo();
+            assertTrue(dest instanceof Topic);
+            assertEquals(myTopicAddress, ((Topic)dest).getTopicName());
+        }
+    }
+
+    /**
+     * Tests that when the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} annotation
+     * indicating the type of a message's 'reply-to' address is absent, the JMSReplyTo is classed as the
+     * same type as the destination used to create the consumer.
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            String myOtherQueueAddress = "myOtherQueueAddress";
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setReplyTo(myOtherQueueAddress);
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+
+            Destination dest = receivedMessage.getJMSReplyTo();
+            assertTrue(dest instanceof Queue);
+            assertEquals(myOtherQueueAddress, ((Queue)dest).getQueueName());
+        }
+    }
+
+    /**
+     * Tests that a message with no reply-to set returns null for JMSReplyTo, rather than
+     * the consumer destination as happens for JMSDestination.
+     */
+    @Test
+    public void testReceivedMessageFromQueueWithNoReplyToReturnsNull() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertNull(receivedMessage.getJMSReplyTo());
+        }
+    }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java Wed Dec  4 16:42:41 2013
@@ -207,7 +207,7 @@ public class AmqpMessageTest extends Qpi
 
         String toAddress = testAmqpMessage.getTo();
         assertNotNull(toAddress);
-        assertEquals(testToAddress, testAmqpMessage.getTo());
+        assertEquals(testToAddress, toAddress);
     }
 
     @Test
@@ -241,6 +241,80 @@ public class AmqpMessageTest extends Qpi
         assertEquals(testToAddress, testAmqpMessage.getTo());
     }
 
+    @Test
+    public void testGetReplyToWithReceivedMessageWithNoProperties()
+    {
+        Message message = Proton.message();
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        String replyToAddress = testAmqpMessage.getReplyTo();
+        assertNull(replyToAddress);
+    }
+
+    @Test
+    public void testGetReplyToWithReceivedMessageWithPropertiesButNoReplyTo()
+    {
+        Message message = Proton.message();
+
+        Properties props = new Properties();
+        props.setContentType(Symbol.valueOf("content-type"));
+        message.setProperties(props);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        String replyToAddress = testAmqpMessage.getReplyTo();
+        assertNull(replyToAddress);
+    }
+
+    @Test
+    public void testGetReplyToWithReceivedMessage()
+    {
+        String testReplyToAddress = "myTestAddress";
+
+        Message message = Proton.message();
+
+        Properties props = new Properties();
+        props.setReplyTo(testReplyToAddress);
+        message.setProperties(props);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        String replyToAddress = testAmqpMessage.getReplyTo();
+        assertNotNull(replyToAddress);
+        assertEquals(testReplyToAddress, replyToAddress);
+    }
+
+    @Test
+    public void testSetReplyTo()
+    {
+        String testReplyToAddress = "myTestAddress";
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        Message underlyingMessage = testAmqpMessage.getMessage();
+        assertNull(underlyingMessage.getReplyTo());
+
+        testAmqpMessage.setReplyTo(testReplyToAddress);
+
+        assertNotNull(underlyingMessage.getReplyTo());
+        assertEquals(testReplyToAddress, underlyingMessage.getReplyTo());
+    }
+
+    @Test
+    public void testSetGetReplyTo()
+    {
+        String testReplyToAddress = "myTestAddress";
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+        assertNull(testAmqpMessage.getReplyTo());
+
+        testAmqpMessage.setReplyTo(testReplyToAddress);
+
+        assertNotNull(testAmqpMessage.getReplyTo());
+        assertEquals(testReplyToAddress, testAmqpMessage.getReplyTo());
+    }
+
     // ====== Message Annotations =======
 
     @Test

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java Wed Dec  4 16:42:41 2013
@@ -53,6 +53,7 @@ public class BytesMessageImplTest extend
         _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
         _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
         _mockSessionImpl = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
     }
 
     @Test
@@ -60,7 +61,7 @@ public class BytesMessageImplTest extend
     {
         Message message = Proton.message();
         AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
-        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl, null);
 
         assertEquals(0, bytesMessageImpl.getBodyLength());
     }
@@ -70,7 +71,7 @@ public class BytesMessageImplTest extend
     {
         Message message = Proton.message();
         AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
-        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl, null);
 
         //verify attempting to read bytes returns -1, i.e EOS
         assertEquals(-1, bytesMessageImpl.readBytes(new byte[2]));
@@ -85,7 +86,7 @@ public class BytesMessageImplTest extend
         message.setBody(new Data(new Binary(bytes)));
 
         AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
-        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl, null);
 
         //retrieve the expected bytes, check they match
         byte[] receivedBytes = new byte[bytes.length];
@@ -107,7 +108,7 @@ public class BytesMessageImplTest extend
         message.setBody(new AmqpValue(new Binary(bytes)));
 
         AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
-        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+        BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl, null);
 
         //retrieve the expected bytes, check they match
         byte[] receivedBytes = new byte[bytes.length];

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java Wed Dec  4 16:42:41 2013
@@ -54,23 +54,83 @@ public class DestinationHelperTest exten
     }
 
     @Test
-    public void testDecodeDestinationWithoutTypeAnnotation() throws Exception
+    public void testDecodeDestinationWithNullAddressAndNullConsumerDestReturnsNull() throws Exception
+    {
+        assertNull(_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, null, false));
+    }
+
+    @Test
+    public void testDecodeDestinationWithNullAddressWithConsumerDestReturnsSameConsumerDestObject() throws Exception
+    {
+        String consumerDestString = "consumerDest";
+        Queue consumerDest = _helper.createQueue(consumerDestString);
+        assertSame(consumerDest,_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, consumerDest, false));
+    }
+
+    @Test
+    public void testDecodeDestinationWithNullAddressWithConsumerDestReturnsNullWhenUsingConsumerDestForTypeOnly() throws Exception
+    {
+        String consumerDestString = "consumerDest";
+        Queue consumerDest = _helper.createQueue(consumerDestString);
+        assertNull(_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, consumerDest, true));
+    }
+
+    @Test
+    public void testDecodeDestinationWithoutTypeAnnotationWithoutConsumerDest() throws Exception
+    {
+        String testAddress = "testAddress";
+        Destination dest = _helper.decodeDestination(testAddress, null, null, false);
+        assertNotNull(dest);
+
+        assertTrue(dest instanceof DestinationImpl);
+        assertEquals(testAddress, ((DestinationImpl) dest).getAddress());
+    }
+
+    @Test
+    public void testDecodeDestinationWithoutTypeAnnotationWithQueueConsumerDest() throws Exception
+    {
+        String testAddress = "testAddress";
+        String consumerDestString = "consumerDest";
+        Queue consumerDest = _helper.createQueue(consumerDestString);
+        Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
+        assertNotNull(dest);
+
+        assertTrue(dest instanceof QueueImpl);
+        assertEquals(testAddress, ((Queue)dest).getQueueName());
+    }
+
+    @Test
+    public void testDecodeDestinationWithoutTypeAnnotationWithTopicConsumerDest() throws Exception
+    {
+        String testAddress = "testAddress";
+        String consumerDestString = "consumerDest";
+        Topic consumerDest = _helper.createTopic(consumerDestString);
+        Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
+        assertNotNull(dest);
+
+        assertTrue(dest instanceof TopicImpl);
+        assertEquals(testAddress, ((Topic)dest).getTopicName());
+    }
+
+    @Test
+    public void testDecodeDestinationWithoutTypeAnnotationWithDestinationConsumerDest() throws Exception
     {
         String testAddress = "testAddress";
-        Destination dest = _helper.decodeDestination(testAddress, null);
+        Destination consumerDest = new DestinationImpl(testAddress);
+        Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
         assertNotNull(dest);
 
-        //TODO: this test will need to expand for classification of receiver type in future
         assertTrue(dest instanceof DestinationImpl);
+        assertEquals(testAddress, ((DestinationImpl) dest).getAddress());
     }
 
     @Test
-    public void testDecodeDestinationWithQueueTypeAnnotation() throws Exception
+    public void testDecodeDestinationWithQueueTypeAnnotationWithoutConsumerDest() throws Exception
     {
         String testAddress = "testAddress";
         String testTypeAnnotation = "queue";
 
-        Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
+        Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
         assertNotNull(dest);
         assertTrue(dest instanceof DestinationImpl);
         assertTrue(dest instanceof QueueImpl);
@@ -79,12 +139,12 @@ public class DestinationHelperTest exten
     }
 
     @Test
-    public void testDecodeDestinationWithTopicTypeAnnotation() throws Exception
+    public void testDecodeDestinationWithTopicTypeAnnotationWithoutConsumerDest() throws Exception
     {
         String testAddress = "testAddress";
         String testTypeAnnotation = "topic";
 
-        Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
+        Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
         assertNotNull(dest);
         assertTrue(dest instanceof DestinationImpl);
         assertTrue(dest instanceof TopicImpl);
@@ -102,8 +162,8 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
-            fail("expected exceptionnow thrown");
+            _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
+            fail("expected exception not thrown");
         }
         catch(IllegalArgumentException iae)
         {
@@ -112,8 +172,8 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotationBackwards);
-            fail("expected exceptionnow thrown");
+            _helper.decodeDestination(testAddress, testTypeAnnotationBackwards, null, false);
+            fail("expected exception not thrown");
         }
         catch(IllegalArgumentException iae)
         {
@@ -131,8 +191,8 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
-            fail("expected exceptionnow thrown");
+            _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
+            fail("expected exception not thrown");
         }
         catch(IllegalArgumentException iae)
         {
@@ -141,8 +201,8 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotationBackwards);
-            fail("expected exceptionnow thrown");
+            _helper.decodeDestination(testAddress, testTypeAnnotationBackwards, null, false);
+            fail("expected exception not thrown");
         }
         catch(IllegalArgumentException iae)
         {
@@ -203,7 +263,7 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.convertToQpidDestination(mockTempQueue);
+            _helper.convertToQpidDestination(mockTempQueue);
             fail("excepted exception not thrown");
         }
         catch(IllegalArgumentException iae)
@@ -222,7 +282,7 @@ public class DestinationHelperTest exten
 
         try
         {
-            Destination dest = _helper.convertToQpidDestination(mockTempTopic);
+            _helper.convertToQpidDestination(mockTempTopic);
             fail("excepted exception not thrown");
         }
         catch(IllegalArgumentException iae)

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java Wed Dec  4 16:42:41 2013
@@ -51,6 +51,7 @@ public class MessageFactoryImplTest exte
         super.setUp();
         _mockConnection = Mockito.mock(ConnectionImpl.class);
         _mockSession = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
         _messageFactoryImpl = new MessageFactoryImpl();
     }
 
@@ -58,7 +59,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpGenericMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpGenericMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(GenericAmqpMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -66,7 +67,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpTextMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpTextMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(TextMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -74,7 +75,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpBytesMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpBytesMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(BytesMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -82,7 +83,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpObjectMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpObjectMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(ObjectMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -90,7 +91,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpListMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpListMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(StreamMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -98,7 +99,7 @@ public class MessageFactoryImplTest exte
     public void testCreateJmsMessageWithAmqpMapMessage() throws Exception
     {
         AmqpMessage amqpMessage = Mockito.mock(AmqpMapMessage.class);
-        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+        Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
         assertEquals(MapMessageImpl.class, jmsMessage.getClass());
     }
 
@@ -109,7 +110,7 @@ public class MessageFactoryImplTest exte
         AmqpMessage amqpMessage = Mockito.mock(AmqpMessage.class);
         try
         {
-            _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+            _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
             fail("expected exception was not thrown");
         }
         catch(JMSException jmse)

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Wed Dec  4 16:42:41 2013
@@ -52,8 +52,11 @@ public class MessageImplTest extends Qpi
     public void setUp() throws Exception
     {
         super.setUp();
+
         _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
         _mockSessionImpl = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
+
         _testAmqpMessage = new TestAmqpMessage();
         _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
 
@@ -460,8 +463,6 @@ public class MessageImplTest extends Qpi
     @Test
     public void testSetJMSDestinationOnNewMessageUsingQueue() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
-
         assertNull(_testAmqpMessage.getTo());
 
         _testMessage.setJMSDestination(_mockQueue);
@@ -477,8 +478,6 @@ public class MessageImplTest extends Qpi
     @Test
     public void testSetJMSDestinationOnNewMessageUsingTopic() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
-
         assertNull(_testAmqpMessage.getTo());
 
         _testMessage.setJMSDestination(_mockTopic);
@@ -494,11 +493,10 @@ public class MessageImplTest extends Qpi
     @Test
     public void testSetJMSDestinationNullOnRecievedMessageWithToAndTypeAnnotationClearsTheAnnotation() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
         _testAmqpMessage.setTo(_mockTopicName);
         _testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
                                               DestinationHelper.TOPIC_ATTRIBUTES_STRING);
-        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
 
         assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
         assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
@@ -512,7 +510,6 @@ public class MessageImplTest extends Qpi
     @Test
     public void testSetGetJMSDestinationOnNewMessage() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
         _testMessage.setJMSDestination(_mockQueue);
         assertNotNull(_testMessage.getJMSDestination());
         assertSame(_mockQueue, _testMessage.getJMSDestination());
@@ -521,9 +518,8 @@ public class MessageImplTest extends Qpi
     @Test
     public void testGetJMSDestinationOnRecievedMessageWithToButWithoutToTypeAnnotation() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
         _testAmqpMessage.setTo(_mockQueueName);
-        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
 
         assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
 
@@ -534,11 +530,10 @@ public class MessageImplTest extends Qpi
     @Test
     public void testGetJMSDestinationOnRecievedMessageWithToAndTypeAnnotationForTopic() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
         _testAmqpMessage.setTo(_mockTopicName);
         _testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
                                               DestinationHelper.TOPIC_ATTRIBUTES_STRING);
-        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
 
         assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
 
@@ -549,11 +544,10 @@ public class MessageImplTest extends Qpi
     @Test
     public void testGetJMSDestinationOnRecievedMessageWithToAndTypeAnnotationForQueue() throws Exception
     {
-        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
         _testAmqpMessage.setTo(_mockQueueName);
         _testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
                                               DestinationHelper.QUEUE_ATTRIBUTES_STRING);
-        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
 
         assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
 
@@ -561,6 +555,112 @@ public class MessageImplTest extends Qpi
         assertEquals(newDestinationExpected, _testMessage.getJMSDestination());
     }
 
+    //TODO:new
+    // ====== JMSReplyTo =======
+
+    @Test
+    public void testGetJMSReplyToOnNewMessage() throws Exception
+    {
+        //Should be null as it has not been set explicitly, and
+        // the message has not been received from anywhere
+        assertNull(_testMessage.getJMSReplyTo());
+    }
+
+    @Test
+    public void testSetJMSJMSReplyToOnNewMessageUsingQueue() throws Exception
+    {
+        assertNull(_testAmqpMessage.getReplyTo());
+
+        _testMessage.setJMSReplyTo(_mockQueue);
+
+        assertNotNull(_testAmqpMessage.getReplyTo());
+        assertEquals(_mockQueueName, _testAmqpMessage.getReplyTo());
+
+        assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+        assertEquals(DestinationHelper.QUEUE_ATTRIBUTES_STRING,
+                     _testAmqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+    }
+
+    @Test
+    public void testSetJMSReplyToOnNewMessageUsingTopic() throws Exception
+    {
+        assertNull(_testAmqpMessage.getReplyTo());
+
+        _testMessage.setJMSReplyTo(_mockTopic);
+
+        assertNotNull(_testAmqpMessage.getReplyTo());
+        assertEquals(_mockTopicName, _testAmqpMessage.getReplyTo());
+
+        assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+        assertEquals(DestinationHelper.TOPIC_ATTRIBUTES_STRING,
+                     _testAmqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+    }
+
+    @Test
+    public void testSetJMSReplyToNullOnRecievedMessageWithReplyToAndTypeAnnotationClearsTheAnnotation() throws Exception
+    {
+        _testAmqpMessage.setReplyTo(_mockTopicName);
+        _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+                                              DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+        assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+
+        _testMessage.setJMSReplyTo(null);
+
+        assertNull("expected JMSReplyTo value to be null", _testMessage.getJMSReplyTo());
+        assertFalse(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+    }
+
+    @Test
+    public void testSetGetJMSReplyToNewMessage() throws Exception
+    {
+        _testMessage.setJMSReplyTo(_mockQueue);
+        assertNotNull(_testMessage.getJMSReplyTo());
+        assertSame(_mockQueue, _testMessage.getJMSReplyTo());
+    }
+
+    @Test
+    public void testGetJMSReplyToRecievedMessageWithReplyToButWithoutReplyToTypeAnnotation() throws Exception
+    {
+        _testAmqpMessage.setReplyTo(_mockQueueName);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+        Destination newDestinationExpected = new DestinationImpl(_mockQueueName);
+        assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+    }
+
+    @Test
+    public void testGetJMSReplyToOnRecievedMessageWithReplyToAndTypeAnnotationForTopic() throws Exception
+    {
+        _testAmqpMessage.setReplyTo(_mockTopicName);
+        _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+                                              DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+        Topic newDestinationExpected = new DestinationHelper().createTopic(_mockTopicName);
+        assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+    }
+
+    @Test
+    public void testGetJMSReplyToRecievedMessageWithReplyToAndTypeAnnotationForQueue() throws Exception
+    {
+        _testAmqpMessage.setReplyTo(_mockQueueName);
+        _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+                                              DestinationHelper.QUEUE_ATTRIBUTES_STRING);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+        assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+        Queue newDestinationExpected = new DestinationHelper().createQueue(_mockQueueName);
+        assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+    }
+
     // ====== JMSTimestamp =======
 
     @Test
@@ -594,7 +694,7 @@ public class MessageImplTest extends Qpi
     {
         long timestamp = System.currentTimeMillis();
         _testAmqpMessage.setCreationTime(timestamp);
-        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
 
         assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
     }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Wed Dec  4 16:42:41 2013
@@ -51,6 +51,7 @@ public class ReceiverImplTest extends Qp
         _mockConnection = Mockito.mock(ConnectionImpl.class);
         _mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
         _mockSession = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
         _mockAmqpMessage = Mockito.mock(AmqpGenericMessage.class);
     }
 
@@ -62,7 +63,7 @@ public class ReceiverImplTest extends Qp
 
         ImmediateWaitUntil.mockWaitUntil(_mockConnection);
 
-        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver, null);
 
         assertNull("Should not receive a message when connection is not started", receiver.receive(1));
     }
@@ -77,7 +78,7 @@ public class ReceiverImplTest extends Qp
 
         ImmediateWaitUntil.mockWaitUntil(_mockConnection);
 
-        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+        ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver, null);
 
         Message message = receiver.receive(1);
         assertNotNull("Should receive a message when connection is started", message);

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java Wed Dec  4 16:42:41 2013
@@ -1,12 +1,21 @@
 package org.apache.qpid.jms.impl;
 
+import javax.jms.Destination;
+
 import org.apache.qpid.jms.engine.TestAmqpMessage;
 
 public class TestMessageImpl extends MessageImpl<TestAmqpMessage>
 {
+    //message to be sent
     public TestMessageImpl(TestAmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
     {
-        super(amqpMessage,sessionImpl,connectionImpl);
+        super(amqpMessage, sessionImpl, connectionImpl);
+    }
+
+    //message just received: forwards the consumer destination to MessageImpl
+    public TestMessageImpl(TestAmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination)
+    {
+        super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
     }
 
     @Override

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java Wed Dec  4 16:42:41 2013
@@ -49,6 +49,7 @@ public class TextMessageImplTest extends
         _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
         _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
         _mockSessionImpl = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
     }
 
     @Test
@@ -77,7 +78,7 @@ public class TextMessageImplTest extends
     {
         Message message = Proton.message();
         AmqpTextMessage testAmqpMessage1 = new AmqpTextMessage(_mockDelivery, message, _mockAmqpConnection);
-        TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl);
+        TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl, null);
 
         assertNull("expected null string", textMessageImpl.getText());
     }
@@ -89,7 +90,7 @@ public class TextMessageImplTest extends
         String value = "myAmqpValueString";
         message.setBody(new AmqpValue(value));
         AmqpTextMessage testAmqpMessage1 = new AmqpTextMessage(_mockDelivery, message, _mockAmqpConnection);
-        TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl);
+        TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl, null);
 
         assertEquals(value, textMessageImpl.getText());
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org