You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/12/04 17:42:42 UTC
svn commit: r1547839 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/
test/java/org/apache/qpid/jms/impl/
Author: robbie
Date: Wed Dec 4 16:42:41 2013
New Revision: 1547839
URL: http://svn.apache.org/r1547839
Log:
QPIDJMS-9: add support for JMSReplyTo, add use of consumer destination to aid JMSDestination/JMSReplyTo destination type/value discovery when we can't get the type from the type-related annotations or the 'to' field isn't set on the message
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Wed Dec 4 16:42:41 2013
@@ -217,6 +217,11 @@ public abstract class AmqpMessage
//===== Properties ======
+ public String getContentType()
+ {
+ return _message.getContentType();
+ }
+
public void setContentType(String contentType)
{
_message.setContentType(contentType);
@@ -242,6 +247,16 @@ public abstract class AmqpMessage
_message.setCreationTime(timeInMillis);
}
+ public String getReplyTo()
+ {
+ return _message.getReplyTo();
+ }
+
+ public void setReplyTo(String replyTo)
+ {
+ _message.setReplyTo(replyTo);
+ }
+
//===== Application Properties ======
private void createApplicationProperties()
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -25,6 +25,7 @@ import java.io.EOFException;
import java.io.IOException;
import javax.jms.BytesMessage;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
@@ -40,15 +41,15 @@ public class BytesMessageImpl extends Me
//message to be sent
public BytesMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpBytesMessage(), sessionImpl, connectionImpl);
+ super(new AmqpBytesMessage(), sessionImpl, connectionImpl);
_bytesOut = new ByteArrayOutputStream();
_dataAsOutput = new DataOutputStream(_bytesOut);
}
//message just received
- public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
_dataIn = new DataInputStream(amqpMessage.getByteArrayInputStream());
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/DestinationHelper.java Wed Dec 4 16:42:41 2013
@@ -33,6 +33,7 @@ import javax.jms.Topic;
public class DestinationHelper
{
public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
+ public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
static final String QUEUE_ATTRIBUTE = "queue";
static final String TOPIC_ATTRIBUTE = "topic";
@@ -57,7 +58,23 @@ public class DestinationHelper
return new TopicImpl(address);
}
- public Destination decodeDestination(String address, String typeString)
+ private Destination createBaseDestination(String address)
+ {
+ return new DestinationImpl(address);
+ }
+
+ /**
+ * Decode the provided address, type description, and consumer destination information such that
+ * an appropriate Destination object can be returned.
+ *
+ * If an address and type description is provided then this will be used to create the Destination. If
+ * the type information is missing, it will be derived from the consumer destination if present, or
+ * default to a generic destination if not.
+ *
+ * If the address is null then the consumer destination is returned, unless the
+ * useConsumerDestForTypeOnly flag is true, in which case null will be returned.
+ */
+ public Destination decodeDestination(String address, String typeString, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
{
Set<String> typeSet = null;
@@ -66,21 +83,17 @@ public class DestinationHelper
typeSet = splitAttributes(typeString);
}
- return createDestination(address, typeSet);
+ return createDestination(address, typeSet, consumerDestination, useConsumerDestForTypeOnly);
}
- private Destination createDestination(String address, Set<String> typeSet)
+ private Destination createDestination(String address, Set<String> typeSet, Destination consumerDestination, boolean useConsumerDestForTypeOnly)
{
if(address == null)
{
- return null;
+ return useConsumerDestForTypeOnly ? null : consumerDestination;
}
- if(typeSet == null || typeSet.isEmpty())
- {
- //TODO: characterise Destination used to create the receiver, and create that type
- }
- else
+ if(typeSet != null && !typeSet.isEmpty())
{
if(typeSet.contains(QUEUE_ATTRIBUTE))
{
@@ -108,8 +121,27 @@ public class DestinationHelper
}
}
+ if(consumerDestination instanceof TemporaryQueue)
+ {
+ //TODO
+ throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
+ }
+ else if(consumerDestination instanceof TemporaryTopic)
+ {
+ //TODO
+ throw new IllegalArgumentException("Unsupported Destination type: " + consumerDestination.getClass().getName());
+ }
+ else if(consumerDestination instanceof Queue)
+ {
+ return createQueue(address);
+ }
+ else if(consumerDestination instanceof Topic)
+ {
+ return createTopic(address);
+ }
+
//fall back to a straight Destination
- return new DestinationImpl(address);
+ return createBaseDestination(address);
}
public Destination convertToQpidDestination(Destination dest) throws JMSException
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -28,13 +29,13 @@ public class GenericAmqpMessageImpl exte
//message to be sent
public GenericAmqpMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpGenericMessage(), sessionImpl, connectionImpl);
+ super(new AmqpGenericMessage(), sessionImpl, connectionImpl);
}
//message just received
- public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -20,6 +20,7 @@ package org.apache.qpid.jms.impl;
import java.util.Enumeration;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@@ -30,13 +31,13 @@ public class MapMessageImpl extends Mess
//message to be sent
public MapMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpMapMessage(), sessionImpl, connectionImpl);
+ super(new AmqpMapMessage(), sessionImpl, connectionImpl);
}
//message just received
- public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java Wed Dec 4 16:42:41 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -33,31 +34,31 @@ import org.apache.qpid.jms.engine.AmqpTe
public class MessageFactoryImpl
{
- Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
if(amqpMessage instanceof AmqpTextMessage)
{
- return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else if(amqpMessage instanceof AmqpBytesMessage)
{
- return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else if(amqpMessage instanceof AmqpObjectMessage)
{
- return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else if(amqpMessage instanceof AmqpListMessage)
{
- return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else if(amqpMessage instanceof AmqpMapMessage)
{
- return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else if(amqpMessage instanceof AmqpGenericMessage)
{
- return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl);
+ return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
else
{
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Wed Dec 4 16:42:41 2013
@@ -34,26 +34,28 @@ public abstract class MessageImpl<T exte
private final T _amqpMessage;
private final SessionImpl _sessionImpl;
private Destination _destination;
+ private Destination _replyTo;
+ //message to be sent
public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
{
_amqpMessage = amqpMessage;
_sessionImpl = sessionImpl;
+ }
+
+ //message just received
+ public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination)
+ {
+ _amqpMessage = amqpMessage;
+ _sessionImpl = sessionImpl;
String to = _amqpMessage.getTo();
- if(to != null)
- {
- String typeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
- _destination = sessionImpl.getDestinationHelper().decodeDestination(to, typeString);
- }
- else
- {
- //TODO:
- //Message doesn't have a To. If this message was received via a
- //consumer (i.e we aren't creating this message to send), as a fallback
- //we could set the Destination used to create the consumer itself. That
- //responsibility might fall to the consumer though.
- }
+ String toTypeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ _destination = sessionImpl.getDestinationHelper().decodeDestination(to, toTypeString, consumerDestination, false);
+
+ String replyTo = _amqpMessage.getReplyTo();
+ String replyToTypeString = (String) _amqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ _replyTo = sessionImpl.getDestinationHelper().decodeDestination(replyTo, replyToTypeString, consumerDestination, true);
}
T getUnderlyingAmqpMessage(boolean prepareForSending)
@@ -175,15 +177,27 @@ public abstract class MessageImpl<T exte
@Override
public Destination getJMSReplyTo() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ return _replyTo;
}
@Override
public void setJMSReplyTo(Destination replyTo) throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ _replyTo = replyTo;
+
+ String replyToAddress = _sessionImpl.getDestinationHelper().decodeAddress(_replyTo);
+ String typeString = _sessionImpl.getDestinationHelper().decodeTypeString(_replyTo);
+
+ _amqpMessage.setReplyTo(replyToAddress);
+
+ if(replyToAddress == null || typeString == null)
+ {
+ _amqpMessage.clearMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ }
+ else
+ {
+ _amqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+ }
}
@Override
@@ -210,8 +224,6 @@ public abstract class MessageImpl<T exte
{
_amqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
}
-
-
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -20,6 +20,7 @@ package org.apache.qpid.jms.impl;
import java.io.Serializable;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
@@ -30,13 +31,13 @@ public class ObjectMessageImpl extends M
//message to be sent
public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpObjectMessage(), sessionImpl, connectionImpl);
+ super(new AmqpObjectMessage(), sessionImpl, connectionImpl);
}
//message just received
- public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Wed Dec 4 16:42:41 2013
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -33,12 +34,14 @@ public class ReceiverImpl extends LinkIm
{
private final AmqpReceiver _amqpReceiver;
private final SessionImpl _sessionImpl;
+ private final Destination _recieverDestination;
- public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver)
+ public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver, Destination recieverDestination)
{
super(connectionImpl, amqpReceiver);
_sessionImpl = sessionImpl;
_amqpReceiver = amqpReceiver;
+ _recieverDestination = recieverDestination;
}
@Override
@@ -64,7 +67,7 @@ public class ReceiverImpl extends LinkIm
AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
//TODO: don't create a new factory for every message
- Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
+ Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _recieverDestination);
//TODO: accepting/settling will be acknowledge-mode dependent
if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed Dec 4 16:42:41 2013
@@ -98,13 +98,13 @@ public class SessionImpl implements Sess
}
}
- private ReceiverImpl createReceiver(String address) throws JMSException
+ private ReceiverImpl createReceiver(String address, Destination recieverDestination) throws JMSException
{
_connectionImpl.lock();
try
{
AmqpReceiver amqpReceiver = _amqpSession.createAmqpReceiver(address);
- ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver);
+ ReceiverImpl receiver = new ReceiverImpl(_connectionImpl, this, amqpReceiver, recieverDestination);
_connectionImpl.stateChanged();
receiver.establish();
@@ -199,7 +199,7 @@ public class SessionImpl implements Sess
else if (destination instanceof Queue)
{
Queue queue = (Queue) destination;
- return createReceiver(queue.getQueueName());
+ return createReceiver(queue.getQueueName(), destination);
}
else if(destination instanceof Topic)
{
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.StreamMessage;
@@ -28,13 +29,13 @@ public class StreamMessageImpl extends M
//message to be sent
public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpListMessage(), sessionImpl, connectionImpl);
+ super(new AmqpListMessage(), sessionImpl, connectionImpl);
}
//message just received
- public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@@ -28,13 +29,13 @@ public class TextMessageImpl extends Mes
//message to be sent
public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(new AmqpTextMessage(), sessionImpl, connectionImpl);
+ super(new AmqpTextMessage(), sessionImpl, connectionImpl);
}
//message just received
- public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination) throws JMSException
{
- super(amqpMessage, sessionImpl, connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
}
@Override
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Wed Dec 4 16:42:41 2013
@@ -26,16 +26,21 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.qpid.jms.impl.DestinationHelper;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -181,4 +186,198 @@ public class MessageIntegrationTest exte
assertEquals(DOUBLE_PROP_VALUE, receivedMessage.getDoubleProperty(DOUBLE_PROP), 0.0);
}
}
+
+ /**
+ * Tests that the {@link DestinationHelper#TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
+ * indicate its 'to' address represents a Topic results in the JMSDestination object being a
+ * Topic. Ensure the consumers destination is not used by consuming from a Queue.
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithToTypeAnnotationForTopic() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ String myTopicAddress = "myTopicAddress";
+ props.setTo(myTopicAddress );
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+
+ Destination dest = receivedMessage.getJMSDestination();
+ assertTrue(dest instanceof Topic);
+ assertEquals(myTopicAddress, ((Topic)dest).getTopicName());
+ }
+ }
+
+ /**
+ * Tests that the lack of a 'to' in the Properties section of the incoming message (e.g
+ * one sent by a non-JMS client) is handled by making the JMSDestination method simply
+ * return the Destination used to create the consumer that received the message.
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestination() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+
+ Destination dest = receivedMessage.getJMSDestination();
+ assertTrue(dest instanceof Queue);
+ assertEquals(queueName, ((Queue)dest).getQueueName());
+ }
+ }
+
+
+ /**
+ * Tests that the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
+ * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
+ * Topic. Ensure the consumers destination is not used by consuming from a Queue.
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithReplyToTypeAnnotationForTopic() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ String myTopicAddress = "myTopicAddress";
+ props.setReplyTo(myTopicAddress);
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+
+ Destination dest = receivedMessage.getJMSReplyTo();
+ assertTrue(dest instanceof Topic);
+ assertEquals(myTopicAddress, ((Topic)dest).getTopicName());
+ }
+ }
+
+ /**
+ * Tests that lack of the {@link DestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a
+ * message to indicate type of its 'reply-to' address results in it being classed as the same
+ * type as the destination used to create the consumer.
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ String myOtherQueueAddress = "myOtherQueueAddress";
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setReplyTo(myOtherQueueAddress);
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+
+ Destination dest = receivedMessage.getJMSReplyTo();
+ assertTrue(dest instanceof Queue);
+ assertEquals(myOtherQueueAddress, ((Queue)dest).getQueueName());
+ }
+ }
+
+ /**
+ * Tests that lack of the reply-to set on a message results in it returning null for JMSReplyTo
+ * and not the consumer destination as happens for JMSDestination.
+ */
+ @Test
+ public void testReceivedMessageFromQueueWithNoReplyToReturnsNull() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertNull(receivedMessage.getJMSReplyTo());
+ }
+ }
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageTest.java Wed Dec 4 16:42:41 2013
@@ -207,7 +207,7 @@ public class AmqpMessageTest extends Qpi
String toAddress = testAmqpMessage.getTo();
assertNotNull(toAddress);
- assertEquals(testToAddress, testAmqpMessage.getTo());
+ assertEquals(testToAddress, toAddress);
}
@Test
@@ -241,6 +241,80 @@ public class AmqpMessageTest extends Qpi
assertEquals(testToAddress, testAmqpMessage.getTo());
}
+ @Test
+ public void testGetReplyToWithReceivedMessageWithNoProperties()
+ {
+ Message message = Proton.message();
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+ String replyToAddress = testAmqpMessage.getReplyTo();
+ assertNull(replyToAddress);
+ }
+
+ @Test
+ public void testGetReplyToWithReceivedMessageWithPropertiesButNoReplyTo()
+ {
+ Message message = Proton.message();
+
+ Properties props = new Properties();
+ props.setContentType(Symbol.valueOf("content-type"));
+ message.setProperties(props);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+ String replyToAddress = testAmqpMessage.getReplyTo();
+ assertNull(replyToAddress);
+ }
+
+ @Test
+ public void testGetReplyToWithReceivedMessage()
+ {
+ String testReplyToAddress = "myTestAddress";
+
+ Message message = Proton.message();
+
+ Properties props = new Properties();
+ props.setReplyTo(testReplyToAddress);
+ message.setProperties(props);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage(message, _mockDelivery, _mockAmqpConnection);
+
+ String replyToAddress = testAmqpMessage.getReplyTo();
+ assertNotNull(replyToAddress);
+ assertEquals(testReplyToAddress, replyToAddress);
+ }
+
+ @Test
+ public void testSetReplyTo()
+ {
+ String testReplyToAddress = "myTestAddress";
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ Message underlyingMessage = testAmqpMessage.getMessage();
+ assertNull(underlyingMessage.getReplyTo());
+
+ testAmqpMessage.setReplyTo(testReplyToAddress);
+
+ assertNotNull(underlyingMessage.getReplyTo());
+ assertEquals(testReplyToAddress, underlyingMessage.getReplyTo());
+ }
+
+ @Test
+ public void testSetGetReplyTo()
+ {
+ String testReplyToAddress = "myTestAddress";
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+
+ assertNull(testAmqpMessage.getReplyTo());
+
+ testAmqpMessage.setReplyTo(testReplyToAddress);
+
+ assertNotNull(testAmqpMessage.getReplyTo());
+ assertEquals(testReplyToAddress, testAmqpMessage.getReplyTo());
+ }
+
// ====== Message Annotations =======
@Test
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/BytesMessageImplTest.java Wed Dec 4 16:42:41 2013
@@ -53,6 +53,7 @@ public class BytesMessageImplTest extend
_mockAmqpConnection = Mockito.mock(AmqpConnection.class);
_mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
_mockSessionImpl = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
}
@Test
@@ -60,7 +61,7 @@ public class BytesMessageImplTest extend
{
Message message = Proton.message();
AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
- BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+ BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl, null);
assertEquals(0, bytesMessageImpl.getBodyLength());
}
@@ -70,7 +71,7 @@ public class BytesMessageImplTest extend
{
Message message = Proton.message();
AmqpBytesMessage testAmqpMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
- BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl);
+ BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(testAmqpMessage, _mockSessionImpl,_mockConnectionImpl, null);
//verify attempting to read bytes returns -1, i.e EOS
assertEquals(-1, bytesMessageImpl.readBytes(new byte[2]));
@@ -85,7 +86,7 @@ public class BytesMessageImplTest extend
message.setBody(new Data(new Binary(bytes)));
AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
- BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+ BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl, null);
//retrieve the expected bytes, check they match
byte[] receivedBytes = new byte[bytes.length];
@@ -107,7 +108,7 @@ public class BytesMessageImplTest extend
message.setBody(new AmqpValue(new Binary(bytes)));
AmqpBytesMessage amqpBytesMessage = new AmqpBytesMessage(_mockDelivery, message, _mockAmqpConnection);
- BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl);
+ BytesMessageImpl bytesMessageImpl = new BytesMessageImpl(amqpBytesMessage, _mockSessionImpl,_mockConnectionImpl, null);
//retrieve the expected bytes, check they match
byte[] receivedBytes = new byte[bytes.length];
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/DestinationHelperTest.java Wed Dec 4 16:42:41 2013
@@ -54,23 +54,83 @@ public class DestinationHelperTest exten
}
@Test
- public void testDecodeDestinationWithoutTypeAnnotation() throws Exception
+ public void testDecodeDestinationWithNullAddressAndNullConsumerDestReturnsNull() throws Exception
+ {
+ assertNull(_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, null, false));
+ }
+
+ @Test
+ public void testDecodeDestinationWithNullAddressWithConsumerDestReturnsSameConsumerDestObject() throws Exception
+ {
+ String consumerDestString = "consumerDest";
+ Queue consumerDest = _helper.createQueue(consumerDestString);
+ assertSame(consumerDest,_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, consumerDest, false));
+ }
+
+ @Test
+ public void testDecodeDestinationWithNullAddressWithConsumerDestReturnsNullWhenUsingConsumerDestForTypeOnly() throws Exception
+ {
+ String consumerDestString = "consumerDest";
+ Queue consumerDest = _helper.createQueue(consumerDestString);
+ assertNull(_helper.decodeDestination(null, DestinationHelper.QUEUE_ATTRIBUTES_STRING, consumerDest, true));
+ }
+
+ @Test
+ public void testDecodeDestinationWithoutTypeAnnotationWithoutConsumerDest() throws Exception
+ {
+ String testAddress = "testAddress";
+ Destination dest = _helper.decodeDestination(testAddress, null, null, false);
+ assertNotNull(dest);
+
+ assertTrue(dest instanceof DestinationImpl);
+ assertEquals(testAddress, ((DestinationImpl) dest).getAddress());
+ }
+
+ @Test
+ public void testDecodeDestinationWithoutTypeAnnotationWithQueueConsumerDest() throws Exception
+ {
+ String testAddress = "testAddress";
+ String consumerDestString = "consumerDest";
+ Queue consumerDest = _helper.createQueue(consumerDestString);
+ Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
+ assertNotNull(dest);
+
+ assertTrue(dest instanceof QueueImpl);
+ assertEquals(testAddress, ((Queue)dest).getQueueName());
+ }
+
+ @Test
+ public void testDecodeDestinationWithoutTypeAnnotationWithTopicConsumerDest() throws Exception
+ {
+ String testAddress = "testAddress";
+ String consumerDestString = "consumerDest";
+ Topic consumerDest = _helper.createTopic(consumerDestString);
+ Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
+ assertNotNull(dest);
+
+ assertTrue(dest instanceof TopicImpl);
+ assertEquals(testAddress, ((Topic)dest).getTopicName());
+ }
+
+ @Test
+ public void testDecodeDestinationWithoutTypeAnnotationWithDestinationConsumerDest() throws Exception
{
String testAddress = "testAddress";
- Destination dest = _helper.decodeDestination(testAddress, null);
+ Destination consumerDest = new DestinationImpl(testAddress);
+ Destination dest = _helper.decodeDestination(testAddress, null, consumerDest, false);
assertNotNull(dest);
- //TODO: this test will need to expand for classification of receiver type in future
assertTrue(dest instanceof DestinationImpl);
+ assertEquals(testAddress, ((DestinationImpl) dest).getAddress());
}
@Test
- public void testDecodeDestinationWithQueueTypeAnnotation() throws Exception
+ public void testDecodeDestinationWithQueueTypeAnnotationWithoutConsumerDest() throws Exception
{
String testAddress = "testAddress";
String testTypeAnnotation = "queue";
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
+ Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
assertNotNull(dest);
assertTrue(dest instanceof DestinationImpl);
assertTrue(dest instanceof QueueImpl);
@@ -79,12 +139,12 @@ public class DestinationHelperTest exten
}
@Test
- public void testDecodeDestinationWithTopicTypeAnnotation() throws Exception
+ public void testDecodeDestinationWithTopicTypeAnnotationWithoutConsumerDest() throws Exception
{
String testAddress = "testAddress";
String testTypeAnnotation = "topic";
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
+ Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
assertNotNull(dest);
assertTrue(dest instanceof DestinationImpl);
assertTrue(dest instanceof TopicImpl);
@@ -102,8 +162,8 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
- fail("expected exceptionnow thrown");
+ _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
+ fail("expected exception not thrown");
}
catch(IllegalArgumentException iae)
{
@@ -112,8 +172,8 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotationBackwards);
- fail("expected exceptionnow thrown");
+ _helper.decodeDestination(testAddress, testTypeAnnotationBackwards, null, false);
+ fail("expected exception not thrown");
}
catch(IllegalArgumentException iae)
{
@@ -131,8 +191,8 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotation);
- fail("expected exceptionnow thrown");
+ _helper.decodeDestination(testAddress, testTypeAnnotation, null, false);
+ fail("expected exception not thrown");
}
catch(IllegalArgumentException iae)
{
@@ -141,8 +201,8 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.decodeDestination(testAddress, testTypeAnnotationBackwards);
- fail("expected exceptionnow thrown");
+ _helper.decodeDestination(testAddress, testTypeAnnotationBackwards, null, false);
+ fail("expected exception not thrown");
}
catch(IllegalArgumentException iae)
{
@@ -203,7 +263,7 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.convertToQpidDestination(mockTempQueue);
+ _helper.convertToQpidDestination(mockTempQueue);
fail("excepted exception not thrown");
}
catch(IllegalArgumentException iae)
@@ -222,7 +282,7 @@ public class DestinationHelperTest exten
try
{
- Destination dest = _helper.convertToQpidDestination(mockTempTopic);
+ _helper.convertToQpidDestination(mockTempTopic);
fail("excepted exception not thrown");
}
catch(IllegalArgumentException iae)
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java Wed Dec 4 16:42:41 2013
@@ -51,6 +51,7 @@ public class MessageFactoryImplTest exte
super.setUp();
_mockConnection = Mockito.mock(ConnectionImpl.class);
_mockSession = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
_messageFactoryImpl = new MessageFactoryImpl();
}
@@ -58,7 +59,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpGenericMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpGenericMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(GenericAmqpMessageImpl.class, jmsMessage.getClass());
}
@@ -66,7 +67,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpTextMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpTextMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(TextMessageImpl.class, jmsMessage.getClass());
}
@@ -74,7 +75,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpBytesMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpBytesMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(BytesMessageImpl.class, jmsMessage.getClass());
}
@@ -82,7 +83,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpObjectMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpObjectMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(ObjectMessageImpl.class, jmsMessage.getClass());
}
@@ -90,7 +91,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpListMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpListMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(StreamMessageImpl.class, jmsMessage.getClass());
}
@@ -98,7 +99,7 @@ public class MessageFactoryImplTest exte
public void testCreateJmsMessageWithAmqpMapMessage() throws Exception
{
AmqpMessage amqpMessage = Mockito.mock(AmqpMapMessage.class);
- Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ Message jmsMessage = _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
assertEquals(MapMessageImpl.class, jmsMessage.getClass());
}
@@ -109,7 +110,7 @@ public class MessageFactoryImplTest exte
AmqpMessage amqpMessage = Mockito.mock(AmqpMessage.class);
try
{
- _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection);
+ _messageFactoryImpl.createJmsMessage(amqpMessage, _mockSession, _mockConnection, null);
fail("expected exception was not thrown");
}
catch(JMSException jmse)
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Wed Dec 4 16:42:41 2013
@@ -52,8 +52,11 @@ public class MessageImplTest extends Qpi
public void setUp() throws Exception
{
super.setUp();
+
_mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
_mockSessionImpl = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
+
_testAmqpMessage = new TestAmqpMessage();
_testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
@@ -460,8 +463,6 @@ public class MessageImplTest extends Qpi
@Test
public void testSetJMSDestinationOnNewMessageUsingQueue() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
-
assertNull(_testAmqpMessage.getTo());
_testMessage.setJMSDestination(_mockQueue);
@@ -477,8 +478,6 @@ public class MessageImplTest extends Qpi
@Test
public void testSetJMSDestinationOnNewMessageUsingTopic() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
-
assertNull(_testAmqpMessage.getTo());
_testMessage.setJMSDestination(_mockTopic);
@@ -494,11 +493,10 @@ public class MessageImplTest extends Qpi
@Test
public void testSetJMSDestinationNullOnRecievedMessageWithToAndTypeAnnotationClearsTheAnnotation() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
_testAmqpMessage.setTo(_mockTopicName);
_testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
DestinationHelper.TOPIC_ATTRIBUTES_STRING);
- _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
@@ -512,7 +510,6 @@ public class MessageImplTest extends Qpi
@Test
public void testSetGetJMSDestinationOnNewMessage() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
_testMessage.setJMSDestination(_mockQueue);
assertNotNull(_testMessage.getJMSDestination());
assertSame(_mockQueue, _testMessage.getJMSDestination());
@@ -521,9 +518,8 @@ public class MessageImplTest extends Qpi
@Test
public void testGetJMSDestinationOnRecievedMessageWithToButWithoutToTypeAnnotation() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
_testAmqpMessage.setTo(_mockQueueName);
- _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
@@ -534,11 +530,10 @@ public class MessageImplTest extends Qpi
@Test
public void testGetJMSDestinationOnRecievedMessageWithToAndTypeAnnotationForTopic() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
_testAmqpMessage.setTo(_mockTopicName);
_testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
DestinationHelper.TOPIC_ATTRIBUTES_STRING);
- _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
@@ -549,11 +544,10 @@ public class MessageImplTest extends Qpi
@Test
public void testGetJMSDestinationOnRecievedMessageWithToAndTypeAnnotationForQueue() throws Exception
{
- Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
_testAmqpMessage.setTo(_mockQueueName);
_testAmqpMessage.setMessageAnnotation(DestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
DestinationHelper.QUEUE_ATTRIBUTES_STRING);
- _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
assertNotNull("expected JMSDestination value not present", _testMessage.getJMSDestination());
@@ -561,6 +555,112 @@ public class MessageImplTest extends Qpi
assertEquals(newDestinationExpected, _testMessage.getJMSDestination());
}
+ //TODO:new
+ // ====== JMSReplyTo =======
+
+ @Test
+ public void testGetJMSReplyToOnNewMessage() throws Exception
+ {
+ //Should be null as it has not been set explicitly, and
+ // the message has not been received from anywhere
+ assertNull(_testMessage.getJMSReplyTo());
+ }
+
+ @Test
+ public void testSetJMSJMSReplyToOnNewMessageUsingQueue() throws Exception
+ {
+ assertNull(_testAmqpMessage.getReplyTo());
+
+ _testMessage.setJMSReplyTo(_mockQueue);
+
+ assertNotNull(_testAmqpMessage.getReplyTo());
+ assertEquals(_mockQueueName, _testAmqpMessage.getReplyTo());
+
+ assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+ assertEquals(DestinationHelper.QUEUE_ATTRIBUTES_STRING,
+ _testAmqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+ }
+
+ @Test
+ public void testSetJMSReplyToOnNewMessageUsingTopic() throws Exception
+ {
+ assertNull(_testAmqpMessage.getReplyTo());
+
+ _testMessage.setJMSReplyTo(_mockTopic);
+
+ assertNotNull(_testAmqpMessage.getReplyTo());
+ assertEquals(_mockTopicName, _testAmqpMessage.getReplyTo());
+
+ assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+ assertEquals(DestinationHelper.TOPIC_ATTRIBUTES_STRING,
+ _testAmqpMessage.getMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+ }
+
+ @Test
+ public void testSetJMSReplyToNullOnRecievedMessageWithReplyToAndTypeAnnotationClearsTheAnnotation() throws Exception
+ {
+ _testAmqpMessage.setReplyTo(_mockTopicName);
+ _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+ DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+ assertTrue(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+
+ _testMessage.setJMSReplyTo(null);
+
+ assertNull("expected JMSReplyTo value to be null", _testMessage.getJMSReplyTo());
+ assertFalse(_testAmqpMessage.messageAnnotationExists(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME));
+ }
+
+ @Test
+ public void testSetGetJMSReplyToNewMessage() throws Exception
+ {
+ _testMessage.setJMSReplyTo(_mockQueue);
+ assertNotNull(_testMessage.getJMSReplyTo());
+ assertSame(_mockQueue, _testMessage.getJMSReplyTo());
+ }
+
+ @Test
+ public void testGetJMSReplyToRecievedMessageWithReplyToButWithoutReplyToTypeAnnotation() throws Exception
+ {
+ _testAmqpMessage.setReplyTo(_mockQueueName);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+ Destination newDestinationExpected = new DestinationImpl(_mockQueueName);
+ assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+ }
+
+ @Test
+ public void testGetJMSReplyToOnRecievedMessageWithReplyToAndTypeAnnotationForTopic() throws Exception
+ {
+ _testAmqpMessage.setReplyTo(_mockTopicName);
+ _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+ DestinationHelper.TOPIC_ATTRIBUTES_STRING);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+ Topic newDestinationExpected = new DestinationHelper().createTopic(_mockTopicName);
+ assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+ }
+
+ @Test
+ public void testGetJMSReplyToRecievedMessageWithReplyToAndTypeAnnotationForQueue() throws Exception
+ {
+ _testAmqpMessage.setReplyTo(_mockQueueName);
+ _testAmqpMessage.setMessageAnnotation(DestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME,
+ DestinationHelper.QUEUE_ATTRIBUTES_STRING);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
+
+ assertNotNull("expected JMSReplyTo value not present", _testMessage.getJMSReplyTo());
+
+ Queue newDestinationExpected = new DestinationHelper().createQueue(_mockQueueName);
+ assertEquals(newDestinationExpected, _testMessage.getJMSReplyTo());
+ }
+
// ====== JMSTimestamp =======
@Test
@@ -594,7 +694,7 @@ public class MessageImplTest extends Qpi
{
long timestamp = System.currentTimeMillis();
_testAmqpMessage.setCreationTime(timestamp);
- _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+ _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl, null);
assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Wed Dec 4 16:42:41 2013
@@ -51,6 +51,7 @@ public class ReceiverImplTest extends Qp
_mockConnection = Mockito.mock(ConnectionImpl.class);
_mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
_mockSession = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
_mockAmqpMessage = Mockito.mock(AmqpGenericMessage.class);
}
@@ -62,7 +63,7 @@ public class ReceiverImplTest extends Qp
ImmediateWaitUntil.mockWaitUntil(_mockConnection);
- ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+ ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver, null);
assertNull("Should not receive a message when connection is not started", receiver.receive(1));
}
@@ -77,7 +78,7 @@ public class ReceiverImplTest extends Qp
ImmediateWaitUntil.mockWaitUntil(_mockConnection);
- ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver);
+ ReceiverImpl receiver = new ReceiverImpl(_mockConnection, _mockSession, _mockAmqpReceiver, null);
Message message = receiver.receive(1);
assertNotNull("Should receive a message when connection is started", message);
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TestMessageImpl.java Wed Dec 4 16:42:41 2013
@@ -1,12 +1,21 @@
package org.apache.qpid.jms.impl;
+import javax.jms.Destination;
+
import org.apache.qpid.jms.engine.TestAmqpMessage;
public class TestMessageImpl extends MessageImpl<TestAmqpMessage>
{
+ //message to be sent
public TestMessageImpl(TestAmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
{
- super(amqpMessage,sessionImpl,connectionImpl);
+ super(amqpMessage, sessionImpl, connectionImpl);
+ }
+
+ //message just received; consumerDestination is handed on to MessageImpl rather than dropped
+ public TestMessageImpl(TestAmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl, Destination consumerDestination)
+ {
+ super(amqpMessage, sessionImpl, connectionImpl, consumerDestination);
+ }
@Override
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java?rev=1547839&r1=1547838&r2=1547839&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/TextMessageImplTest.java Wed Dec 4 16:42:41 2013
@@ -49,6 +49,7 @@ public class TextMessageImplTest extends
_mockAmqpConnection = Mockito.mock(AmqpConnection.class);
_mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
_mockSessionImpl = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new DestinationHelper());
}
@Test
@@ -77,7 +78,7 @@ public class TextMessageImplTest extends
{
Message message = Proton.message();
AmqpTextMessage testAmqpMessage1 = new AmqpTextMessage(_mockDelivery, message, _mockAmqpConnection);
- TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl);
+ TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl, null);
assertNull("expected null string", textMessageImpl.getText());
}
@@ -89,7 +90,7 @@ public class TextMessageImplTest extends
String value = "myAmqpValueString";
message.setBody(new AmqpValue(value));
AmqpTextMessage testAmqpMessage1 = new AmqpTextMessage(_mockDelivery, message, _mockAmqpConnection);
- TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl);
+ TextMessageImpl textMessageImpl = new TextMessageImpl(testAmqpMessage1, _mockSessionImpl,_mockConnectionImpl, null);
assertEquals(value, textMessageImpl.getText());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org