You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/19 18:18:18 UTC

qpid-jms git commit: support stripping prefixes from to/reply-to fields on messages when creating JMSDestination/JMSReplyTo value if the connection has prefixes configured

Repository: qpid-jms
Updated Branches:
  refs/heads/master 0e568a4f7 -> 0e590e7ce


support stripping prefixes from to/reply-to fields on messages when creating JMSDestination/JMSReplyTo value if the connection has prefixes configured


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0e590e7c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0e590e7c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0e590e7c

Branch: refs/heads/master
Commit: 0e590e7ce8face2fd3a43bfda66df12e6f400de2
Parents: 0e568a4
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Dec 19 17:16:01 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Dec 19 17:16:48 2014 +0000

----------------------------------------------------------------------
 .../amqp/message/AmqpDestinationHelper.java     |  37 ++-
 .../jms/integration/MessageIntegrationTest.java | 245 +++++++++++++++++++
 .../amqp/message/AmqpDestinationHelperTest.java |  38 +++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |   3 +-
 4 files changed, 320 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 03a8e57..0f770fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -24,6 +24,7 @@ import org.apache.qpid.jms.JmsQueue;
 import org.apache.qpid.jms.JmsTemporaryQueue;
 import org.apache.qpid.jms.JmsTemporaryTopic;
 import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 
 /**
  * A set of static utility method useful when mapping JmsDestination types to / from the AMQP
@@ -69,15 +70,47 @@ public class AmqpDestinationHelper {
     public JmsDestination getJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
         String to = message.getToAddress();
         Byte typeByte = getTypeByte(message, TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        String name = stripPrefixIfNecessary(to, message.getConnection(), typeByte, consumerDestination);
 
-        return createDestination(to, typeByte, consumerDestination, false);
+        return createDestination(name, typeByte, consumerDestination, false);
     }
 
     public JmsDestination getJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
         String replyTo = message.getReplyToAddress();
         Byte typeByte = getTypeByte(message, REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        String name = stripPrefixIfNecessary(replyTo, message.getConnection(), typeByte, consumerDestination);
 
-        return createDestination(replyTo, typeByte, consumerDestination, true);
+        return createDestination(name, typeByte, consumerDestination, true);
+    }
+
+    private String stripPrefixIfNecessary(String address, AmqpConnection conn, Byte typeByte, JmsDestination consumerDestination) {
+        if (address == null) {
+            return null;
+        }
+
+        if (typeByte == null) {
+            String queuePrefix = conn.getQueuePrefix();
+            if (queuePrefix != null && address.startsWith(queuePrefix)) {
+                return address.substring(queuePrefix.length());
+            }
+
+            String topicPrefix = conn.getTopicPrefix();
+            if (topicPrefix != null && address.startsWith(topicPrefix)) {
+                return address.substring(topicPrefix.length());
+            }
+        } else if (typeByte == QUEUE_TYPE) {
+            String queuePrefix = conn.getQueuePrefix();
+            if (queuePrefix != null && address.startsWith(queuePrefix)) {
+                return address.substring(queuePrefix.length());
+            }
+        } else if (typeByte == TOPIC_TYPE) {
+            String topicPrefix = conn.getTopicPrefix();
+            if (topicPrefix != null && address.startsWith(topicPrefix)) {
+                return address.substring(topicPrefix.length());
+            }
+        }
+
+        return address;
     }
 
     private JmsDestination createDestination(String address, Byte typeByte, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 5b54b51..0734718 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -37,9 +38,12 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 
 import org.apache.qpid.jms.JmsClientProperties;
+import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
@@ -49,6 +53,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescri
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -269,6 +274,246 @@ public class MessageIntegrationTest extends QpidJmsTestCase
         }
     }
 
+    /**
+     * Tests that the a connection with a 'topic prefix' set on it strips the
+     * prefix from the content of the to/reply-to fields for incoming messages.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
+        Class<? extends Destination> destType = Topic.class;
+        String destPrefix = "t12321-";
+        String destName = "myTopic";
+        String replyName = "myReplyTopic";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+        Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+        String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+        Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that the a connection with a 'topic prefix' set on it strips the
+     * prefix from the content of the to/reply-to fields for incoming messages
+     * if they don't have the 'destination type annotation' set.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithNoTypeAnnotationAndTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
+        Class<? extends Destination> destType = Topic.class;
+        String destPrefix = "t12321-";
+        String destName = "myTopic";
+        String replyName = "myReplyTopic";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = null;
+        Byte annotationValue = null;
+        String replyAnnotationName = null;
+        Byte replyAnnotationValue = null;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that the a connection with a 'queue prefix' set on it strips the
+     * prefix from the content of the to/reply-to fields for incoming messages.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
+        Class<? extends Destination> destType = Queue.class;
+        String destPrefix = "q12321-";
+        String destName = "myQueue";
+        String replyName = "myReplyQueue";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+        Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+        String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+        Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that the a connection with a 'queue prefix' set on it strips the
+     * prefix from the content of the to/reply-to fields for incoming messages
+     * if they don't have the 'destination type annotation' set.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithNoTypeAnnotationAndQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
+        Class<? extends Destination> destType = Queue.class;
+        String destPrefix = "q12321-";
+        String destName = "myQueue";
+        String replyName = "myReplyQueue";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = null;
+        Byte annotationValue = null;
+        String replyAnnotationName = null;
+        Byte replyAnnotationValue = null;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that a connection with a 'prefixes' set on its does not alter the
+     * address for a temporary queue in the to/reply-to fields for incoming messages.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithTemporaryQueueDestinationsOnConnectionWithPrefixes() throws Exception {
+        Class<? extends Destination> destType = TemporaryQueue.class;
+        String destPrefix = "q12321-";
+        String destName = "temp-queue://myTempQueue";
+        String replyName = "temp-queue://myReplyTempQueue";
+        String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker
+        String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker
+        String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+        Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE;
+        String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+        Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that a connection with a 'prefixes' set on its does not alter the
+     * address for a temporary queue in the to/reply-to fields for incoming messages.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithTemporaryTopicDestinationsOnConnectionWithPrefixes() throws Exception {
+        Class<? extends Destination> destType = TemporaryTopic.class;
+        String destPrefix = "q12321-";
+        String destName = "temp-topic://myTempTopic";
+        String replyName = "temp-topic://myReplyTempTopic";
+        String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker
+        String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker
+        String annotationName = AmqpMessageSupport.AMQP_TO_ANNOTATION;
+        Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
+        String replyAnnotationName = AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION;
+        Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
+
+        doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName,
+                                                        destAddress, replyAddress, annotationName,
+                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    private void doReceivedMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
+                                                                 String destPrefix,
+                                                                 String destName,
+                                                                 String replyName,
+                                                                 String destAddress,
+                                                                 String replyAddress,
+                                                                 String annotationName,
+                                                                 Object annotationValue,
+                                                                 String replyAnnotationName,
+                                                                 Object replyAnnotationValue) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = null;
+            if (destType == Topic.class) {
+                connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix);
+            } else if (destType == Queue.class) {
+                connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix);
+            } else {
+                //Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything
+                connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix);
+            }
+
+            connection.start();
+
+            // Set the prefix if Topic or Queue dest type.
+            if (destType == Topic.class) {
+                ((JmsConnection) connection).setTopicPrefix(destPrefix);
+            } else if (destType == Queue.class) {
+                ((JmsConnection) connection).setQueuePrefix(destPrefix);
+            }
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination
+            Destination dest = null;
+            if (destType == Topic.class) {
+                dest= session.createTopic(destName);
+            } else if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+            } else if (destType == TemporaryTopic.class) {
+                //TODO:add method to expect temp topic creation
+                testPeer.expectTempQueueCreationAttach(destAddress);
+                dest = session.createTemporaryTopic();
+            } else if (destType == TemporaryQueue.class) {
+                testPeer.expectTempQueueCreationAttach(destAddress);
+                dest = session.createTemporaryQueue();
+            }
+
+            MessageAnnotationsDescribedType msgAnnotations = null;
+            if (annotationName != null || replyAnnotationName != null) {
+                msgAnnotations = new MessageAnnotationsDescribedType();
+                if (annotationName != null) {
+                    msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
+                }
+
+                if (replyAnnotationName != null) {
+                    msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
+                }
+            }
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setTo(destAddress);
+            props.setReplyTo(replyAddress);
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            SourceMatcher sourceMatcher = new SourceMatcher();
+            sourceMatcher.withAddress(equalTo(destAddress));
+
+            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(dest);
+            Message receivedMessage = messageConsumer.receive(1000);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            assertNotNull(receivedMessage);
+
+            Destination jmsDest = receivedMessage.getJMSDestination();
+            Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
+
+            assertNotNull("Expected JMSDestination but got null", jmsDest);
+            assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);
+
+            // Verify destination/replyto names on received message
+            String recievedName = null;
+            String recievedReplyName = null;
+            if (destType == Topic.class || destType == TemporaryTopic.class) {
+                recievedName = ((Topic) jmsDest).getTopicName();
+                recievedReplyName = ((Topic) jmsReplyTo).getTopicName();
+            } else if (destType == Queue.class || destType == TemporaryQueue.class) {
+                recievedName = ((Queue) jmsDest).getQueueName();
+                recievedReplyName = ((Queue) jmsReplyTo).getQueueName();
+            }
+
+            assertEquals("Unexpected name for JMSDestination", destName, recievedName);
+            assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName);
+
+            if (destType == TemporaryQueue.class || destType == TemporaryTopic.class) {
+                assertEquals("Temporary destination name and address should be equal", destName, destAddress);
+                assertEquals("Temporary replyto name and address should be equal", replyName, replyAddress);
+            }
+        }
+    }
+
     // --- byte type annotation values --- //
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
index ed1353a..2f516de 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
@@ -41,6 +41,7 @@ import org.apache.qpid.jms.JmsQueue;
 import org.apache.qpid.jms.JmsTemporaryQueue;
 import org.apache.qpid.jms.JmsTemporaryTopic;
 import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -78,6 +79,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithEmptyTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("");
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -93,6 +96,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("jms.queue");
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -108,6 +113,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithoutTypeAnnotationWithAnonymousConsumerDest() {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
 
@@ -125,6 +132,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithoutTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -140,6 +149,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithoutTypeAnnotationWithTopicConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsDestination consumerDestination = new JmsTopic("ConsumerDestination");
@@ -156,6 +167,8 @@ public class AmqpDestinationHelperTest {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsDestination consumerDestination = new JmsTemporaryQueue("ConsumerDestination");
 
@@ -170,6 +183,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithoutTypeAnnotationWithTempTopicConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsDestination consumerDestination = new JmsTemporaryTopic("ConsumerDestination");
@@ -185,6 +200,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithQueueTypeAnnotationNoConsumerDestination() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(QUEUE_ATTRIBUTES_STRING);
 
@@ -199,6 +216,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsDestinationWithTopicTypeAnnotationNoConsumerDestination() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(TOPIC_ATTRIBUTES_STRING);
 
@@ -264,6 +283,9 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithoutTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
+
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -279,6 +301,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithoutTypeAnnotationWithAnonymousConsumerDest() {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
 
@@ -296,6 +320,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithEmptyTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("");
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -311,6 +337,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn("jms.queue");
         JmsQueue consumerDestination = new JmsQueue("ConsumerDestination");
@@ -326,6 +354,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithoutTypeAnnotationWithTopicConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsTopic consumerDestination = new JmsTopic("ConsumerDestination");
@@ -341,6 +371,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithoutTypeAnnotationWithTempQueueConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsTemporaryQueue consumerDestination = new JmsTemporaryQueue("ConsumerDestination");
@@ -356,6 +388,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplyToWithoutTypeAnnotationWithTempTopicConsumerDest() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(null);
         JmsTemporaryTopic consumerDestination = new JmsTemporaryTopic("ConsumerDestination");
@@ -371,6 +405,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplToWithQueueTypeAnnotationNoConsumerDestination() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(QUEUE_ATTRIBUTES_STRING);
 
@@ -385,6 +421,8 @@ public class AmqpDestinationHelperTest {
     public void testGetJmsReplToWithTopicTypeAnnotationNoConsumerDestination() throws Exception {
         String testAddress = "testAddress";
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
         Mockito.when(message.getReplyToAddress()).thenReturn(testAddress);
         Mockito.when(message.getMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME)).thenReturn(TOPIC_ATTRIBUTES_STRING);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0e590e7c/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index efac92c..51f3825 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -101,6 +101,7 @@ public class TestAmqpPeer implements AutoCloseable
     private CountDownLatch _handlersCompletedLatch;
 
     private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
+    private volatile int _tempDestLinkHandle = LINK_HANDLE_OFFSET;
 
     private byte[] _deferredBytes;
 
@@ -445,7 +446,7 @@ public class TestAmqpPeer implements AutoCloseable
                 .withSource(notNullValue())
                 .withTarget(targetMatcher);
 
-        UnsignedInteger linkHandle = UnsignedInteger.valueOf(_nextLinkHandle++);
+        UnsignedInteger linkHandle = UnsignedInteger.valueOf(_tempDestLinkHandle++);
         final AttachFrame attachResponse = new AttachFrame()
                             .setHandle(linkHandle)
                             .setRole(Role.RECEIVER)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org