You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/19 18:18:18 UTC
qpid-jms git commit: support stripping prefixes from to/reply-to
fields on messages when creating JMSDestination/JMSReplyTo value if the
connection has prefixes configured
Repository: qpid-jms
Updated Branches:
refs/heads/master 0e568a4f7 -> 0e590e7ce
support stripping prefixes from to/reply-to fields on messages when creating JMSDestination/JMSReplyTo value if the connection has prefixes configured
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0e590e7c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0e590e7c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0e590e7c
Branch: refs/heads/master
Commit: 0e590e7ce8face2fd3a43bfda66df12e6f400de2
Parents: 0e568a4
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 19 17:16:01 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 19 17:16:48 2014 +0000
----------------------------------------------------------------------
.../amqp/message/AmqpDestinationHelper.java | 37 ++-
.../jms/integration/MessageIntegrationTest.java | 245 +++++++++++++++++++
.../amqp/message/AmqpDestinationHelperTest.java | 38 +++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 3 +-
4 files changed, 320 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 03a8e57..0f770fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -24,6 +24,7 @@ import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTemporaryQueue;
import org.apache.qpid.jms.JmsTemporaryTopic;
import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
/**
* A set of static utility method useful when mapping JmsDestination types to / from the AMQP
@@ -69,15 +70,47 @@ public class AmqpDestinationHelper {
public JmsDestination getJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
String to = message.getToAddress();
Byte typeByte = getTypeByte(message, TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ String name = stripPrefixIfNecessary(to, message.getConnection(), typeByte, consumerDestination);
- return createDestination(to, typeByte, consumerDestination, false);
+ return createDestination(name, typeByte, consumerDestination, false);
}
public JmsDestination getJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
String replyTo = message.getReplyToAddress();
Byte typeByte = getTypeByte(message, REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ String name = stripPrefixIfNecessary(replyTo, message.getConnection(), typeByte, consumerDestination);
- return createDestination(replyTo, typeByte, consumerDestination, true);
+ return createDestination(name, typeByte, consumerDestination, true);
+ }
+
+ private String stripPrefixIfNecessary(String address, AmqpConnection conn, Byte typeByte, JmsDestination consumerDestination) {
+ if (address == null) {
+ return null;
+ }
+
+ if (typeByte == null) {
+ String queuePrefix = conn.getQueuePrefix();
+ if (queuePrefix != null && address.startsWith(queuePrefix)) {
+ return address.substring(queuePrefix.length());
+ }
+
+ String topicPrefix = conn.getTopicPrefix();
+ if (topicPrefix != null && address.startsWith(topicPrefix)) {
+ return address.substring(topicPrefix.length());
+ }
+ } else if (typeByte == QUEUE_TYPE) {
+ String queuePrefix = conn.getQueuePrefix();
+ if (queuePrefix != null && address.startsWith(queuePrefix)) {
+ return address.substring(queuePrefix.length());
+ }
+ } else if (typeByte == TOPIC_TYPE) {
+ String topicPrefix = conn.getTopicPrefix();
+ if (topicPrefix != null && address.startsWith(topicPrefix)) {
+ return address.substring(topicPrefix.length());
+ }
+ }
+
+ return address;
}
private JmsDestination createDestination(String address, Byte typeByte, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 5b54b51..0734718 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -37,9 +38,12 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.qpid.jms.JmsClientProperties;
+import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
@@ -49,6 +53,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescri
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -269,6 +274,246 @@ public class MessageIntegrationTest extends QpidJmsTestCase
}
}
+ /**
+ * Tests that the a connection with a 'topic prefix' set on it strips the
+ * prefix from the content of the to/reply-to fields for incoming messages.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
+ Class<? extends Destination> destType = Topic.class;
+ String destPrefix = "t12321-";
+ String destName = "myTopic";
+ String replyName = "myReplyTopic";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+ Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+ String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+ Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that the a connection with a 'topic prefix' set on it strips the
+ * prefix from the content of the to/reply-to fields for incoming messages
+ * if they don't have the 'destination type annotation' set.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithNoTypeAnnotationAndTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
+ Class<? extends Destination> destType = Topic.class;
+ String destPrefix = "t12321-";
+ String destName = "myTopic";
+ String replyName = "myReplyTopic";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = null;
+ Byte annotationValue = null;
+ String replyAnnotationName = null;
+ Byte replyAnnotationValue = null;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that the a connection with a 'queue prefix' set on it strips the
+ * prefix from the content of the to/reply-to fields for incoming messages.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
+ Class<? extends Destination> destType = Queue.class;
+ String destPrefix = "q12321-";
+ String destName = "myQueue";
+ String replyName = "myReplyQueue";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+ Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+ String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+ Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that the a connection with a 'queue prefix' set on it strips the
+ * prefix from the content of the to/reply-to fields for incoming messages
+ * if they don't have the 'destination type annotation' set.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithNoTypeAnnotationAndQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
+ Class<? extends Destination> destType = Queue.class;
+ String destPrefix = "q12321-";
+ String destName = "myQueue";
+ String replyName = "myReplyQueue";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = null;
+ Byte annotationValue = null;
+ String replyAnnotationName = null;
+ Byte replyAnnotationValue = null;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that a connection with a 'prefixes' set on its does not alter the
+ * address for a temporary queue in the to/reply-to fields for incoming messages.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithTemporaryQueueDestinationsOnConnectionWithPrefixes() throws Exception {
+ Class<? extends Destination> destType = TemporaryQueue.class;
+ String destPrefix = "q12321-";
+ String destName = "temp-queue://myTempQueue";
+ String replyName = "temp-queue://myReplyTempQueue";
+ String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker
+ String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker
+ String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+ Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE;
+ String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+ Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that a connection with a 'prefixes' set on its does not alter the
+ * address for a temporary queue in the to/reply-to fields for incoming messages.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithTemporaryTopicDestinationsOnConnectionWithPrefixes() throws Exception {
+ Class<? extends Destination> destType = TemporaryTopic.class;
+ String destPrefix = "q12321-";
+ String destName = "temp-topic://myTempTopic";
+ String replyName = "temp-topic://myReplyTempTopic";
+ String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker
+ String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker
+ String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+ Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
+ String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+ Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
+
+ doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ private void doReceivedMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
+ String destPrefix,
+ String destName,
+ String replyName,
+ String destAddress,
+ String replyAddress,
+ String annotationName,
+ Object annotationValue,
+ String replyAnnotationName,
+ Object replyAnnotationValue) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = null;
+ if (destType == Topic.class) {
+ connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix);
+ } else if (destType == Queue.class) {
+ connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix);
+ } else {
+ //Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything
+ connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix);
+ }
+
+ connection.start();
+
+ // Set the prefix if Topic or Queue dest type.
+ if (destType == Topic.class) {
+ ((JmsConnection) connection).setTopicPrefix(destPrefix);
+ } else if (destType == Queue.class) {
+ ((JmsConnection) connection).setQueuePrefix(destPrefix);
+ }
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the destination
+ Destination dest = null;
+ if (destType == Topic.class) {
+ dest= session.createTopic(destName);
+ } else if (destType == Queue.class) {
+ dest = session.createQueue(destName);
+ } else if (destType == TemporaryTopic.class) {
+ //TODO:add method to expect temp topic creation
+ testPeer.expectTempQueueCreationAttach(destAddress);
+ dest = session.createTemporaryTopic();
+ } else if (destType == TemporaryQueue.class) {
+ testPeer.expectTempQueueCreationAttach(destAddress);
+ dest = session.createTemporaryQueue();
+ }
+
+ MessageAnnotationsDescribedType msgAnnotations = null;
+ if (annotationName != null || replyAnnotationName != null) {
+ msgAnnotations = new MessageAnnotationsDescribedType();
+ if (annotationName != null) {
+ msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
+ }
+
+ if (replyAnnotationName != null) {
+ msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
+ }
+ }
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setTo(destAddress);
+ props.setReplyTo(replyAddress);
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ SourceMatcher sourceMatcher = new SourceMatcher();
+ sourceMatcher.withAddress(equalTo(destAddress));
+
+ testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(dest);
+ Message receivedMessage = messageConsumer.receive(1000);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ assertNotNull(receivedMessage);
+
+ Destination jmsDest = receivedMessage.getJMSDestination();
+ Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
+
+ assertNotNull("Expected JMSDestination but got null", jmsDest);
+ assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);
+
+ // Verify destination/replyto names on received message
+ String recievedName = null;
+ String recievedReplyName = null;
+ if (destType == Topic.class || destType == TemporaryTopic.class) {
+ recievedName = ((Topic) jmsDest).getTopicName();
+ recievedReplyName = ((Topic) jmsReplyTo).getTopicName();
+ } else if (destType == Queue.class || destType == TemporaryQueue.class) {
+ recievedName = ((Queue) jmsDest).getQueueName();
+ recievedReplyName = ((Queue) jmsReplyTo).getQueueName();
+ }
+
+ assertEquals("Unexpected name for JMSDestination", destName, recievedName);
+ assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName);
+
+ if (destType == TemporaryQueue.class || destType == TemporaryTopic.class) {
+ assertEquals("Temporary destination name and address should be equal", destName, destAddress);
+ assertEquals("Temporary replyto name and address should be equal", replyName, replyAddress);
+ }
+ }
+ }
+
// --- byte type annotation values --- //
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
index ed1353a..2f516de 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
@@ -41,6 +41,7 @@ import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTemporaryQueue;
import org.apache.qpid.jms.JmsTemporaryTopic;
import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.junit.Test;
import org.mockito.Mockito;
@@ -78,6 +79,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithEmptyTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("");
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -93,6 +96,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("jms.queue");
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -108,6 +113,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithoutTypeAnnotationWithAnonymousConsumerDest() {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
@@ -125,6 +132,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithoutTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -140,6 +149,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithoutTypeAnnotationWithTopicConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsDestination consumerDestination = new JmsTopic("ConsumerDestination");
@@ -156,6 +167,8 @@ public class AmqpDestinationHelperTest {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsDestination consumerDestination = new JmsTemporaryQueue("ConsumerDestination");
@@ -170,6 +183,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithoutTypeAnnotationWithTempTopicConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsDestination consumerDestination = new JmsTemporaryTopic("ConsumerDestination");
@@ -185,6 +200,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithQueueTypeAnnotationNoConsumerDestination() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(QUEUE_ATTRIBUTES_STRING);
@@ -199,6 +216,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsDestinationWithTopicTypeAnnotationNoConsumerDestination() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(TOPIC_ATTRIBUTES_STRING);
@@ -264,6 +283,9 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithoutTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
+
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -279,6 +301,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithoutTypeAnnotationWithAnonymousConsumerDest() {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
@@ -296,6 +320,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithEmptyTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("");
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -311,6 +337,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("jms.queue");
JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -326,6 +354,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithoutTypeAnnotationWithTopicConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsTopic consumerDestination = new JmsTopic("ConsumerDestination");
@@ -341,6 +371,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithoutTypeAnnotationWithTempQueueConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsTemporaryQueue consumerDestination = new JmsTemporaryQueue("ConsumerDestination");
@@ -356,6 +388,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplyToWithoutTypeAnnotationWithTempTopicConsumerDest() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
JmsTemporaryTopic consumerDestination = new JmsTemporaryTopic("ConsumerDestination");
@@ -371,6 +405,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplToWithQueueTypeAnnotationNoConsumerDestination() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(QUEUE_ATTRIBUTES_STRING);
@@ -385,6 +421,8 @@ public class AmqpDestinationHelperTest {
public void testGetJmsReplToWithTopicTypeAnnotationNoConsumerDestination() throws Exception {
String testAddress = "testAddress";
AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+ AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+ Mockito.when(message.getConnection()).thenReturn(conn);
Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(TOPIC_ATTRIBUTES_STRING);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index efac92c..51f3825 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -101,6 +101,7 @@ public class TestAmqpPeer implements AutoCloseable
private CountDownLatch _handlersCompletedLatch;
private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
+ private volatile int _tempDestLinkHandle = LINK_HANDLE_OFFSET;
private byte[] _deferredBytes;
@@ -445,7 +446,7 @@ public class TestAmqpPeer implements AutoCloseable
.withSource(notNullValue())
.withTarget(targetMatcher);
- UnsignedInteger linkHandle = UnsignedInteger.valueOf(_nextLinkHandle++);
+ UnsignedInteger linkHandle = UnsignedInteger.valueOf(_tempDestLinkHandle++);
final AttachFrame attachResponse = new AttachFrame()
.setHandle(linkHandle)
.setRole(Role.RECEIVER)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org