You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/11 17:50:08 UTC

[2/3] qpid-jms git commit: QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
new file mode 100644
index 0000000..3e08b56
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -0,0 +1,1480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    // -------------------------------------- //
+
+    /**
+     * Pins the expected literal values of several AmqpSupport constants.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test
+    public void testConstants() throws Exception {
+        // These two are the same AmqpSupport constants, reached via the static imports
+        assertEquals(Symbol.valueOf("SHARED-SUBS"), SHARED_SUBS);
+        assertEquals("|", SUB_NAME_DELIMITER);
+
+        assertEquals(Symbol.valueOf("shared"), AmqpSupport.SHARED);
+        assertEquals(Symbol.valueOf("global"), AmqpSupport.GLOBAL);
+    }
+
+    /**
+     * Verifies that closing a shared durable subscriber detaches its link with closed = false.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCloseSharedDurableTopicSubscriberDetachesWithCloseFalse() throws Exception {
+        final boolean durable = true;
+        doSharedTopicSubscriberDetachTestImpl(durable);
+    }
+
+    /**
+     * Verifies that closing a shared volatile subscriber detaches its link with closed = true.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCloseSharedVolatileTopicSubscriberDetachesWithCloseTrue() throws Exception {
+        final boolean durable = false;
+        doSharedTopicSubscriberDetachTestImpl(durable);
+    }
+
+    /**
+     * Creates a single shared subscriber (durable or volatile), closes it, and checks the
+     * detach the peer receives.
+     *
+     * @param durable true to use a shared durable consumer, false for a shared volatile consumer
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedTopicSubscriberDetachTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Have the server advertise the connection capability indicating shared-subs support
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{SHARED_SUBS});
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session.createTopic(topicName);
+
+            // Durable: bare subscription name. Volatile: subscription name + delimiter + "volatile1".
+            String expectedLinkName = durable ? subscriptionName : subscriptionName + SUB_NAME_DELIMITER + "volatile1";
+            Matcher<?> linkNameMatcher = equalTo(expectedLinkName);
+            if (durable) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            } else {
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber = durable
+                    ? session.createSharedDurableConsumer(dest, subscriptionName)
+                    : session.createSharedConsumer(dest, subscriptionName);
+
+            // The first and third detach arguments are both the inverse of 'durable'
+            final boolean volatileSub = !durable;
+            testPeer.expectDetach(volatileSub, true, volatileSub);
+            subscriber.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links such that the first link uses the
+     * bare subscription name, and subsequent links use a counter suffix to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubscriberLinkNames() throws Exception {
+        final boolean durable = true;
+        final boolean useClientID = true;
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links such that the subscription name
+     * is suffixed with a counter to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffix() throws Exception {
+        final boolean durable = false;
+        final boolean useClientID = true;
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
+    }
+
+    /**
+     * Verifies that on a connection without a ClientID, shared durable subscribers name their links
+     * such that the first link is the subscription name with a 'global' suffix, and subsequent links
+     * additionally append a counter suffix to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubscriberLinkNamesNoClientID() throws Exception {
+        final boolean durable = true;
+        final boolean useClientID = false;
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
+    }
+
+    /**
+     * Verifies that on a connection without a ClientID, shared volatile subscribers name their links
+     * such that the subscription name is suffixed with a 'global' qualifier and a counter to ensure
+     * they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffixNoClientID() throws Exception {
+        final boolean durable = false;
+        final boolean useClientID = false;
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
+    }
+
+    /**
+     * Creates two shared subscribers for the same subscription on one session and checks the link
+     * name used for each attach.
+     *
+     * @param durable true to use shared durable consumers, false for shared volatile consumers
+     * @param useClientID true to establish the connection with a ClientID, false to establish it without one
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable, boolean useClientID) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            final Symbol[] capabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = useClientID
+                    ? testFixture.establishConnecton(testPeer, capabilities)
+                    : testFixture.establishConnectonWithoutClientID(testPeer, capabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session.createTopic(topicName);
+
+            // Expected link names for the first and second receiver
+            final String firstLinkName;
+            final String secondLinkName;
+            if (durable) {
+                firstLinkName = useClientID ? subscriptionName : subscriptionName + SUB_NAME_DELIMITER + "global";
+                secondLinkName = subscriptionName + SUB_NAME_DELIMITER + (useClientID ? "2" : "global2");
+            } else {
+                firstLinkName = subscriptionName + SUB_NAME_DELIMITER + (useClientID ? "volatile1" : "global-volatile1");
+                secondLinkName = subscriptionName + SUB_NAME_DELIMITER + (useClientID ? "volatile2" : "global-volatile2");
+            }
+
+            // Attach the first shared receiver
+            if (durable) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(firstLinkName), useClientID);
+                testPeer.expectLinkFlow();
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(firstLinkName), useClientID);
+                testPeer.expectLinkFlow();
+                session.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second shared receiver, expect the link name to differ
+            if (durable) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(secondLinkName), useClientID);
+                testPeer.expectLinkFlow();
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(secondLinkName), useClientID);
+                testPeer.expectLinkFlow();
+                session.createSharedConsumer(dest, subscriptionName);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-connection basis, such that
+     * the suffix counter on one connection is not impacted by subscriptions on another connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleConnectionSubs() throws Exception {
+        final boolean durable = true;
+        doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-connection basis, such that
+     * the suffix counter on one connection is not impacted by subscriptions on another connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleConnectionSubs() throws Exception {
+        final boolean durable = false;
+        doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Creates one shared subscriber for the same subscription on each of two separate connections
+     * (each with its own test peer) and expects the same link name on both.
+     *
+     * @param durable true to use shared durable consumers, false for shared volatile consumers
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer peer1 = new TestAmqpPeer();
+             TestAmqpPeer peer2 = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            final Symbol[] capabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish first connection
+            Connection connection1 = testFixture.establishConnecton(peer1, capabilities);
+            connection1.start();
+
+            peer1.expectBegin();
+            Session sessionConn1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Establish second connection
+            Connection connection2 = testFixture.establishConnecton(peer2, capabilities);
+            connection2.start();
+
+            peer2.expectBegin();
+            Session sessionConn2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = sessionConn1.createTopic(topicName);
+
+            // The same matcher is used for both connections, since the naming is per-connection
+            Matcher<?> linkNameMatcher = durable
+                    ? equalTo(subscriptionName)
+                    : equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+
+            // Attach the first connection's shared receiver
+            if (durable) {
+                peer1.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+                peer1.expectLinkFlow();
+                sessionConn1.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                peer1.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+                peer1.expectLinkFlow();
+                sessionConn1.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second connection's shared receiver, expecting the same link name
+            if (durable) {
+                peer2.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+                peer2.expectLinkFlow();
+                sessionConn2.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                peer2.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+                peer2.expectLinkFlow();
+                sessionConn2.createSharedConsumer(dest, subscriptionName);
+            }
+
+            peer1.expectClose();
+            connection1.close();
+
+            peer2.expectClose();
+            connection2.close();
+
+            peer1.waitForAllHandlersToComplete(1000);
+            peer2.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-connection basis, such that
+     * the suffix counter for one session is impacted by subscriptions on another session of the
+     * same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleSessionSubs() throws Exception {
+        final boolean durable = true;
+        doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-connection basis, such that
+     * the suffix counter for one session is impacted by subscriptions on another session of the
+     * same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleSessionSubs() throws Exception {
+        final boolean durable = false;
+        doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Creates one shared subscriber for the same subscription on each of two sessions of a single
+     * connection, and expects the second link name to differ from the first.
+     *
+     * @param durable true to use shared durable consumers, false for shared volatile consumers
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Establish connection, with the server capability indicating support for shared-subs
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{SHARED_SUBS});
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session1.createTopic(topicName);
+
+            // Expected link names for the first and second session's receiver
+            final String firstLinkName = durable ? subscriptionName : subscriptionName + SUB_NAME_DELIMITER + "volatile1";
+            final String secondLinkName = subscriptionName + SUB_NAME_DELIMITER + (durable ? "2" : "volatile2");
+
+            // Attach the first session's shared receiver
+            Matcher<?> firstMatcher = equalTo(firstLinkName);
+            if (durable) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, firstMatcher, true);
+                testPeer.expectLinkFlow();
+                session1.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, firstMatcher, true);
+                testPeer.expectLinkFlow();
+                session1.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second session's shared receiver, expect the link name to be different since it's per-connection
+            Matcher<?> secondMatcher = equalTo(secondLinkName);
+            if (durable) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, secondMatcher, true);
+                testPeer.expectLinkFlow();
+                session2.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, secondMatcher, true);
+                testPeer.expectLinkFlow();
+                session2.createSharedConsumer(dest, subscriptionName);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-subscription name basis, such that
+     * the suffix counter for one subscription name is not impacted by those for another subscription on the
+     * same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleNamedSubs() throws Exception {
+        final boolean durable = true;
+        doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-subscription name basis, such that
+     * the suffix counter for one subscription name is not impacted by those for another subscription on the
+     * same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleNamedSubs() throws Exception {
+        final boolean durable = false;
+        doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
+    }
+
+    /**
+     * Creates two shared subscribers for each of two differently named subscriptions on one session,
+     * and expects each subscription's link names to start counting from the beginning.
+     *
+     * @param durable true to use shared durable consumers, false for shared volatile consumers
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{SHARED_SUBS});
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            final String[] subscriptionNames = { "mySubscription1", "mySubscription2" };
+
+            // Handle the first subscription's pair of receivers, then the second subscription's pair
+            for (String subscriptionName : subscriptionNames) {
+                if (durable) {
+                    Matcher<?> firstMatcher = equalTo(subscriptionName);
+                    Matcher<?> secondMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+
+                    testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, firstMatcher, true);
+                    testPeer.expectLinkFlow();
+                    testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, secondMatcher, true);
+                    testPeer.expectLinkFlow();
+
+                    session.createSharedDurableConsumer(dest, subscriptionName);
+                    session.createSharedDurableConsumer(dest, subscriptionName);
+                } else {
+                    Matcher<?> firstMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                    Matcher<?> secondMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile2");
+
+                    testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, firstMatcher, true);
+                    testPeer.expectLinkFlow();
+                    testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, secondMatcher, true);
+                    testPeer.expectLinkFlow();
+
+                    session.createSharedConsumer(dest, subscriptionName);
+                    session.createSharedConsumer(dest, subscriptionName);
+                }
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that a shared durable subscriber and a shared volatile subscriber with the same
+     * subscription name can be active on the same connection at the same time, and that they name
+     * their links so as to be distinguishable from each other.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableAndVolatileSubsCoexistUsingDistinctLinkNames() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Establish connection, with the server capability indicating support for shared-subs
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{SHARED_SUBS});
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session.createTopic(topicName);
+
+            // Durable shared receiver: link name is the bare subscription name
+            final String durableLinkName = subscriptionName;
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(durableLinkName), true);
+            testPeer.expectLinkFlow();
+
+            session.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Volatile shared receiver: link name carries the "volatile1" suffix
+            final String volatileLinkName = subscriptionName + SUB_NAME_DELIMITER + "volatile1";
+            testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(volatileLinkName), true);
+            testPeer.expectLinkFlow();
+
+            session.createSharedConsumer(dest, subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that creating a shared durable subscriber fails on a connection which doesn't
+     * identify as supporting shared subscriptions.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception {
+        final boolean durable = true;
+        doSharedTopicSubscriberSupportedTestImpl(durable);
+    }
+
+    /**
+     * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+     * attempt to create a shared volatile subscriber fails.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception {
+        // Pass durable = false so the volatile createSharedConsumer path is exercised,
+        // rather than repeating the durable variant of the test.
+        doSharedTopicSubscriberSupportedTestImpl(false);
+    }
+
+    /**
+     * Establishes a connection whose peer offers no connection capabilities, then expects the
+     * attempt to create a shared subscriber to throw a JMSException.
+     *
+     * @param durable true to attempt a shared durable consumer, false for a shared volatile consumer
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedTopicSubscriberSupportedTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // DONT include server connection capability to indicate support for shared-subs
+            final Symbol[] noCapabilities = new Symbol[0];
+
+            Connection connection = testFixture.establishConnecton(testPeer, noCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session.createTopic(topicName);
+
+            // No attach is expected; the only remaining peer expectation is the connection close
+            testPeer.expectClose();
+
+            boolean creationRejected = false;
+            try {
+                if (durable) {
+                    session.createSharedDurableConsumer(dest, subscriptionName);
+                } else {
+                    session.createSharedConsumer(dest, subscriptionName);
+                }
+            } catch (JMSException expected) {
+                creationRejected = true;
+            }
+            assertTrue("Expected an exception to be thrown", creationRejected);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that a shared durable subscriber and an exclusive durable subscriber with the same
+     * subscription name can't be active on the same connection at the same time, when the shared
+     * subscriber is created first and the exclusive one second.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedAndExclusiveDurableSubsCantCoexistSharedFirst() throws Exception {
+        final boolean sharedFirst = true;
+        doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
+    }
+
+    /**
+     * Verifies that a shared durable subscriber and an exclusive durable subscriber with the same
+     * subscription name can't be active on the same connection at the same time, when the exclusive
+     * subscriber is created first and the shared one second.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedAndExclusiveDurableSubsCantCoexistExclusiveFirst() throws Exception {
+        final boolean sharedFirst = false;
+        doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
+    }
+
+    /**
+     * Creates one durable subscriber (shared or exclusive), then expects the attempt to create a
+     * durable subscriber of the other kind, with the same subscription name, to throw a JMSException.
+     *
+     * @param sharedFirst true to create the shared subscriber first and then attempt the exclusive one,
+     *                    false to create the exclusive subscriber first and then attempt the shared one
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedAndExclusiveDurableSubsCantCoexistTestImpl(boolean sharedFirst) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Establish connection, with the server capability indicating support for shared-subs
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{SHARED_SUBS});
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final String topicName = "myTopic";
+            final String subscriptionName = "mySubscription";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect the attach for whichever receiver kind is created first
+            if (sharedFirst) {
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(subscriptionName), true);
+            } else {
+                testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
+            }
+            testPeer.expectLinkFlow();
+
+            if (sharedFirst) {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                session.createDurableConsumer(dest, subscriptionName);
+            }
+
+            // No second attach is expected; the only remaining peer expectation is the connection close
+            testPeer.expectClose();
+
+            boolean secondAttemptRejected = false;
+            try {
+                if (sharedFirst) {
+                    // Now try to attach a durable exclusive receiver
+                    session.createDurableConsumer(dest, subscriptionName);
+                } else {
+                    // Now try to attach a durable shared receiver
+                    session.createSharedDurableConsumer(dest, subscriptionName);
+                }
+            } catch (JMSException expected) {
+                secondAttemptRejected = true;
+            }
+            assertTrue("Expected to fail due to concurrent shared + non-shared durable sub attempt", secondAttemptRejected);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that an exclusive (non-shared) durable subscription can only have one
+     * active subscriber at a time: a second creation attempt while the first consumer
+     * is open fails, and a new consumer can be created once the first one is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testExclusiveDurableSubCanOnlyBeActiveOnceAtATime() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final String topic = "myTopic";
+            final String subName = "mySubscription";
+
+            // Establish connection
+            Connection jmsConnection = testFixture.establishConnecton(peer);
+            jmsConnection.start();
+
+            peer.expectBegin();
+            Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = jmsSession.createTopic(topic);
+
+            // The first exclusive durable receiver attaches normally
+            peer.expectDurableSubscriberAttach(topic, subName);
+            peer.expectLinkFlow();
+            MessageConsumer firstConsumer = jmsSession.createDurableConsumer(destination, subName);
+
+            // A second concurrently active exclusive receiver must be refused; note that
+            // no peer expectation is registered for this attempt
+            try {
+                jmsSession.createDurableConsumer(destination, subName);
+                fail("Expected to fail due to concurrent active subscriber attempt");
+            } catch (JMSException expected) {
+                // Expected
+            }
+
+            // Close the first consumer, expecting its link to be detached
+            peer.expectDetach(false, true, false);
+            firstConsumer.close();
+
+            // With the first consumer gone, a new exclusive receiver can attach
+            peer.expectDurableSubscriberAttach(topic, subName);
+            peer.expectLinkFlow();
+            MessageConsumer secondConsumer = jmsSession.createDurableConsumer(destination, subName);
+            assertNotNull(secondConsumer);
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that upon failure to locate a subscription link during an
+     * unsubscribe attempt an {@link InvalidDestinationException} is thrown.
+     * Runs the shared test implementation with a ClientID set on the connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeNonExistingSubscription() throws Exception {
+        doUnsubscribeNonExistingSubscriptionTestImpl(true);
+    }
+
+    /**
+     * Verifies that upon failure to locate a subscription link during an
+     * unsubscribe attempt an {@link InvalidDestinationException} is thrown, but
+     * this time using a connection without a ClientID set (and thus
+     * expecting a different link name to be used).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeNonExistingSubscriptionWithoutClientID() throws Exception {
+        doUnsubscribeNonExistingSubscriptionTestImpl(false);
+    }
+
+    /**
+     * Shared implementation for the tests that unsubscribe a subscription which does
+     * not exist. Registers the peer's unsubscribe lookup and detach expectations, then
+     * verifies that {@link Session#unsubscribe(String)} throws an
+     * {@link InvalidDestinationException}.
+     *
+     * @param hasClientID whether the connection is established with a ClientID set
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doUnsubscribeNonExistingSubscriptionTestImpl(boolean hasClientID) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection;
+            if (hasClientID) {
+                connection = testFixture.establishConnecton(testPeer);
+            } else {
+                connection = testFixture.establishConnectonWithoutClientID(testPeer, null);
+            }
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String subscriptionName = "myInvalidSubscriptionName";
+            // Try to unsubscribe non-existing subscription
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(true, false, subscriptionName, null, hasClientID);
+            testPeer.expectDetach(true, true, true);
+
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a InvalidDestinationException");
+            } catch (InvalidDestinationException ide) {
+                // Expected
+            }
+
+            // Ensure the lookup and detach expectations were consumed before closing
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that an attempt to unsubscribe an exclusive durable subscription
+     * fails with an exception while a subscriber is actively using it, and that
+     * a further unsubscribe attempt made after the subscriber has been closed
+     * is carried out successfully.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeExclusiveDurableSubWhileActiveThenInactive() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final String topic = "myTopic";
+            final String subName = "mySubscription";
+
+            Connection jmsConnection = testFixture.establishConnecton(peer);
+            jmsConnection.start();
+
+            peer.expectBegin();
+            Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = jmsSession.createTopic(topic);
+
+            // Attach the durable exclusive receiver
+            peer.expectDurableSubscriberAttach(topic, subName);
+            peer.expectLinkFlow();
+
+            TopicSubscriber activeSubscriber = jmsSession.createDurableSubscriber(destination, subName);
+            assertNotNull("TopicSubscriber object was null", activeSubscriber);
+
+            // Unsubscribing while the subscriber is still open should fail
+            try {
+                jmsSession.unsubscribe(subName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException expected) {
+                // Expected
+            }
+
+            // Close the subscriber, expecting its link to be detached
+            peer.expectDetach(false, true, false);
+            activeSubscriber.close();
+
+            // A second unsubscribe attempt should now work
+            peer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subName, topic, true);
+            peer.expectDetach(true, true, true);
+            jmsSession.unsubscribe(subName);
+
+            peer.waitForAllHandlersToComplete(1000);
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that upon an attempt to unsubscribe a shared durable subscription that is
+     * currently in active use by multiple subscribers, an exception is thrown, and that
+     * only once the last subscriber is closed is an unsubscribe attempt successful.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeSharedDurableSubWhileActiveThenInactive() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the first durable shared receiver (link name equals the subscription name)
+            Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Attach the second durable shared receiver (link name gets the delimiter plus "2" appended)
+            linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2 = session.createSharedDurableConsumer(dest, subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe, should fail (both subscribers open, no peer expectations registered)
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again, should still fail (second subscriber is still open)
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now close the second subscriber
+            testPeer.expectDetach(false, true, false);
+
+            subscriber2.close();
+
+            // Try to unsubscribe again, should now work
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session.unsubscribe(subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that after a failed attempt to create a shared subscriber it is
+     * still possible to unsubscribe (not that it should be required), i.e. that the
+     * failed creation does not leave an incorrect recording of an active subscriber.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeAfterFailedCreation() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Fail to attach a shared durable receiver
+            Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, true);
+            testPeer.expectDetach(true, false, true);
+
+            try {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Try to unsubscribe, should be able to. Strictly speaking an unsub attempt
+            // would probably fail normally, due to no subscription, but this test
+            // doesn't care about that, just that the attempt proceeds, so overlook that.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session.unsubscribe(subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that subscriber cleanup occurs when the subscriber is remotely closed (after creation).
+     * A second shared durable subscriber (on a second session) is remotely detached by the peer;
+     * unsubscribe must then still fail while the first subscriber is open, and succeed once
+     * the first subscriber has been closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testRemotelyDetachLinkWithDurableSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Latch released when the client reports a consumer as closed
+            final CountDownLatch subscriberClosed = new CountDownLatch(1);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
+                    subscriberClosed.countDown();
+                }
+            });
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+
+            testPeer.expectLinkFlow();
+
+            // Then remotely detach the link after the flow is received.
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, false, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteDetach");
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
+            assertNotNull(subscriber2);
+
+            assertTrue("Consumer closed callback didn't trigger", subscriberClosed.await(5, TimeUnit.SECONDS));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe (using first session). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+
+            // As in the sibling tests, confirm every registered expectation was consumed
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different topic
+     * (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfTopic() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(true, true, false);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different topic
+     * (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfTopic() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(false, true, false);
+    }
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different selector
+     * (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfSelector() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(true, false, true);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different selector
+     * (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfSelector() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(false, false, true);
+    }
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different topic and
+     * selector (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfTopicAndSelector() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(true, true, true);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an existing active subscriber but a different topic and
+     * selector (details in use can't be changed), and succeeds once that subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfTopicAndSelector() throws Exception {
+        doSharedSubChangeOfDetailsTestImpl(false, true, true);
+    }
+
+    private void doSharedSubChangeOfDetailsTestImpl(boolean durable, boolean changeTopic, boolean changeSelector) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName1 = "myTopic";
+            String topicName2 = topicName1;
+            if (changeTopic) {
+                topicName2 += "2";
+            }
+
+            String noSelectorNull = null;
+            String selector2 = null;
+            if (changeSelector) {
+                selector2 = "someProperty = 2";
+            }
+
+            Topic dest1 = session.createTopic(topicName1);
+            Topic dest2 = session.createTopic(topicName2);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver
+            if (durable) {
+                Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+                testPeer.expectSharedDurableSubscriberAttach(topicName1, subscriptionName, durableLinkNameMatcher, true);
+            } else {
+                Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                testPeer.expectSharedVolatileSubscriberAttach(topicName1, subscriptionName, volatileLinkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1;
+            if (durable) {
+                subscriber1 = session.createSharedDurableConsumer(dest1, subscriptionName, noSelectorNull);
+            } else {
+                subscriber1 = session.createSharedConsumer(dest1, subscriptionName, noSelectorNull);
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to create another subscriber, same name but different topic. Should fail. No frames should be sent.
+            try {
+                if (durable) {
+                    session.createSharedDurableConsumer(dest2, subscriptionName, selector2);
+                } else {
+                    session.createSharedConsumer(dest2, subscriptionName, selector2);
+                }
+
+                fail("Expected to fail due to attempting change of subscription details while subscriber is active");
+            } catch (JMSException jmse) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            if (durable) {
+                testPeer.expectDetach(false, true, false);
+            } else {
+                testPeer.expectDetach(true, true, true);
+            }
+            subscriber1.close();
+
+            // Now try a new subscriber again, with changed topic, it should succeed (note also the verified reuse of link names).
+            if (durable) {
+                Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+                testPeer.expectSharedDurableSubscriberAttach(topicName2, subscriptionName, durableLinkNameMatcher, true);
+            } else {
+                Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                testPeer.expectSharedVolatileSubscriberAttach(topicName2, subscriptionName, volatileLinkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2;
+            if (durable) {
+                subscriber2 = session.createSharedDurableConsumer(dest2, subscriptionName, selector2);
+            } else {
+                subscriber2 = session.createSharedConsumer(dest2, subscriptionName, selector2);
+            }
+            assertNotNull(subscriber2);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that subscriber cleanup occurs when the session it is on is remotely closed.
+     * A second shared durable subscriber is created on a second session, which the peer then
+     * ends remotely; unsubscribe must still fail while the first subscriber is open, and
+     * succeed once the first subscriber has been closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testRemotelyEndSessionWithDurableSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Latch released when the client reports a session as closed
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Exception exception) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+
+            testPeer.expectLinkFlow();
+
+            // Then remotely end the session (and thus the subscriber along with it) after the flow is received.
+            testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteClosure");
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
+            assertNotNull(subscriber2);
+
+            assertTrue("Session closed callback didn't trigger", sessionClosed.await(5, TimeUnit.SECONDS));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe (using first session, still open). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session, still open), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+
+            // As in the sibling tests, confirm every registered expectation was consumed
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that subscriber cleanup occurs when the session it is on is locally closed.
+     * A second shared durable subscriber is created on a second session, which is then closed
+     * by the client; unsubscribe must still fail while the first subscriber is open, and
+     * succeed once the first subscriber has been closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testLocallyEndSessionWithSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
+            assertNotNull(subscriber2);
+
+            // Now close the second session (and thus the subscriber along with it).
+            testPeer.expectEnd();
+            session2.close();
+
+            // Now try to unsubscribe (using first session, still open). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session, still open), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+
+            // As in the sibling tests, confirm every registered expectation was consumed
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that subscription name passed is not allowed to have the subscription name
+     * delimiter used in the receiver link naming to separate the subscription name from
+     * a suffix used to ensure unique link names are used on a connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSubscriptionNameNotAllowedToHaveNameSeparator() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Create session
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Try the various methods that take subscription name with a name that
+            // contains the delimiter, they should fail. No frames should be sent.
+            // The name is built from the delimiter constant so the test keeps
+            // exercising the restriction even if the delimiter value changes.
+            String subscriptionName = "thisName" + SUB_NAME_DELIMITER + "hasTheDelimiterAlready";
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            try {
+                session.createDurableSubscriber(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createSharedConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now close connection, should work.
+            testPeer.expectClose();
+            connection.close();
+
+            // As in the sibling tests, confirm every registered expectation was consumed
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
index 0511840..7ac99f8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
@@ -68,7 +68,7 @@ public class JmsConnectionInfoTest {
         JmsConnectionInfo info = new JmsConnectionInfo(firstId);
 
         info.setForceSyncSend(true);
-        info.setClientId("test");
+        info.setClientId("test", true);
         info.setCloseTimeout(100);
         info.setConnectTimeout(200);
         info.setForceAsyncSends(true);
@@ -89,6 +89,7 @@ public class JmsConnectionInfoTest {
 
         assertEquals(true, copy.isForceSyncSend());
         assertEquals("test", copy.getClientId());
+        assertEquals(true, copy.isExplicitClientID());
         assertEquals(100, copy.getCloseTimeout());
         assertEquals(200, copy.getConnectTimeout());
         assertEquals(true, copy.isForceAsyncSend());
@@ -161,6 +162,14 @@ public class JmsConnectionInfoTest {
     }
 
     @Test
+    public void testIsExplicitClientId() {
+        final JmsConnectionInfo info = new JmsConnectionInfo(firstId);
+        assertFalse(info.isExplicitClientID());
+        info.setClientId("something", true);
+        assertTrue(info.isExplicitClientID());
+    }
+
+    @Test
     public void testGetEncodedUsername() {
         final JmsConnectionInfo info = new JmsConnectionInfo(firstId);
         info.setUsername("user");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index 9bca41e..d4fdcc5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -85,12 +86,14 @@ public class JmsConsumerInfoTest {
 
         info.setAcknowledgementMode(1);
         info.setBrowser(true);
-        info.setClientId("test");
+        info.setExplicitClientID(true);
         info.setDestination(new JmsTopic("Test"));
         info.setLastDeliveredSequenceId(42);
         info.setNoLocal(true);
         info.setPrefetchSize(123456);
         info.setSelector("select");
+        info.setDurable(true);
+        info.setShared(true);
         info.setSubscriptionName("name");
         info.setRedeliveryPolicy(new JmsDefaultRedeliveryPolicy());
         info.setListener(true);
@@ -99,10 +102,12 @@ public class JmsConsumerInfoTest {
 
         assertEquals(1, copy.getAcknowledgementMode());
         assertEquals(true, copy.isBrowser());
-        assertEquals("test", copy.getClientId());
+        assertEquals(true, copy.isExplicitClientID());
         assertEquals(new JmsTopic("Test"), copy.getDestination());
         assertEquals(42, copy.getLastDeliveredSequenceId());
         assertEquals(true, copy.isNoLocal());
+        assertEquals(true, copy.isDurable());
+        assertEquals(true, copy.isShared());
         assertEquals(123456, copy.getPrefetchSize());
         assertEquals("select", copy.getSelector());
         assertEquals("name", copy.getSubscriptionName());
@@ -116,11 +121,37 @@ public class JmsConsumerInfoTest {
     public void testIsDurable() {
         JmsConsumerInfo info = new JmsConsumerInfo(firstId);
         assertFalse(info.isDurable());
-        info.setSubscriptionName("name");
+        info.setDurable(true);
         assertTrue(info.isDurable());
     }
 
     @Test
+    public void testIsExplicitClientID() {
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertFalse(info.isExplicitClientID());
+        info.setExplicitClientID(true);
+        assertTrue(info.isExplicitClientID());
+    }
+
+    @Test
+    public void testIsShared() {
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertFalse(info.isShared());
+        info.setShared(true);
+        assertTrue(info.isShared());
+    }
+
+    @Test
+    public void testGetSubscriptionName() {
+        String subName = "name";
+
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertNull(info.getSubscriptionName());
+        info.setSubscriptionName(subName);
+        assertEquals(subName, info.getSubscriptionName());
+    }
+
+    @Test
     public void testCompareTo() {
         JmsConsumerInfo first = new JmsConsumerInfo(firstId);
         JmsConsumerInfo second = new JmsConsumerInfo(secondId);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
new file mode 100644
index 0000000..2a7c893
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.junit.Test;
+
+public class AmqpSubscriptionTrackerTest {
+
+    private AtomicInteger consumerIdCounter = new AtomicInteger();
+
+    private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, boolean hasClientID) {
+        return createConsumerInfo(subscriptionName, topicName, shared, durable, null, hasClientID);
+    }
+
+    private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, String selector, boolean isExplicitClientID) {
+        JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter .incrementAndGet());
+        JmsTopic topic = new JmsTopic(topicName);
+
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId);
+
+        consumerInfo.setSubscriptionName(subscriptionName);
+        consumerInfo.setDestination(topic);
+        consumerInfo.setShared(shared);
+        consumerInfo.setDurable(durable);
+        consumerInfo.setSelector(selector);
+        consumerInfo.setExplicitClientID(isExplicitClientID);
+
+        return consumerInfo;
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedDurable() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared durable sub name: the first link is expected to use the bare sub name, the second the name + delimiter + "2"
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared durable sub name: link naming is expected to start again from the bare sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedDurableWithoutClientID() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared durable sub name, no explicit ClientID: link names are expected to carry a "global" suffix, then "global2"
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, false);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, false);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared durable sub name: link naming is expected to start again from the plain "global" suffix
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedVolatile() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared non-durable sub name: link names are expected to carry a numbered "volatile" suffix, starting at 1
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared non-durable sub name: numbering is expected to start again at "volatile1"
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedVolatileWithoutClientID() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared non-durable sub name, no explicit ClientID: link names are expected to carry a numbered "global-volatile" suffix
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, false);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, false);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared non-durable sub name: numbering is expected to start again at "global-volatile1"
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+    @Test
+    public void testReserveNextSubscriptionLinkNameExclusiveDurable() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first exclusive (non-shared) durable sub name: the link name is expected to be the sub name itself
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, false, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link
+        // name if it's already in use, but check we get the same name anyway even with an existing registration.
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, false, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second exclusive (non-shared) durable sub name: same expectation, the link name is the sub name itself
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, false, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link
+        // name if it's already in use, but check we get the same name anyway even with an existing registration.
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, false, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameExclusiveNonDurable() {
+        String topicName = "myTopic";
+        String subscriptionName = "mySubscription";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName, topicName, false, false, true);
+        try {
+            tracker.reserveNextSubscriptionLinkName(subscriptionName, subInfo);
+            fail("Should have thrown exception, tracker doesn't name these subs");
+        } catch (IllegalStateException ise) {
+            // Expected
+        }
+
+        // Verify it no-ops with an exclusive non-durable sub info
+        tracker.consumerRemoved(subInfo);
+    }
+
+    @Test
+    public void testIsActiveExclusiveDurableSub() {
+        String subscriptionName1 = "mySubscription";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+
+        JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName1, "myTopic", false, true, true);
+        tracker.reserveNextSubscriptionLinkName(subscriptionName1, subInfo);
+
+        assertTrue(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+
+        tracker.consumerRemoved(subInfo);
+
+        assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+    }
+
+    @Test
+    public void testIsActiveSharedDurableSub() {
+        String subscriptionName1 = "mySubscription";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        assertFalse(tracker.isActiveSharedDurableSub(subscriptionName1));
+
+        JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName1, "myTopic", true, true, true);
+        tracker.reserveNextSubscriptionLinkName(subscriptionName1, subInfo);
+
+        assertTrue(tracker.isActiveSharedDurableSub(subscriptionName1));
+
+        tracker.consumerRemoved(subInfo);
+
+        assertFalse(tracker.isActiveSharedDurableSub(subscriptionName1));
+    }
+
+    @Test
+    public void testIsActiveDurableSub() {
+        String subscriptionName = "mySubscription";
+
+        // Scenario 1: an exclusive durable sub is active (only the generic and exclusive checks should report it)
+        AmqpSubscriptionTracker tracker1 = new AmqpSubscriptionTracker();
+
+        assertFalse(tracker1.isActiveDurableSub(subscriptionName));
+
+        JmsConsumerInfo subInfo1 = createConsumerInfo(subscriptionName, "myTopic", false, true, true);
+        tracker1.reserveNextSubscriptionLinkName(subscriptionName, subInfo1);
+
+        assertTrue(tracker1.isActiveDurableSub(subscriptionName));
+        assertTrue(tracker1.isActiveExclusiveDurableSub(subscriptionName));
+        assertFalse(tracker1.isActiveSharedDurableSub(subscriptionName));
+
+        tracker1.consumerRemoved(subInfo1);
+
+        assertFalse(tracker1.isActiveDurableSub(subscriptionName));
+        assertFalse(tracker1.isActiveExclusiveDurableSub(subscriptionName));
+        assertFalse(tracker1.isActiveSharedDurableSub(subscriptionName));
+
+        // Scenario 2: a shared durable sub is active (only the generic and shared checks should report it)
+        AmqpSubscriptionTracker tracker2 = new AmqpSubscriptionTracker();
+
+        assertFalse(tracker2.isActiveDurableSub(subscriptionName));
+
+        JmsConsumerInfo subInfo2 = createConsumerInfo(subscriptionName, "myTopic", true, true, true);
+        tracker2.reserveNextSubscriptionLinkName(subscriptionName, subInfo2);
+
+        assertTrue(tracker2.isActiveDurableSub(subscriptionName));
+        assertFalse(tracker2.isActiveExclusiveDurableSub(subscriptionName));
+        assertTrue(tracker2.isActiveSharedDurableSub(subscriptionName));
+
+        tracker2.consumerRemoved(subInfo2);
+
+        assertFalse(tracker2.isActiveDurableSub(subscriptionName));
+        assertFalse(tracker2.isActiveExclusiveDurableSub(subscriptionName));
+        assertFalse(tracker2.isActiveSharedDurableSub(subscriptionName));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org