You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/11 17:50:07 UTC

[1/3] qpid-jms git commit: QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions

Repository: qpid-jms
Updated Branches:
  refs/heads/master 06a721625 -> 952de60ae


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 64f7023..363c76d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -19,6 +19,7 @@
 package org.apache.qpid.jms.test.testpeer;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -41,6 +42,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode;
 import org.apache.qpid.jms.test.testpeer.basictypes.Role;
 import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode;
@@ -1060,7 +1062,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null);
     }
 
-    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
+    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+                                     boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
+    {
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null);
+    }
+
+    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+                                     boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(linkNameMatcher)
@@ -1092,6 +1101,8 @@ public class TestAmqpPeer implements AutoCloseable
                 attachResponse.setTarget(attachMatcher.getReceivedTarget());
                 if(refuseLink) {
                     attachResponse.setSource(null);
+                } else if(responseSourceOverride != null){
+                    attachResponse.setSource(responseSourceOverride);
                 } else {
                     attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource()));
                 }
@@ -1143,6 +1154,44 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(attachMatcher);
     }
 
+    public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
+        expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, false, clientIdSet);
+    }
+
+    public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
+        expectSharedSubscriberAttach(topicName, subscriptionName, false, linkNameMatcher, false, clientIdSet);
+    }
+
+    public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) {
+        expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, refuseLink, clientIdSet);
+    }
+
+    private void expectSharedSubscriberAttach(String topicName, String subscriptionName, boolean durable, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet)
+    {
+        Symbol[] sourceCapabilities;
+        if(clientIdSet) {
+            sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED };
+        } else {
+            sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED, AmqpSupport.GLOBAL };
+        }
+
+        SourceMatcher sourceMatcher = new SourceMatcher();
+        sourceMatcher.withAddress(equalTo(topicName));
+        sourceMatcher.withDynamic(equalTo(false));
+        if(durable) {
+            //TODO: may later be changed to the AMQP 'configuration' terminus durability (value 1) instead of unsettled-state
+            sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
+            sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
+        } else {
+            sourceMatcher.withDurable(equalTo(TerminusDurability.NONE));
+            sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));
+        }
+
+        sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities));
+
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, refuseLink, false);
+    }
+
     public void expectDurableSubscriberAttach(String topicName, String subscriptionName)
     {
         SourceMatcher sourceMatcher = new SourceMatcher();
@@ -1152,9 +1201,43 @@ public class TestAmqpPeer implements AutoCloseable
         sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
         sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
 
+        sourceMatcher.withCapabilities(arrayContaining(AmqpDestinationHelper.TOPIC_CAPABILITY));
+
         expectReceiverAttach(equalTo(subscriptionName), sourceMatcher);
     }
 
+    public void expectDurableSubUnsubscribeNullSourceLookup(boolean failLookup, boolean shared, String subscriptionName, String topicName, boolean hasClientID) {
+        String linkName = subscriptionName;
+        if(!hasClientID) {
+            linkName += AmqpSupport.SUB_NAME_DELIMITER + "global";
+        }
+
+        Matcher<String> linkNameMatcher = equalTo(linkName);
+        Matcher<Object> nullSourceMatcher = nullValue();
+
+        Source responseSourceOverride = null;
+        Symbol errorType = null;
+        String errorMessage = null;
+
+        if(failLookup){
+            errorType = AmqpError.NOT_FOUND;
+            errorMessage = "No subscription link found";
+        } else {
+            responseSourceOverride = new Source();
+            responseSourceOverride.setAddress(topicName);
+            responseSourceOverride.setDynamic(false);
+            //TODO: may later be changed to the AMQP 'configuration' terminus durability (value 1) instead of unsettled-state
+            responseSourceOverride.setDurable(TerminusDurability.UNSETTLED_STATE);
+            responseSourceOverride.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+
+            if(shared) {
+                responseSourceOverride.setCapabilities(new Symbol[]{SHARED_SUBS});
+            }
+        }
+
+        expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride);
+    }
+
     public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed)
     {
         Matcher<Boolean> closeMatcher = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-jms git commit: QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions

Posted by ro...@apache.org.
QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/952de60a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/952de60a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/952de60a

Branch: refs/heads/master
Commit: 952de60aec6abb295d5285c3e4474a04014b1038
Parents: 06a7216
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Nov 11 17:48:41 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Nov 11 17:48:41 2016 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   10 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |    8 +-
 .../java/org/apache/qpid/jms/JmsSession.java    |   22 +-
 .../jms/JmsSharedDurableMessageConsumer.java    |   36 +
 .../qpid/jms/JmsSharedMessageConsumer.java      |   36 +
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |    9 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |   36 +-
 .../qpid/jms/provider/amqp/AmqpConnection.java  |   20 +-
 .../provider/amqp/AmqpConnectionProperties.java |   23 +
 .../provider/amqp/AmqpConnectionSession.java    |   33 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |    9 +
 .../qpid/jms/provider/amqp/AmqpProvider.java    |   56 +-
 .../qpid/jms/provider/amqp/AmqpSession.java     |   19 -
 .../provider/amqp/AmqpSubscriptionTracker.java  |  285 ++++
 .../qpid/jms/provider/amqp/AmqpSupport.java     |    6 +
 .../amqp/builders/AmqpConsumerBuilder.java      |   83 +-
 .../amqp/builders/AmqpResourceBuilder.java      |   15 +
 .../org/apache/qpid/jms/JmsSessionTest.java     |   51 +-
 .../jms/integration/IntegrationTestFixture.java |    4 +
 .../SubscriptionsIntegrationTest.java           | 1480 ++++++++++++++++++
 .../qpid/jms/meta/JmsConnectionInfoTest.java    |   11 +-
 .../qpid/jms/meta/JmsConsumerInfoTest.java      |   37 +-
 .../amqp/AmqpSubscriptionTrackerTest.java       |  285 ++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |   85 +-
 24 files changed, 2555 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 33cb169..82bdeff 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     private final ThreadPoolExecutor executor;
 
     private volatile IOException firstFailureError;
-    private boolean clientIdSet;
     private ExceptionListener exceptionListener;
     private JmsMessageFactory messageFactory;
     private Provider provider;
@@ -295,7 +294,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public synchronized void setClientID(String clientID) throws JMSException {
         checkClosedOrFailed();
 
-        if (clientIdSet) {
+        if (connectionInfo.isExplicitClientID()) {
             throw new IllegalStateException("The clientID has already been set");
         }
         if (clientID == null || clientID.isEmpty()) {
@@ -305,8 +304,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             throw new IllegalStateException("Cannot set the client id once connected.");
         }
 
-        this.connectionInfo.setClientId(clientID);
-        this.clientIdSet = true;
+        this.connectionInfo.setClientId(clientID, true);
 
         // We weren't connected if we got this far, we should now connect to ensure the
         // configured clientID is valid.
@@ -478,7 +476,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             }
 
             if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
-                connectionInfo.setClientId(clientIdGenerator.generateId());
+                connectionInfo.setClientId(clientIdGenerator.generateId(), false);
             }
 
             createResource(connectionInfo);
@@ -560,7 +558,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     protected synchronized boolean isExplicitClientID() {
-        return clientIdSet;
+        return connectionInfo.isExplicitClientID();
     }
 
     //----- Provider interface methods ---------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b224574..3d2fba9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -93,9 +93,11 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
 
         consumerInfo = new JmsConsumerInfo(consumerId);
-        consumerInfo.setClientId(connection.getClientID());
+        consumerInfo.setExplicitClientID(connection.isExplicitClientID());
         consumerInfo.setSelector(selector);
+        consumerInfo.setDurable(isDurableSubscription());
         consumerInfo.setSubscriptionName(name);
+        consumerInfo.setShared(isSharedSubscription());
         consumerInfo.setDestination(destination);
         consumerInfo.setAcknowledgementMode(acknowledgementMode);
         consumerInfo.setNoLocal(noLocal);
@@ -639,6 +641,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
+    public boolean isSharedSubscription() {
+        return false;
+    }
+
     public boolean isBrowser() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index e6c85cc..a3af20d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -510,8 +510,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException {
         checkClosed();
-        // TODO Auto-generated method stub
-        throw new JMSException("Not yet implemented");
+        return createSharedConsumer(topic, name, null);
     }
 
     /**
@@ -520,8 +519,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException {
         checkClosed();
-        // TODO Auto-generated method stub
-        throw new JMSException("Not yet implemented");
+        checkDestination(topic);
+        selector = checkSelector(selector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        JmsMessageConsumer result = new JmsSharedMessageConsumer(getNextConsumerId(), this, dest, name, selector);
+        result.init();
+        return result;
     }
 
     /**
@@ -530,8 +533,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
         checkClosed();
-        // TODO Auto-generated method stub
-        throw new JMSException("Not yet implemented");
+        return createSharedDurableConsumer(topic, name, null);
     }
 
     /**
@@ -540,8 +542,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     @Override
     public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException {
         checkClosed();
-        // TODO Auto-generated method stub
-        throw new JMSException("Not yet implemented");
+        checkDestination(topic);
+        selector = checkSelector(selector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        JmsMessageConsumer result = new JmsSharedDurableMessageConsumer(getNextConsumerId(), this, dest, name, selector);
+        result.init();
+        return result;
     }
 
     //////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
new file mode 100644
index 0000000..8229ef5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a JmsMessageConsumer that is part of a Shared Durable Subscription
+ */
+public class JmsSharedDurableMessageConsumer extends JmsSharedMessageConsumer implements AutoCloseable {
+
+    public JmsSharedDurableMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException {
+        super(id, s, destination, name, selector);
+    }
+
+    @Override
+    public boolean isDurableSubscription() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
new file mode 100644
index 0000000..7aa09db
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a JmsMessageConsumer that is part of a Shared Subscription
+ */
+public class JmsSharedMessageConsumer extends JmsMessageConsumer implements AutoCloseable {
+
+    public JmsSharedMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException {
+        super(id, s, destination, name, selector, false);
+    }
+
+    @Override
+    public boolean isSharedSubscription() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index bc723c0..9768132 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -47,6 +47,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
     private URI configuredURI;
     private URI connectedURI;
     private String clientId;
+    private boolean explicitClientID;
     private String username;
     private String password;
     private boolean forceAsyncSend;
@@ -89,6 +90,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
 
     private void copy(JmsConnectionInfo copy) {
         copy.clientId = clientId;
+        copy.explicitClientID = explicitClientID;
         copy.username = username;
         copy.password = password;
         copy.forceAsyncSend = forceAsyncSend;
@@ -148,8 +150,13 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
         return clientId;
     }
 
-    public void setClientId(String clientId) {
+    public void setClientId(String clientId, boolean explicitClientID) {
         this.clientId = clientId;
+        this.explicitClientID = explicitClientID;
+    }
+
+    public boolean isExplicitClientID() {
+        return explicitClientID;
     }
 
     public String getUsername() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index b22b3b5..e8f793f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -29,8 +29,10 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
     private int prefetchSize;
     private boolean browser;
     private String selector;
-    private String clientId;
+    private boolean explicitClientID;
     private String subscriptionName;
+    private boolean durable;
+    private boolean shared;
     private boolean noLocal;
     private int acknowledgementMode;
     private boolean localMessageExpiry;
@@ -69,9 +71,11 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         info.prefetchSize = prefetchSize;
         info.browser = browser;
         info.selector = selector;
-        info.clientId = clientId;
+        info.explicitClientID = explicitClientID;
+        info.durable = durable;
         info.subscriptionName = subscriptionName;
         info.noLocal = noLocal;
+        info.shared = shared;
         info.acknowledgementMode = acknowledgementMode;
         info.lastDeliveredSequenceId = lastDeliveredSequenceId;
         info.redeliveryPolicy = getRedeliveryPolicy().copy();
@@ -79,10 +83,6 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         info.listener = listener;
     }
 
-    public boolean isDurable() {
-        return subscriptionName != null;
-    }
-
     @Override
     public JmsConsumerId getId() {
         return consumerId;
@@ -128,12 +128,20 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         this.selector = selector;
     }
 
-    public String getClientId() {
-        return clientId;
+    public boolean isExplicitClientID() {
+        return explicitClientID;
+    }
+
+    public void setExplicitClientID(boolean explicitClientID) {
+        this.explicitClientID = explicitClientID;
+    }
+
+    public boolean isDurable() {
+        return durable;
     }
 
-    public void setClientId(String clientId) {
-        this.clientId = clientId;
+    public void setDurable(boolean durable) {
+        this.durable = durable;
     }
 
     public String getSubscriptionName() {
@@ -144,6 +152,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
         this.subscriptionName = durableSubscriptionId;
     }
 
+    public boolean isShared() {
+        return shared;
+    }
+
+    public void setShared(boolean shared) {
+        this.shared = shared;
+    }
+
     public boolean isNoLocal() {
         return noLocal;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index cf4e441..e7c9b62 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -41,6 +41,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
 
+    private AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker();
+
     private final AmqpJmsMessageFactory amqpMessageFactory;
 
     private final URI remoteURI;
@@ -80,15 +82,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     public void unsubscribe(String subscriptionName, AsyncResult request) {
-
-        for (AmqpSession session : sessions.values()) {
-            if (session.containsSubscription(subscriptionName)) {
-                request.onFailure(new JMSException("Cannot remove an active durable subscription"));
-                return;
-            }
+        // Check whether an active (i.e. having an open subscriber) shared or exclusive durable subscription is using this name
+        if(subTracker.isActiveDurableSub(subscriptionName)) {
+            request.onFailure(new JMSException("Can't remove an active durable subscription: " + subscriptionName));
+            return;
         }
 
-        connectionSession.unsubscribe(subscriptionName, request);
+        boolean hasClientID = getResourceInfo().isExplicitClientID();
+
+        connectionSession.unsubscribe(subscriptionName, hasClientID, request);
     }
 
     @Override
@@ -221,6 +223,10 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         return properties;
     }
 
+    public AmqpSubscriptionTracker getSubTracker() {
+        return subTracker;
+    }
+
     /**
      * Allows a connection resource to schedule a task for future execution.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index c090853..815104a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -20,6 +20,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
 
 import java.util.Arrays;
@@ -44,6 +45,7 @@ public class AmqpConnectionProperties {
 
     private boolean delayedDeliverySupported = false;
     private boolean anonymousRelaySupported = false;
+    private boolean sharedSubsSupported = false;
     private boolean connectionOpenFailed = false;
 
     /**
@@ -84,6 +86,10 @@ public class AmqpConnectionProperties {
         if (list.contains(DELAYED_DELIVERY)) {
             delayedDeliverySupported = true;
         }
+
+        if (list.contains(SHARED_SUBS)) {
+            sharedSubsSupported = true;
+        }
     }
 
     protected void processProperties(Map<Symbol, Object> properties) {
@@ -110,6 +116,23 @@ public class AmqpConnectionProperties {
     }
 
     /**
+     * @return true if the connection supports shared subscriptions features.
+     */
+    public boolean isSharedSubsSupported() {
+        return sharedSubsSupported;
+    }
+
+    /**
+     * Sets if the connection supports shared subscriptions features.
+     *
+     * @param sharedSubsSupported
+     *      true if the shared subscriptions features are supported.
+     */
+    public void setSharedSubsSupported(boolean sharedSubsSupported) {
+        this.sharedSubsSupported = sharedSubsSupported;
+    }
+
+    /**
      * @return true if the connection supports sending message with delivery delays.
      */
     public boolean isDelayedDeliverySupported() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 0a7c689..7ca27aa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -63,12 +63,17 @@ public class AmqpConnectionSession extends AmqpSession {
      *
      * @param subscriptionName
      *        the subscription name that is to be removed.
+     * @param hasClientID
+     *        whether the connection has a clientID set.
      * @param request
      *        the request that awaits the completion of this action.
      */
-    public void unsubscribe(String subscriptionName, AsyncResult request) {
-        DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), subscriptionName);
-        DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(builder, request);
+    public void unsubscribe(String subscriptionName, boolean hasClientID, AsyncResult request) {
+        AmqpSubscriptionTracker subTracker = getConnection().getSubTracker();
+        String linkName = subTracker.getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+
+        DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), linkName);
+        DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(subscriptionName, builder, request);
         pendingUnsubs.put(subscriptionName, subscribeRequest);
 
         LOG.debug("Attempting remove of subscription: {}", subscriptionName);
@@ -81,24 +86,24 @@ public class AmqpConnectionSession extends AmqpSession {
             super(resource, receiver, parent);
         }
 
-        public String getSubscriptionName() {
+        public String getLinkName() {
             return getEndpoint().getName();
         }
     }
 
     private final class DurableSubscriptionReattachBuilder extends AmqpResourceBuilder<DurableSubscriptionReattach, AmqpSession, JmsSessionInfo, Receiver> {
 
-        private final String subscriptionName;
+        private final String linkName;
 
-        public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String subscriptionName) {
+        public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String linkName) {
             super(parent, resourceInfo);
 
-            this.subscriptionName = subscriptionName;
+            this.linkName = linkName;
         }
 
         @Override
         protected Receiver createEndpoint(JmsSessionInfo resourceInfo) {
-            Receiver receiver = getParent().getEndpoint().receiver(subscriptionName);
+            Receiver receiver = getParent().getEndpoint().receiver(linkName);
             receiver.setTarget(new Target());
             receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
             receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
@@ -121,18 +126,20 @@ public class AmqpConnectionSession extends AmqpSession {
 
     private final class DurableSubscriptionReattachRequest extends WrappedAsyncResult {
 
+        private final String subscriptionName;
         private final DurableSubscriptionReattachBuilder subscriberBuilder;
 
-        public DurableSubscriptionReattachRequest(DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) {
+        public DurableSubscriptionReattachRequest(String subscriptionName, DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) {
             super(originalRequest);
+            this.subscriptionName = subscriptionName;
             this.subscriberBuilder = subscriberBuilder;
         }
 
         @Override
         public void onSuccess() {
             DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
-            LOG.trace("Reattached to subscription: {}", subscriber.getSubscriptionName());
-            pendingUnsubs.remove(subscriber.getSubscriptionName());
+            LOG.trace("Reattached to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName());
+            pendingUnsubs.remove(subscriptionName);
             if (subscriber.getEndpoint().getRemoteSource() != null) {
                 subscriber.close(getWrappedRequest());
             } else {
@@ -144,8 +151,8 @@ public class AmqpConnectionSession extends AmqpSession {
         @Override
         public void onFailure(Throwable result) {
             DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
-            LOG.trace("Failed to reattach to subscription: {}", subscriber.getSubscriptionName());
-            pendingUnsubs.remove(subscriber.getSubscriptionName());
+            LOG.trace("Failed to reattach to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName());
+            pendingUnsubs.remove(subscriptionName);
             subscriber.resourceClosed();
             super.onFailure(result);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index b60bb83..30e1cef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -565,6 +565,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     public void postRollback() {
     }
 
+    @Override
+    public void handleResourceClosure(AmqpProvider provider, Exception error) {
+        AmqpConnection connection = session.getConnection();
+        AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+        JmsConsumerInfo consumerInfo = getResourceInfo();
+
+        subTracker.consumerRemoved(consumerInfo);
+    }
+
     //----- Inner classes used in message pull operations --------------------//
 
     protected static final class ScheduledRequest implements AsyncResult {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index ca8a0e1..76dbc90 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -415,8 +415,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
                         @Override
                         public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
-                            AmqpSession session = connection.getSession(sessionInfo.getId());
-                            session.close(request);
+                            final AmqpSession session = connection.getSession(sessionInfo.getId());
+                            session.close(new AsyncResult() {
+                                // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
+                                @Override
+                                public void onSuccess() {
+                                    onComplete();
+                                    request.onSuccess();
+                                }
+
+                                @Override
+                                public void onFailure(Throwable result) {
+                                    onComplete();
+                                    request.onFailure(result);
+                                }
+
+                                @Override
+                                public boolean isComplete() {
+                                    return request.isComplete();
+                                }
+
+                                void onComplete() {
+                                    // Mark the sessions resources closed, which in turn calls
+                                    // the subscription cleanup.
+                                    session.handleResourceClosure(AmqpProvider.this, null);
+                                }
+                            });
                         }
 
                         @Override
@@ -427,10 +451,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         }
 
                         @Override
-                        public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+                        public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
                             AmqpSession session = connection.getSession(consumerInfo.getParentId());
-                            AmqpConsumer consumer = session.getConsumer(consumerInfo);
-                            consumer.close(request);
+                            final AmqpConsumer consumer = session.getConsumer(consumerInfo);
+                            consumer.close(new AsyncResult() {
+                                // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
+                                @Override
+                                public void onSuccess() {
+                                    onComplete();
+                                    request.onSuccess();
+                                }
+
+                                @Override
+                                public void onFailure(Throwable result) {
+                                    onComplete();
+                                    request.onFailure(result);
+                                }
+
+                                @Override
+                                public boolean isComplete() {
+                                    return request.isComplete();
+                                }
+
+                                void onComplete() {
+                                    connection.getSubTracker().consumerRemoved(consumerInfo);
+                                }
+                            });
                         }
 
                         @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index efe52d6..5f441e9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -241,25 +241,6 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     }
 
     /**
-     * Query the Session to see if there are any registered consumer instances that have
-     * a durable subscription with the given subscription name.
-     *
-     * @param subscriptionName
-     *        the name of the subscription being searched for.
-     *
-     * @return true if there is a consumer that has the given subscription.
-     */
-    public boolean containsSubscription(String subscriptionName) {
-        for (AmqpConsumer consumer : consumers.values()) {
-            if (subscriptionName.equals(consumer.getResourceInfo().getSubscriptionName())) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
      * Call to send an error that occurs outside of the normal asynchronous processing
      * of a session resource such as a remote close etc.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
new file mode 100644
index 0000000..da6af22
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.JMSRuntimeException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+
+/**
+ * Class used to track named subscriptions on a connection to allow verifying
+ * current usage and assigning appropriate link names.
+ */
+public class AmqpSubscriptionTracker {
+
+    Set<String> exclusiveDurableSubs = new HashSet<>();
+    Map<String, SubDetails> sharedDurableSubs = new HashMap<>();
+    Map<String, SubDetails> sharedVolatileSubs = new HashMap<>();
+
+    public String reserveNextSubscriptionLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+        validateSubscriptionName(subscriptionName);
+
+        if(consumerInfo == null) {
+            throw new IllegalArgumentException("Consumer info must not be null.");
+        }
+
+        if (consumerInfo.isShared()) {
+            if (consumerInfo.isDurable()) {
+                return getSharedDurableSubLinkName(subscriptionName, consumerInfo);
+            } else {
+                return getSharedVolatileSubLinkName(subscriptionName, consumerInfo);
+            }
+        } else if (consumerInfo.isDurable()) {
+            registerExclusiveDurableSub(subscriptionName);
+            return subscriptionName;
+        } else {
+            throw new IllegalStateException("Non-shared non-durable sub link naming is not handled by the tracker.");
+        }
+    }
+
+    private void validateSubscriptionName(String subscriptionName) {
+        if(subscriptionName == null) {
+            throw new IllegalArgumentException("Subscription name must not be null.");
+        }
+
+        if(subscriptionName.isEmpty()) {
+            throw new IllegalArgumentException("Subscription name must not be empty.");
+        }
+
+        if(subscriptionName.contains(SUB_NAME_DELIMITER)) {
+            throw new IllegalArgumentException("Subscription name must not contain '" + SUB_NAME_DELIMITER +"' character.");
+        }
+    }
+
+    private String getSharedDurableSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+        JmsDestination topic = consumerInfo.getDestination();
+        String selector = consumerInfo.getSelector();
+
+        SubDetails subDetails = null;
+        if(sharedDurableSubs.containsKey(subscriptionName)) {
+            subDetails = sharedDurableSubs.get(subscriptionName);
+
+            if(subDetails.matches(topic, selector)){
+                subDetails.addSubscriber(consumerInfo);
+            } else {
+                throw new JMSRuntimeException("Subscription details dont match existing subscriber.");
+            }
+        } else {
+            subDetails = new SubDetails(topic, selector, consumerInfo);
+        }
+
+        sharedDurableSubs.put(subscriptionName, subDetails);
+
+        int count = subDetails.totalSubscriberCount();
+
+        return getDurableSubscriptionLinkName(subscriptionName, consumerInfo.isExplicitClientID(), count);
+    }
+
+    private String getDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID, int count) {
+        String linkName = getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+        if(count > 1) {
+            if(hasClientID) {
+                linkName += SUB_NAME_DELIMITER + count;
+            } else {
+                linkName += count;
+            }
+        }
+
+        return linkName;
+    }
+
+    public String getFirstDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID) {
+        validateSubscriptionName(subscriptionName);
+
+        String receiverLinkName = subscriptionName;
+        if(!hasClientID) {
+            receiverLinkName += SUB_NAME_DELIMITER + "global";
+        }
+
+        return receiverLinkName;
+    }
+
+    private String getSharedVolatileSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+        JmsDestination topic = consumerInfo.getDestination();
+        String selector = consumerInfo.getSelector();
+
+        SubDetails subDetails = null;
+        if(sharedVolatileSubs.containsKey(subscriptionName)) {
+            subDetails = sharedVolatileSubs.get(subscriptionName);
+
+            if(subDetails.matches(topic, selector)){
+                subDetails.addSubscriber(consumerInfo);
+            } else {
+                throw new JMSRuntimeException("Subscription details dont match existing subscriber");
+            }
+        } else {
+            subDetails = new SubDetails(topic, selector, consumerInfo);
+        }
+
+        sharedVolatileSubs.put(subscriptionName, subDetails);
+
+        String receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
+        int count = subDetails.totalSubscriberCount();
+
+        if (consumerInfo.isExplicitClientID()) {
+            receiverLinkName += "volatile" + count;
+        } else {
+            receiverLinkName += "global-volatile" + count;
+        }
+
+        return receiverLinkName;
+    }
+
+    private void registerExclusiveDurableSub(String subscriptionName) {
+        exclusiveDurableSubs.add(subscriptionName);
+    }
+
+    /**
+     * Checks if there is an exclusive durable subscription already
+     * recorded as active with the given subscription name.
+     *
+     * @param subscriptionName name of subscription to check
+     * @return true if there is an exclusive durable sub with this name already active
+     */
+    public boolean isActiveExclusiveDurableSub(String subscriptionName) {
+        return exclusiveDurableSubs.contains(subscriptionName);
+    }
+
+    /**
+     * Checks if there is a shared durable subscription already
+     * recorded as active with the given subscription name.
+     *
+     * @param subscriptionName name of subscription to check
+     * @return true if there is a shared durable sub with this name already active
+     */
+    public boolean isActiveSharedDurableSub(String subscriptionName) {
+        return sharedDurableSubs.containsKey(subscriptionName);
+    }
+
+    /**
+     * Checks if there is either a shared or exclusive durable subscription
+     * already recorded as active with the given subscription name.
+     *
+     * @param subscriptionName name of subscription to check
+     * @return true if there is a durable sub with this name already active
+     */
+    public boolean isActiveDurableSub(String subscriptionName) {
+        return isActiveExclusiveDurableSub(subscriptionName) || isActiveSharedDurableSub(subscriptionName);
+    }
+
+    /**
+     * Checks if there is a shared volatile subscription already
+     * recorded as active with the given subscription name.
+     *
+     * @param subscriptionName name of subscription to check
+     * @return true if there is a shared volatile sub with this name already active
+     */
+    public boolean isActiveSharedVolatileSub(String subscriptionName) {
+        return sharedVolatileSubs.containsKey(subscriptionName);
+    }
+
+    public void consumerRemoved(JmsConsumerInfo consumerInfo) {
+        String subscriptionName = consumerInfo.getSubscriptionName();
+
+        if (subscriptionName != null && !subscriptionName.isEmpty()) {
+            if (consumerInfo.isShared()) {
+                if (consumerInfo.isDurable()) {
+                    if(sharedDurableSubs.containsKey(subscriptionName)) {
+                        SubDetails subDetails = sharedDurableSubs.get(subscriptionName);
+                        subDetails.removeSubscriber(consumerInfo);
+
+                        int count = subDetails.activeSubscribers();
+                        if(count < 1) {
+                            sharedDurableSubs.remove(subscriptionName);
+                        }
+                    }
+                } else {
+                    if(sharedVolatileSubs.containsKey(subscriptionName)) {
+                        SubDetails subDetails = sharedVolatileSubs.get(subscriptionName);
+                        subDetails.removeSubscriber(consumerInfo);
+
+                        int count = subDetails.activeSubscribers();
+                        if(count < 1) {
+                            sharedVolatileSubs.remove(subscriptionName);
+                        }
+                    }
+                }
+            } else if (consumerInfo.isDurable()) {
+                exclusiveDurableSubs.remove(subscriptionName);
+            }
+        }
+    }
+
+    private static class SubDetails {
+        private JmsDestination topic = null;
+        private String selector = null;
+        private Set<JmsConsumerInfo> subscribers = new HashSet<>();
+        private int totalSubscriberCount;
+
+        public SubDetails(JmsDestination topic, String selector, JmsConsumerInfo info) {
+            if(topic == null) {
+                throw new IllegalArgumentException("Topic destination must not be null");
+            }
+
+            this.topic = topic;
+            this.selector = selector;
+            addSubscriber(info);
+        }
+
+        public void addSubscriber(JmsConsumerInfo info) {
+            if(info == null) {
+                throw new IllegalArgumentException("Consumer info must not be null");
+            }
+
+            totalSubscriberCount++;
+            subscribers.add(info);
+        }
+
+        public void removeSubscriber(JmsConsumerInfo info) {
+            subscribers.remove(info);
+        }
+
+        public int activeSubscribers() {
+            return subscribers.size();
+        }
+
+        public int totalSubscriberCount() {
+            return totalSubscriberCount;
+        }
+
+        public boolean matches(JmsDestination newTopic, String newSelector) {
+            if(!topic.equals(newTopic)) {
+                return false;
+            }
+
+            if (selector == null) {
+                return newSelector == null;
+            } else {
+                return selector.equals(newSelector);
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 9738d68..adac112 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -44,6 +44,7 @@ public class AmqpSupport {
     public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
     public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
     public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+    public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
 
     // Symbols used to announce connection error information
     public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
@@ -69,6 +70,8 @@ public class AmqpSupport {
     public static final Symbol COPY = Symbol.getSymbol("copy");
     public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
     public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+    public static final Symbol SHARED = Symbol.valueOf("shared");
+    public static final Symbol GLOBAL = Symbol.valueOf("global");
 
     // Delivery states
     public static final Rejected REJECTED = new Rejected();
@@ -80,6 +83,9 @@ public class AmqpSupport {
     public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
     public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
 
+    // Subscription Name Delimiter
+    public static final String SUB_NAME_DELIMITER = "|";
+
     //----- Static initializer -----------------------------------------------//
 
     static {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 3bf8836..5cec9af 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -22,14 +22,19 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
 import javax.jms.InvalidDestinationException;
+import javax.jms.JMSRuntimeException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsNoLocalType;
 import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsSelectorType;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
@@ -59,22 +64,51 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
     @Override
     protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) {
         JmsDestination destination = resourceInfo.getDestination();
-        String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+        String address = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
 
         Source source = new Source();
-        source.setAddress(subscription);
+        source.setAddress(address);
         Target target = new Target();
 
         configureSource(source);
 
-        String receiverName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + subscription;
-        if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
-            // In the case of Durable Topic Subscriptions the client must use the same
-            // receiver name which is derived from the subscription name property.
-            receiverName = resourceInfo.getSubscriptionName();
+        String receiverLinkName = null;
+        String subscriptionName = resourceInfo.getSubscriptionName();
+        if (subscriptionName != null && !subscriptionName.isEmpty()) {
+            AmqpConnection connection = getParent().getConnection();
+
+            if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) {
+                // Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it).
+                throw new JMSRuntimeException("Remote peer does not support shared subscriptions");
+            }
+
+            AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+
+            // Validate subscriber type allowed given existing active subscriber types.
+            if (resourceInfo.isShared() && resourceInfo.isDurable()) {
+                if(subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
+                    // Don't allow shared sub if there is already an active exclusive durable sub
+                    throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                }
+            } else if (!resourceInfo.isShared() && resourceInfo.isDurable()) {
+                if (subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
+                    // Exclusive durable sub is already active
+                    throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+                } else if (subTracker.isActiveSharedDurableSub(subscriptionName)) {
+                    // Don't allow exclusive durable sub if there is already an active shared durable sub
+                    throw new JMSRuntimeException("A shared durable subscription is already active with name '" + subscriptionName + "'");
+                }
+            }
+
+            // Get the link name for the subscription. Throws if certain further validations fail.
+            receiverLinkName = subTracker.reserveNextSubscriptionLinkName(subscriptionName, resourceInfo);
         }
 
-        Receiver receiver = getParent().getEndpoint().receiver(receiverName);
+        if(receiverLinkName == null) {
+            receiverLinkName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + address;
+        }
+
+        Receiver receiver = getParent().getEndpoint().receiver(receiverLinkName);
         receiver.setSource(source);
         receiver.setTarget(target);
         if (resourceInfo.isBrowser() || resourceInfo.isPresettle()) {
@@ -88,6 +122,15 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
     }
 
     @Override
+    protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) {
+        // If the resource being built is closed during the creation process
+        // then this is a failure, so we need to ensure we don't track it.
+        AmqpConnection connection = getParent().getConnection();
+        AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+        subTracker.consumerRemoved(info);
+    }
+
+    @Override
     protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
         return new AmqpConsumer(parent, resourceInfo, endpoint);
     }
@@ -118,7 +161,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
         Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
                                           Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
 
-        if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
+        if (resourceInfo.isDurable()) {
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setDistributionMode(COPY);
@@ -131,14 +174,32 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
             source.setDistributionMode(COPY);
         }
 
+        // Capabilities
+        LinkedList<Symbol> capabilities = new LinkedList<>();
+
         Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(resourceInfo.getDestination());
-        if(typeCapability != null) {
-            source.setCapabilities(typeCapability);
+        if(typeCapability != null){
+            capabilities.add(typeCapability);
+        }
+
+        if(resourceInfo.isShared()) {
+            capabilities.add(AmqpSupport.SHARED);
+        }
+
+        if(!resourceInfo.isExplicitClientID()) {
+            capabilities.add(AmqpSupport.GLOBAL);
+        }
+
+        if(!capabilities.isEmpty()) {
+            Symbol[] capArray = capabilities.toArray(new Symbol[capabilities.size()]);
+            source.setCapabilities(capArray);
         }
 
+        // Outcomes
         source.setOutcomes(outcomes);
         source.setDefaultOutcome(MODIFIED_FAILED);
 
+        // Filters
         if (resourceInfo.isNoLocal()) {
             filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 5912f7f..a1a9aac 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -164,6 +164,9 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         // If the resource being built is closed during the creation process
         // then this is always an error.
 
+        // Perform any post processing relating to closure during creation attempt
+        afterClosed(getResource(), getResourceInfo());
+
         Throwable openError;
         if (hasRemoteError()) {
             openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
@@ -245,6 +248,18 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         // Nothing to do here.
     }
 
+    /**
+     * Called if endpoint opening process fails in order to give the subclasses a
+     * place to perform any follow-on processing or teardown steps before the operation
+     * is deemed to have been completed and failure is signalled.
+     *
+     * @param resource the resource
+     * @param resourceInfo the resourceInfo
+     */
+    protected void afterClosed(TARGET resource, INFO resourceInfo) {
+        // Nothing to do here.
+    }
+
     protected boolean hasRemoteError() {
         return getEndpoint().getRemoteCondition().getCondition() != null;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index be3e668..27fc511 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -35,7 +35,10 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
+import org.apache.qpid.jms.JmsConnectionTestSupport;
+import org.apache.qpid.jms.JmsMessageConsumer;
 import org.apache.qpid.jms.JmsSession;
 import org.apache.qpid.jms.JmsTemporaryQueue;
 import org.junit.Before;
@@ -332,41 +335,53 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
         } catch (RuntimeException re) {}
     }
 
-    //----- Not yet implemented, should all be cleared on implementation -----//
-
     @Test(timeout = 10000)
     public void testCreateSharedConsumer() throws Exception {
         JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            session.createSharedConsumer(session.createTopic("test"), "subscription");
-            fail("Should fail until implemented.");
-        } catch (JMSException ex) {}
+
+        Topic topic = session.createTopic("test");
+        JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription");
+
+        assertNotNull(consumer);
+        assertNull("unexpected selector", consumer.getMessageSelector());
+        assertEquals("unexpected topic", topic, consumer.getDestination());
     }
 
     @Test(timeout = 10000)
     public void testCreateSharedConsumerWithSelector() throws Exception {
         JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            session.createSharedConsumer(session.createTopic("test"), "subscription", "a = b");
-            fail("Should fail until implemented.");
-        } catch (JMSException ex) {}
+
+        String selector = "a = b";
+        Topic topic = session.createTopic("test");
+        JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription", selector);
+
+        assertNotNull(consumer);
+        assertEquals("unexpected selector", selector, consumer.getMessageSelector());
+        assertEquals("unexpected topic", topic, consumer.getDestination());
     }
 
     @Test(timeout = 10000)
     public void testCreateSharedDurableConsumer() throws Exception {
         JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            session.createSharedDurableConsumer(session.createTopic("test"), "subscription");
-            fail("Should fail until implemented.");
-        } catch (JMSException ex) {}
+
+        Topic topic = session.createTopic("test");
+        JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription");
+
+        assertNotNull(consumer);
+        assertNull("unexpected selector", consumer.getMessageSelector());
+        assertEquals("unexpected topic", topic, consumer.getDestination());
     }
 
     @Test(timeout = 10000)
     public void testCreateSharedDurableConsumerWithSelector() throws Exception {
         JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            session.createSharedDurableConsumer(session.createTopic("test"), "subscription", "a = b");
-            fail("Should fail until implemented.");
-        } catch (JMSException ex) {}
+
+        String selector = "a = b";
+        Topic topic = session.createTopic("test");
+        JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription", selector);
+
+        assertNotNull(consumer);
+        assertEquals("unexpected selector", selector, consumer.getMessageSelector());
+        assertEquals("unexpected topic", topic, consumer.getDestination());
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
index 7c33fc1..2ddf739 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
@@ -54,6 +54,10 @@ public class IntegrationTestFixture {
         return establishConnecton(testPeer, false, optionsString, serverCapabilities, serverProperties, true);
     }
 
+    Connection establishConnectonWithoutClientID(TestAmqpPeer testPeer, Symbol[] serverCapabilities) throws JMSException {
+        return establishConnecton(testPeer, false, null, serverCapabilities, null, false);
+    }
+
     Connection establishConnecton(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException {
         Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-jms git commit: QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions

Posted by ro...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
new file mode 100644
index 0000000..3e08b56
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -0,0 +1,1480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    // -------------------------------------- //
+
+    @Test
+    public void testConstants() throws Exception {
+        assertEquals(Symbol.valueOf("SHARED-SUBS"), AmqpSupport.SHARED_SUBS);
+        assertEquals("|", AmqpSupport.SUB_NAME_DELIMITER);
+        assertEquals(Symbol.valueOf("shared"), AmqpSupport.SHARED);
+        assertEquals(Symbol.valueOf("global"), AmqpSupport.GLOBAL);
+    }
+
+    /**
+     * Verifies that a shared durable subscriber detaches links with closed = false.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCloseSharedDurableTopicSubscriberDetachesWithCloseFalse() throws Exception {
+        doSharedTopicSubscriberDetachTestImpl(true);
+    }
+
+    /**
+     * Verifies that a shared volatile subscriber detaches links with closed = true.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCloseSharedVolatileTopicSubscriberDetachesWithCloseTrue() throws Exception {
+        doSharedTopicSubscriberDetachTestImpl(false);
+    }
+
+    private void doSharedTopicSubscriberDetachTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            if (durable) {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            } else {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber;
+            if (durable) {
+                subscriber = session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                subscriber = session.createSharedConsumer(dest, subscriptionName);
+            }
+
+            testPeer.expectDetach(!durable, true, !durable);
+            subscriber.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links such that the first link is the
+     * bare subscription name, and subsequent links use a counter suffix to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubscriberLinkNames() throws Exception {
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(true, true);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links such that the subscription name
+     * is suffixed with a counter to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffix() throws Exception {
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(false, true);
+    }
+
+    /**
+     * Verifies that on a connection without a ClientID, shared durable subscribers name their links
+     * such that the first link is the subscription name with 'global' suffix, and subsequent links
+     * additionally append a counter suffix to ensure they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubscriberLinkNamesNoClientID() throws Exception {
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(true, false);
+    }
+
+    /**
+     * Verifies that on a connection without a ClientID, shared volatile subscribers name their links
+     * such that the subscription name is suffixed with a 'global' qualifier and a counter to ensure
+     * they are unique.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffixNoClientID() throws Exception {
+        doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(false, false);
+    }
+
+    private void doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable, boolean useClientID) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection;
+            if(useClientID) {
+                connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            } else {
+                connection = testFixture.establishConnectonWithoutClientID(testPeer, serverCapabilities);
+            }
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver
+            if (durable) {
+                String linkName = subscriptionName;
+                if(!useClientID) {
+                    linkName += SUB_NAME_DELIMITER + "global";
+                }
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID);
+            } else {
+                String linkName;
+                if(useClientID) {
+                    linkName = subscriptionName + SUB_NAME_DELIMITER + "volatile1";
+                } else {
+                    linkName = subscriptionName + SUB_NAME_DELIMITER + "global-volatile1";
+                }
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID);
+            }
+            testPeer.expectLinkFlow();
+
+            if (durable) {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                session.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second shared receiver, expect the link name to differ
+            if (durable) {
+                String linkName = subscriptionName;
+                if(useClientID) {
+                    linkName += SUB_NAME_DELIMITER + "2";
+                } else {
+                    linkName += SUB_NAME_DELIMITER + "global2";
+                }
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID);
+            } else {
+                String linkName;
+                if(useClientID) {
+                    linkName = subscriptionName + SUB_NAME_DELIMITER + "volatile2";
+                } else {
+                    linkName = subscriptionName + SUB_NAME_DELIMITER + "global-volatile2";
+                }
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID);
+            }
+            testPeer.expectLinkFlow();
+
+            if (durable) {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                session.createSharedConsumer(dest, subscriptionName);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-connection basis, such that
+     * the suffix counter on one connection is not impacted by subscriptions on another connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleConnectionSubs() throws Exception {
+        doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-connection basis, such that
+     * the suffix counter on one connection is not impacted by subscriptions on another connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleConnectionSubs() throws Exception {
+        doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false);
+    }
+
+    private void doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer peer1 = new TestAmqpPeer();
+             TestAmqpPeer peer2 = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish first connection
+            Connection connection1 = testFixture.establishConnecton(peer1, serverCapabilities);
+            connection1.start();
+
+            peer1.expectBegin();
+            Session sessionConn1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Establish second connection
+            Connection connection2 = testFixture.establishConnecton(peer2, serverCapabilities);
+            connection2.start();
+
+            peer2.expectBegin();
+            Session sessionConn2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = sessionConn1.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            Matcher<?> durSubLinkNameMatcher = equalTo(subscriptionName);
+            Matcher<?> volatileSubLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+
+            // Attach the first connections shared receiver
+            if (durable) {
+                peer1.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durSubLinkNameMatcher, true);
+            } else {
+                peer1.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, volatileSubLinkNameMatcher, true);
+            }
+            peer1.expectLinkFlow();
+
+            if (durable) {
+                sessionConn1.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                sessionConn1.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second connection's shared receiver, expect the link name to be the same since it's per-connection
+            if (durable) {
+                peer2.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durSubLinkNameMatcher, true);
+            } else {
+                peer2.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, volatileSubLinkNameMatcher, true);
+            }
+            peer2.expectLinkFlow();
+
+            if (durable) {
+                sessionConn2.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                sessionConn2.createSharedConsumer(dest, subscriptionName);
+            }
+
+            peer1.expectClose();
+            connection1.close();
+
+            peer2.expectClose();
+            connection2.close();
+
+            peer1.waitForAllHandlersToComplete(1000);
+            peer2.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-connection basis, such that
+     * the suffix counter for one session is impacted by subscriptions on another session on the same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleSessionSubs() throws Exception {
+        doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-connection basis, such that
+     * the suffix counter for one session is impacted by subscriptions on another session on the same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleSessionSubs() throws Exception {
+        doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false);
+    }
+
+    private void doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the first sessions shared receiver
+            if (durable) {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            } else {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            if (durable) {
+                session1.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                session1.createSharedConsumer(dest, subscriptionName);
+            }
+
+            // Attach the second session's shared receiver, expect the link name to be different since it's per-connection
+            if (durable) {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+                testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            } else {
+                Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile2");
+                testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            }
+            testPeer.expectLinkFlow();
+
+            if (durable) {
+                session2.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                session2.createSharedConsumer(dest, subscriptionName);
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that shared durable subscribers name their links on a per-subscription name basis, such that
+     * the suffix counter for one subscription name is not impacted by those for another subscription on the same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubsriberLinkNamesMultipleNamedSubs() throws Exception {
+        doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true);
+    }
+
+    /**
+     * Verifies that shared volatile subscribers name their links on a per-subscription name basis, such that
+     * the suffix counter for one subscription name is not impacted by those for another subscription on the same connection.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleNamedSubs() throws Exception {
+        doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false);
+    }
+
+    /**
+     * Shared implementation that creates two shared subscribers for each of two differently
+     * named subscriptions on a single session, checking the link name the peer expects on
+     * every receiver attach.
+     *
+     * @param durable true to create shared durable subscribers, false to create shared volatile ones
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            // The server advertises the shared-subs connection capability
+            Symbol[] capabilities = new Symbol[] { SHARED_SUBS };
+
+            Connection connection = testFixture.establishConnecton(peer, capabilities);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Each subscription goes through the same steps in turn: expect two attaches (each
+            // followed by a flow), then create the two shared subscribers for that name.
+            for (String subName : new String[] { "mySubscription1", "mySubscription2" }) {
+                if (durable) {
+                    Matcher<?> firstLinkName = equalTo(subName);
+                    peer.expectSharedDurableSubscriberAttach(topicName, subName, firstLinkName, true);
+                    peer.expectLinkFlow();
+
+                    Matcher<?> secondLinkName = equalTo(subName + SUB_NAME_DELIMITER + "2");
+                    peer.expectSharedDurableSubscriberAttach(topicName, subName, secondLinkName, true);
+                    peer.expectLinkFlow();
+
+                    session.createSharedDurableConsumer(dest, subName);
+                    session.createSharedDurableConsumer(dest, subName);
+                } else {
+                    Matcher<?> firstLinkName = equalTo(subName + SUB_NAME_DELIMITER + "volatile1");
+                    peer.expectSharedVolatileSubscriberAttach(topicName, subName, firstLinkName, true);
+                    peer.expectLinkFlow();
+
+                    Matcher<?> secondLinkName = equalTo(subName + SUB_NAME_DELIMITER + "volatile2");
+                    peer.expectSharedVolatileSubscriberAttach(topicName, subName, secondLinkName, true);
+                    peer.expectLinkFlow();
+
+                    session.createSharedConsumer(dest, subName);
+                    session.createSharedConsumer(dest, subName);
+                }
+            }
+
+            peer.expectClose();
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that a shared durable subscriber and a shared volatile subscriber using the same
+     * subscription name can both be active on one connection, and that each attaches with its
+     * own distinct link name.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableAndVolatileSubsCoexistUsingDistinctLinkNames() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            // The server advertises the shared-subs connection capability
+            Symbol[] capabilities = new Symbol[] { SHARED_SUBS };
+
+            Connection connection = testFixture.establishConnecton(peer, capabilities);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subName = "mySubscription";
+
+            // Durable shared receiver: link name is the bare subscription name
+            Matcher<?> durableLinkName = equalTo(subName);
+            peer.expectSharedDurableSubscriberAttach(topicName, subName, durableLinkName, true);
+            peer.expectLinkFlow();
+
+            session.createSharedDurableConsumer(dest, subName);
+
+            // Volatile shared receiver: link name carries the "volatile1" suffix
+            Matcher<?> volatileLinkName = equalTo(subName + SUB_NAME_DELIMITER + "volatile1");
+            peer.expectSharedVolatileSubscriberAttach(topicName, subName, volatileLinkName, true);
+            peer.expectLinkFlow();
+
+            session.createSharedConsumer(dest, subName);
+
+            peer.expectClose();
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when the connection does not
+     * identify as supporting shared subscriptions.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception {
+        final boolean durable = true;
+        doSharedTopicSubscriberSupportedTestImpl(durable);
+    }
+
+    /**
+     * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+     * attempt to create a shared volatile subscriber fails.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception {
+        // Pass durable == false so the volatile (createSharedConsumer) path is the one exercised;
+        // passing true here would only repeat the durable test.
+        doSharedTopicSubscriberSupportedTestImpl(false);
+    }
+
+    /**
+     * Shared implementation: connects to a peer that offers no connection capabilities and
+     * checks that creating a shared subscriber throws a JMSException.
+     *
+     * @param durable true to attempt a shared durable subscriber, false to attempt a shared volatile one
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedTopicSubscriberSupportedTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            // Deliberately empty: the shared-subs capability is NOT offered by the server
+            Symbol[] noCapabilities = new Symbol[0];
+
+            Connection connection = testFixture.establishConnecton(peer, noCapabilities);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Topic dest = session.createTopic("myTopic");
+            String subName = "mySubscription";
+
+            // The only further expectation registered with the peer is the connection close
+            peer.expectClose();
+
+            try {
+                if (!durable) {
+                    session.createSharedConsumer(dest, subName);
+                } else {
+                    session.createSharedDurableConsumer(dest, subName);
+                }
+
+                fail("Expected an exception to be thrown");
+            } catch (JMSException expected) {
+                // expected
+            }
+
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that an exclusive durable subscriber can't be created while a shared durable
+     * subscriber with the same subscription name is active on the same connection
+     * (shared first, then exclusive).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedAndExclusiveDurableSubsCantCoexistSharedFirst() throws Exception {
+        final boolean sharedFirst = true;
+        doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
+    }
+
+    /**
+     * Verifies that a shared durable subscriber can't be created while an exclusive durable
+     * subscriber with the same subscription name is active on the same connection
+     * (exclusive first, then shared).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedAndExclusiveDurableSubsCantCoexistExclusiveFirst() throws Exception {
+        final boolean sharedFirst = false;
+        doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
+    }
+
+    /**
+     * Shared implementation: creates one kind of durable subscriber (shared or exclusive) and
+     * then checks that creating the other kind with the same subscription name throws a
+     * JMSException.
+     *
+     * @param sharedFirst true to create the shared durable subscriber first and then attempt the
+     *                    exclusive one, false for the reverse order
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedAndExclusiveDurableSubsCantCoexistTestImpl(boolean sharedFirst) throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            // The server advertises the shared-subs connection capability
+            Symbol[] capabilities = new Symbol[] { SHARED_SUBS };
+
+            Connection connection = testFixture.establishConnecton(peer, capabilities);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subName = "mySubscription";
+
+            if (!sharedFirst) {
+                // Exclusive durable receiver goes first
+                peer.expectDurableSubscriberAttach(topicName, subName);
+                peer.expectLinkFlow();
+
+                session.createDurableConsumer(dest, subName);
+            } else {
+                // Shared durable receiver goes first
+                Matcher<?> sharedLinkName = equalTo(subName);
+                peer.expectSharedDurableSubscriberAttach(topicName, subName, sharedLinkName, true);
+                peer.expectLinkFlow();
+
+                session.createSharedDurableConsumer(dest, subName);
+            }
+
+            peer.expectClose();
+
+            try {
+                if (!sharedFirst) {
+                    // Exclusive is active, so the shared attempt must be rejected
+                    session.createSharedDurableConsumer(dest, subName);
+                } else {
+                    // Shared is active, so the exclusive attempt must be rejected
+                    session.createDurableConsumer(dest, subName);
+                }
+                fail("Expected to fail due to concurrent shared + non-shared durable sub attempt");
+            } catch (JMSException expected) {
+                // Expected
+            }
+
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that a second exclusive durable subscriber for the same subscription name can't be
+     * created while the first is still open, and that once the first subscriber has been closed
+     * a new one can be created.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testExclusiveDurableSubCanOnlyBeActiveOnceAtATime() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the durable exclusive receiver
+            testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session.createDurableConsumer(dest, subscriptionName);
+
+            // No peer expectation is registered for the second attempt below.
+            try {
+                // Now try to attach a second active durable exclusive receiver
+                session.createDurableConsumer(dest, subscriptionName);
+                fail("Expected to fail due to concurrent active subscriber attempt");
+            } catch (JMSException jmse) {
+                // Expected
+            }
+
+            // Close the first subscriber; the peer expects a detach for it
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            // Now try to attach a new active durable exclusive receiver
+            testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2 = session.createDurableConsumer(dest, subscriptionName);
+            assertNotNull(subscriber2);
+
+            testPeer.expectClose();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that when the subscription link can't be located during an unsubscribe
+     * attempt, an InvalidDestinationException is thrown.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeNonExistingSubscription() throws Exception {
+        final boolean hasClientID = true;
+        doUnsubscribeNonExistingSubscriptionTestImpl(hasClientID);
+    }
+
+    /**
+     * Verifies that when the subscription link can't be located during an unsubscribe
+     * attempt, an InvalidDestinationException is thrown, this time using a connection
+     * without a ClientID set (and thus expecting a different link name to be used).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeNonExistingSubscriptionWithoutClientID() throws Exception {
+        final boolean hasClientID = false;
+        doUnsubscribeNonExistingSubscriptionTestImpl(hasClientID);
+    }
+
+    /**
+     * Shared implementation: registers the peer's unsubscribe lookup and detach expectations,
+     * then checks that the unsubscribe call throws an InvalidDestinationException.
+     *
+     * @param hasClientID true to use a connection established with a ClientID, false to use
+     *                    one established without a ClientID
+     */
+    private void doUnsubscribeNonExistingSubscriptionTestImpl(boolean hasClientID) throws JMSException, InterruptedException, Exception, IOException {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            Connection connection = hasClientID
+                    ? testFixture.establishConnecton(peer)
+                    : testFixture.establishConnectonWithoutClientID(peer, null);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String subName = "myInvalidSubscriptionName";
+
+            // Expectations for the unsubscribe attempt on the non-existing subscription
+            peer.expectDurableSubUnsubscribeNullSourceLookup(true, false, subName, null, hasClientID);
+            peer.expectDetach(true, true, true);
+
+            try {
+                session.unsubscribe(subName);
+                fail("Should have thrown a InvalidDestinationException");
+            } catch (InvalidDestinationException expected) {
+                // Expected
+            }
+
+            peer.waitForAllHandlersToComplete(1000);
+
+            peer.expectClose();
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that upon an attempt to unsubscribe a subscription that is
+     * currently in active use by a subscriber, an exception is thrown,
+     * and that once the subscriber is closed a further unsubscribe attempt
+     * is successfully undertaken.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeExclusiveDurableSubWhileActiveThenInactive() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the durable exclusive receiver
+            testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
+            testPeer.expectLinkFlow();
+
+            TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName);
+            assertNotNull("TopicSubscriber object was null", subscriber);
+
+            // Now try to unsubscribe, should fail (no peer expectation is registered for this attempt)
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the subscriber
+            testPeer.expectDetach(false, true, false);
+
+            subscriber.close();
+
+            // Try to unsubscribe again, should work now
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session.unsubscribe(subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that upon an attempt to unsubscribe a shared durable subscription that is
+     * currently in active use by multiple subscribers, an exception is thrown, and that
+     * only once the last subscriber is closed is an unsubscribe attempt successful.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeSharedDurableSubWhileActiveThenInactive() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Attach the first durable shared receiver
+            Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Attach the second durable shared receiver
+            linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2 = session.createSharedDurableConsumer(dest, subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe, should fail (both subscribers are still open)
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again, should still fail (the second subscriber is still open)
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now close the second subscriber
+            testPeer.expectDetach(false, true, false);
+
+            subscriber2.close();
+
+            // Try to unsubscribe again, should now work
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session.unsubscribe(subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that after a failed attempt to create a shared subscriber it is
+     * still possible to unsubscribe (not that it should be required), i.e. that the
+     * failed creation does not leave an incorrect recording of an active subscriber.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testUnsubscribeAfterFailedCreation() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Fail to attach a shared durable receiver
+            Matcher<?> linkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, true);
+            testPeer.expectDetach(true, false, true);
+
+            try {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Try to unsubscribe, should be able to. Strictly speaking an unsub attempt
+            // would probably fail normally, due to no subscription, but this test
+            // doesn't care about that, just that the attempt proceeds, so overlook that.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session.unsubscribe(subscriptionName);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that subscriber cleanup occurs when the subscriber is remotely closed (after creation):
+     * the subscription can be unsubscribed once the remaining locally-open subscriber is closed.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testRemotelyDetachLinkWithDurableSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Counted down when the connection listener reports a consumer as closed
+            final CountDownLatch subscriberClosed = new CountDownLatch(1);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConsumerClosed(MessageConsumer consumer, Exception exception) {
+                    subscriberClosed.countDown();
+                }
+            });
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+
+            testPeer.expectLinkFlow();
+
+            // Then remotely detach the link after the flow is received.
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, false, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteDetach");
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
+            assertNotNull(subscriber2);
+
+            assertTrue("Consumer closed callback didn't trigger", subscriberClosed.await(5, TimeUnit.SECONDS));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe (using first session). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+
+            // Confirm the peer saw every expected frame, as the sibling tests do after closing.
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * topic (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfTopic() throws Exception {
+        final boolean durable = true;
+        final boolean changeTopic = true;
+        final boolean changeSelector = false;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * topic (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfTopic() throws Exception {
+        final boolean durable = false;
+        final boolean changeTopic = true;
+        final boolean changeSelector = false;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * selector (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfSelector() throws Exception {
+        final boolean durable = true;
+        final boolean changeTopic = false;
+        final boolean changeSelector = true;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * selector (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfSelector() throws Exception {
+        final boolean durable = false;
+        final boolean changeTopic = false;
+        final boolean changeSelector = true;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Verifies that creating a shared durable subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * topic and selector (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedDurableSubChangeOfTopicAndSelector() throws Exception {
+        final boolean durable = true;
+        final boolean changeTopic = true;
+        final boolean changeSelector = true;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Verifies that creating a shared volatile subscriber fails when it uses the same
+     * subscription name as an active subscriber on the same connection but a different
+     * topic and selector (details can't be changed while in use).
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testSharedVolatileSubChangeOfTopicAndSelector() throws Exception {
+        final boolean durable = false;
+        final boolean changeTopic = true;
+        final boolean changeSelector = true;
+        doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
+    }
+
+    /**
+     * Shared implementation for the change-of-details tests: creates a shared subscriber, checks
+     * that a second subscriber with the same subscription name but changed details is rejected
+     * while the first is open, then closes the first and checks the changed subscriber can be created.
+     *
+     * @param durable true to use shared durable subscribers, false to use shared volatile ones
+     * @param changeTopic true if the second subscriber uses a different topic than the first
+     * @param changeSelector true if the second subscriber uses a selector (the first uses none)
+     * @throws Exception if an unexpected exception occurs
+     */
+    private void doSharedSubChangeOfDetailsTestImpl(boolean durable, boolean changeTopic, boolean changeSelector) throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            // The server advertises the shared-subs connection capability
+            Symbol[] capabilities = new Symbol[] { SHARED_SUBS };
+
+            Connection connection = testFixture.establishConnecton(peer, capabilities);
+            connection.start();
+
+            peer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String firstTopicName = "myTopic";
+            String secondTopicName = changeTopic ? firstTopicName + "2" : firstTopicName;
+
+            String firstSelector = null;
+            String secondSelector = changeSelector ? "someProperty = 2" : null;
+
+            Topic firstDest = session.createTopic(firstTopicName);
+            Topic secondDest = session.createTopic(secondTopicName);
+
+            String subName = "mySubscription";
+
+            // Attach the first shared receiver and create its subscriber
+            MessageConsumer firstSubscriber;
+            if (durable) {
+                Matcher<?> linkName = equalTo(subName);
+                peer.expectSharedDurableSubscriberAttach(firstTopicName, subName, linkName, true);
+                peer.expectLinkFlow();
+
+                firstSubscriber = session.createSharedDurableConsumer(firstDest, subName, firstSelector);
+            } else {
+                Matcher<?> linkName = equalTo(subName + SUB_NAME_DELIMITER + "volatile1");
+                peer.expectSharedVolatileSubscriberAttach(firstTopicName, subName, linkName, true);
+                peer.expectLinkFlow();
+
+                firstSubscriber = session.createSharedConsumer(firstDest, subName, firstSelector);
+            }
+
+            peer.waitForAllHandlersToComplete(1000);
+
+            // Same name but changed topic and/or selector while the first subscriber is open:
+            // should fail, and no peer expectation is registered for this attempt.
+            try {
+                if (durable) {
+                    session.createSharedDurableConsumer(secondDest, subName, secondSelector);
+                } else {
+                    session.createSharedConsumer(secondDest, subName, secondSelector);
+                }
+
+                fail("Expected to fail due to attempting change of subscription details while subscriber is active");
+            } catch (JMSException expected) {
+                // Expected
+            }
+
+            // Close the first subscriber; the first and third detach flags are false for the
+            // durable case and true for the volatile case.
+            peer.expectDetach(!durable, true, !durable);
+            firstSubscriber.close();
+
+            // With the first subscriber closed the changed details should now be accepted
+            // (note also the verified reuse of link names).
+            MessageConsumer secondSubscriber;
+            if (durable) {
+                Matcher<?> linkName = equalTo(subName);
+                peer.expectSharedDurableSubscriberAttach(secondTopicName, subName, linkName, true);
+                peer.expectLinkFlow();
+
+                secondSubscriber = session.createSharedDurableConsumer(secondDest, subName, secondSelector);
+            } else {
+                Matcher<?> linkName = equalTo(subName + SUB_NAME_DELIMITER + "volatile1");
+                peer.expectSharedVolatileSubscriberAttach(secondTopicName, subName, linkName, true);
+                peer.expectLinkFlow();
+
+                secondSubscriber = session.createSharedConsumer(secondDest, subName, secondSelector);
+            }
+            assertNotNull(secondSubscriber);
+
+            peer.expectClose();
+            connection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that subscriber cleanup occurs when the session it is on is remotely closed.
+     */
+    @Test(timeout = 20000)
+    public void testRemotelyEndSessionWithDurableSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Exception exception) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest,  subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+
+            testPeer.expectLinkFlow();
+
+            // Then remotely end the session (and thus the subscriber along with it) after the flow is received.
+            testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteClosure");
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest,  subscriptionName);
+            assertNotNull(subscriber2);
+
+            assertTrue("Session closed callback didn't trigger", sessionClosed.await(5, TimeUnit.SECONDS));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now try to unsubscribe (using first session, still open). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session, still open), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    /**
+     * Verifies that subscriber cleanup occurs when the session it is on is locally closed.
+     */
+    @Test(timeout = 20000)
+    public void testLocallyEndSessionWithSharedConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Exception exception) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            // Create first session
+            testPeer.expectBegin();
+            Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session1.createTopic(topicName);
+
+            String subscriptionName = "mySubscription";
+
+            // Attach the first shared receiver on the first session
+            Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest,  subscriptionName);
+
+            // Create second session
+            testPeer.expectBegin();
+            Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Attach the second shared receiver on the second session
+            durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
+            testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
+            testPeer.expectLinkFlow();
+
+            MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest,  subscriptionName);
+            assertNotNull(subscriber2);
+
+            // Now close the second session (and thus the subscriber along with it).
+            testPeer.expectEnd();
+            session2.close();
+
+            // Now try to unsubscribe (using first session, still open). It should fail due to sub still
+            // being in use on the first session. No frames should be sent.
+            try {
+                session1.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            // Now close the first subscriber
+            testPeer.expectDetach(false, true, false);
+            subscriber1.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Try to unsubscribe again (using first session, still open), should now work.
+            testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
+            testPeer.expectDetach(true, true, true);
+
+            session1.unsubscribe(subscriptionName);
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    // -------------------------------------- //
+
+    /**
+     * Verifies that a subscription name passed to the session methods is not allowed to
+     * contain the delimiter used in the receiver link naming to separate the subscription
+     * name from a suffix used to ensure that link names are unique on a connection.
+     */
+    @Test(timeout = 20000)
+    public void testSubscriptionNameNotAllowedToHaveNameSeparator() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Add server connection capability to indicate support for shared-subs
+            Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS};
+
+            // Establish connection
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+
+            // Create session
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Try the various methods that take a subscription name, using a name that
+            // contains the delimiter. They should all fail. No frames should be sent.
+            String subscriptionName = "thisName|hasTheDelimiterAlready";
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            try {
+                session.createDurableSubscriber(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createSharedDurableConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.createSharedConsumer(dest, subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            try {
+                session.unsubscribe(subscriptionName);
+                fail("Should have thrown a JMSException");
+            } catch (JMSException ex) {
+                // Expected
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now close connection, should work.
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
index 0511840..7ac99f8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
@@ -68,7 +68,7 @@ public class JmsConnectionInfoTest {
         JmsConnectionInfo info = new JmsConnectionInfo(firstId);
 
         info.setForceSyncSend(true);
-        info.setClientId("test");
+        info.setClientId("test", true);
         info.setCloseTimeout(100);
         info.setConnectTimeout(200);
         info.setForceAsyncSends(true);
@@ -89,6 +89,7 @@ public class JmsConnectionInfoTest {
 
         assertEquals(true, copy.isForceSyncSend());
         assertEquals("test", copy.getClientId());
+        assertEquals(true, copy.isExplicitClientID());
         assertEquals(100, copy.getCloseTimeout());
         assertEquals(200, copy.getConnectTimeout());
         assertEquals(true, copy.isForceAsyncSend());
@@ -161,6 +162,14 @@ public class JmsConnectionInfoTest {
     }
 
     @Test
+    public void testIsExplicitClientId() {
+        final JmsConnectionInfo info = new JmsConnectionInfo(firstId);
+        assertFalse(info.isExplicitClientID());
+        info.setClientId("something", true);
+        assertTrue(info.isExplicitClientID());
+    }
+
+    @Test
     public void testGetEncodedUsername() {
         final JmsConnectionInfo info = new JmsConnectionInfo(firstId);
         info.setUsername("user");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index 9bca41e..d4fdcc5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -85,12 +86,14 @@ public class JmsConsumerInfoTest {
 
         info.setAcknowledgementMode(1);
         info.setBrowser(true);
-        info.setClientId("test");
+        info.setExplicitClientID(true);
         info.setDestination(new JmsTopic("Test"));
         info.setLastDeliveredSequenceId(42);
         info.setNoLocal(true);
         info.setPrefetchSize(123456);
         info.setSelector("select");
+        info.setDurable(true);
+        info.setShared(true);
         info.setSubscriptionName("name");
         info.setRedeliveryPolicy(new JmsDefaultRedeliveryPolicy());
         info.setListener(true);
@@ -99,10 +102,12 @@ public class JmsConsumerInfoTest {
 
         assertEquals(1, copy.getAcknowledgementMode());
         assertEquals(true, copy.isBrowser());
-        assertEquals("test", copy.getClientId());
+        assertEquals(true, copy.isExplicitClientID());
         assertEquals(new JmsTopic("Test"), copy.getDestination());
         assertEquals(42, copy.getLastDeliveredSequenceId());
         assertEquals(true, copy.isNoLocal());
+        assertEquals(true, copy.isDurable());
+        assertEquals(true, copy.isShared());
         assertEquals(123456, copy.getPrefetchSize());
         assertEquals("select", copy.getSelector());
         assertEquals("name", copy.getSubscriptionName());
@@ -116,11 +121,37 @@ public class JmsConsumerInfoTest {
     public void testIsDurable() {
         JmsConsumerInfo info = new JmsConsumerInfo(firstId);
         assertFalse(info.isDurable());
-        info.setSubscriptionName("name");
+        info.setDurable(true);
         assertTrue(info.isDurable());
     }
 
     @Test
+    public void testIsExplicitClientID() {
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertFalse(info.isExplicitClientID());
+        info.setExplicitClientID(true);
+        assertTrue(info.isExplicitClientID());
+    }
+
+    @Test
+    public void testIsShared() {
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertFalse(info.isShared());
+        info.setShared(true);
+        assertTrue(info.isShared());
+    }
+
+    @Test
+    public void testGetSubscriptionName() {
+        String subName = "name";
+
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+        assertNull(info.getSubscriptionName());
+        info.setSubscriptionName(subName);
+        assertEquals(subName, info.getSubscriptionName());
+    }
+
+    @Test
     public void testCompareTo() {
         JmsConsumerInfo first = new JmsConsumerInfo(firstId);
         JmsConsumerInfo second = new JmsConsumerInfo(secondId);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
new file mode 100644
index 0000000..2a7c893
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.jms.JmsTopic;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.junit.Test;
+
+public class AmqpSubscriptionTrackerTest {
+
+    private AtomicInteger consumerIdCounter = new AtomicInteger();
+
+    private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, boolean hasClientID) {
+        return createConsumerInfo(subscriptionName, topicName, shared, durable, null, hasClientID);
+    }
+
+    private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, String selector, boolean isExplicitClientID) {
+        JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter .incrementAndGet());
+        JmsTopic topic = new JmsTopic(topicName);
+
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId);
+
+        consumerInfo.setSubscriptionName(subscriptionName);
+        consumerInfo.setDestination(topic);
+        consumerInfo.setShared(shared);
+        consumerInfo.setDurable(durable);
+        consumerInfo.setSelector(selector);
+        consumerInfo.setExplicitClientID(isExplicitClientID);
+
+        return consumerInfo;
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedDurable() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared sub name
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, true);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedDurableWithoutClientID() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared sub name
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, false);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, false);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, false);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedVolatile() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared sub name
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, true);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameSharedVolatileWithoutClientID() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first shared sub name
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, false);
+        assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, false);
+        assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second shared sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+
+        // Register a third subscriber for a subscription, after removing the first subscriber for the subscription.
+        // Validate the new link name isn't the same as the second subscriber's (which is still using its name...)
+        tracker.consumerRemoved(sub2consumer1);
+        JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, false);
+        assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3));
+    }
+    @Test
+    public void testReserveNextSubscriptionLinkNameExclusiveDurable() {
+        String topicName = "myTopic";
+        String subscriptionName1 = "mySubscription1";
+        String subscriptionName2 = "mySubscription2";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        // For the first exclusive sub name
+        JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, false, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1));
+        // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link
+        // name if it's already in use, but check we get the same name anyway even with an existing registration.
+        JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, false, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2));
+
+        // For the second exclusive sub name
+        JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, false, true, true);
+        assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1));
+        // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link
+        // name if it's already in use, but check we get the same name anyway even with an existing registration.
+        JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, false, true, true);
+        assertEquals("Unexpected second sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2));
+    }
+
+    @Test
+    public void testReserveNextSubscriptionLinkNameExclusiveNonDurable() {
+        String topicName = "myTopic";
+        String subscriptionName = "mySubscription";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName, topicName, false, false, true);
+        try {
+            tracker.reserveNextSubscriptionLinkName(subscriptionName, subInfo);
+            fail("Should have thrown exception, tracker doesn't name these subs");
+        } catch (IllegalStateException ise) {
+            // Expected
+        }
+
+        // Verify it no-ops with an exclusive non-durable sub info
+        tracker.consumerRemoved(subInfo);
+    }
+
+    @Test
+    public void testIsActiveExclusiveDurableSub() {
+        String subscriptionName1 = "mySubscription";
+
+        AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker();
+
+        assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+
+        JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName1, "myTopic", false, true, true);
+        tracker.reserveNextSubscriptionLinkName(subscriptionName1, subInfo);
+
+        assertTrue(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+
+        tracker.consumerRemoved(subInfo);
+
+        assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1));
+    }
+
+    /** The shared durable sub is reported active only between reservation and consumer removal. */
+    @Test
+    public void testIsActiveSharedDurableSub() {
+        final String subName = "mySubscription";
+        final AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker();
+        // Nothing has been reserved yet.
+        assertFalse(subTracker.isActiveSharedDurableSub(subName));
+
+        // Flags presumably mean (shared=true, durable=true, ...) going by the test name; confirm in helper.
+        final JmsConsumerInfo consumerInfo = createConsumerInfo(subName, "myTopic", true, true, true);
+        subTracker.reserveNextSubscriptionLinkName(subName, consumerInfo);
+        assertTrue(subTracker.isActiveSharedDurableSub(subName));
+
+        // Removing the only consumer must deactivate the subscription again.
+        subTracker.consumerRemoved(consumerInfo);
+        assertFalse(subTracker.isActiveSharedDurableSub(subName));
+    }
+
+    /**
+     * Verifies isActiveDurableSub, together with the exclusive and shared specific checks,
+     * across the reserve / remove lifecycle of each kind of durable subscription.
+     */
+    @Test
+    public void testIsActiveDurableSub() {
+        // Test when an exclusive durable sub is active
+        doIsActiveDurableSubTestImpl(false);
+
+        // Test when a shared durable sub is active
+        doIsActiveDurableSubTestImpl(true);
+    }
+
+    /**
+     * Runs one durable subscription scenario against a fresh tracker.
+     * @param shared value passed as the third createConsumerInfo flag; true is expected to
+     *               register as a shared durable sub, false as an exclusive one
+     */
+    private void doIsActiveDurableSubTestImpl(boolean shared) {
+        final String subName = "mySubscription";
+        final AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker();
+        assertFalse(subTracker.isActiveDurableSub(subName));
+
+        final JmsConsumerInfo consumerInfo = createConsumerInfo(subName, "myTopic", shared, true, true);
+        subTracker.reserveNextSubscriptionLinkName(subName, consumerInfo);
+        assertTrue(subTracker.isActiveDurableSub(subName));
+        if (shared) {
+            assertFalse(subTracker.isActiveExclusiveDurableSub(subName));
+            assertTrue(subTracker.isActiveSharedDurableSub(subName));
+        } else {
+            assertTrue(subTracker.isActiveExclusiveDurableSub(subName));
+            assertFalse(subTracker.isActiveSharedDurableSub(subName));
+        }
+
+        // Once the only consumer is removed, none of the checks may report the sub as active.
+        subTracker.consumerRemoved(consumerInfo);
+        assertFalse(subTracker.isActiveDurableSub(subName));
+        assertFalse(subTracker.isActiveExclusiveDurableSub(subName));
+        assertFalse(subTracker.isActiveSharedDurableSub(subName));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org