You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/22 18:19:28 UTC

qpid-jms git commit: QPIDJMS-220: use a link capability in absence of connection capability for shared subscription support detection

Repository: qpid-jms
Updated Branches:
  refs/heads/master 90f9b1f51 -> ec2510d7c


QPIDJMS-220: use a link capability in absence of connection capability for shared subscription support detection


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ec2510d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ec2510d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ec2510d7

Branch: refs/heads/master
Commit: ec2510d7ca40d041775860d23c283b6781525bef
Parents: 90f9b1f
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Nov 22 18:17:40 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Nov 22 18:17:40 2016 +0000

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../amqp/builders/AmqpConsumerBuilder.java      |  50 ++++++-
 .../SubscriptionsIntegrationTest.java           | 133 +++++++++++++++++--
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  45 +++++--
 4 files changed, 203 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b53d91c..240e1c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
     <target-version>1.7</target-version>
 
     <!-- Dependency Versions for this Project -->
-    <proton-version>0.15.0</proton-version>
+    <proton-version>0.16.0-SNAPSHOT</proton-version>
     <netty-version>4.1.5.Final</netty-version>
     <slf4j-version>1.7.21</slf4j-version>
     <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 5cec9af..e9a54b8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -20,9 +20,12 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.COPY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_NO_LOCAL_SYMBOL;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.InvalidDestinationException;
@@ -32,6 +35,7 @@ import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
 import org.apache.qpid.jms.provider.amqp.AmqpSession;
 import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
@@ -57,6 +61,9 @@ import org.apache.qpid.proton.engine.Receiver;
  */
 public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpSession, JmsConsumerInfo, Receiver> {
 
+    boolean validateSharedSubsLinkCapability;
+    boolean sharedSubsNotSupported;
+
     public AmqpConsumerBuilder(AmqpSession parent, JmsConsumerInfo consumerInfo) {
         super(parent, consumerInfo);
     }
@@ -78,8 +85,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
             AmqpConnection connection = getParent().getConnection();
 
             if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) {
-                // Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it).
-                throw new JMSRuntimeException("Remote peer does not support shared subscriptions");
+                validateSharedSubsLinkCapability = true;
             }
 
             AmqpSubscriptionTracker subTracker = connection.getSubTracker();
@@ -118,10 +124,39 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
         }
         receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
+        if(validateSharedSubsLinkCapability) {
+            receiver.setDesiredCapabilities(new Symbol[] { AmqpSupport.SHARED_SUBS });
+        }
+
         return receiver;
     }
 
     @Override
+    protected void afterOpened() {
+        if(validateSharedSubsLinkCapability) {
+            Symbol[] remoteOfferedCapabilities = endpoint.getRemoteOfferedCapabilities();
+
+            boolean supported = false;
+            if(remoteOfferedCapabilities != null) {
+                List<Symbol> list = Arrays.asList(remoteOfferedCapabilities);
+                if (list.contains(SHARED_SUBS)) {
+                    supported = true;
+                }
+            }
+
+            if(!supported) {
+                sharedSubsNotSupported = true;
+
+                if(resourceInfo.isDurable()) {
+                    endpoint.detach();
+                } else {
+                    endpoint.close();
+                }
+            }
+        }
+    }
+
+    @Override
     protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) {
         // If the resource being built is closed during the creation process
         // then this is a failure, we need to ensure we don't track it.
@@ -131,12 +166,21 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
     }
 
     @Override
+    public void processRemoteDetach(AmqpProvider provider) {
+        handleClosed(provider, null);
+    }
+
+    @Override
     protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
         return new AmqpConsumer(parent, resourceInfo, endpoint);
     }
 
     @Override
     protected Exception getOpenAbortException() {
+        if(sharedSubsNotSupported) {
+            return new JMSRuntimeException("Remote peer does not support shared subscriptions");
+        }
+
         // Verify the attach response contained a non-null Source
         org.apache.qpid.proton.amqp.transport.Source source = endpoint.getRemoteSource();
         if (source != null) {
@@ -151,7 +195,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
     protected boolean isClosePending() {
         // When no link terminus was created, the peer will now detach/close us otherwise
         // we need to validate the returned remote source prior to open completion.
-        return endpoint.getRemoteSource() == null;
+        return sharedSubsNotSupported || endpoint.getRemoteSource() == null;
     }
 
     //----- Internal implementation ------------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
index 3e08b56..6877a20 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -565,29 +565,58 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
 
     /**
      * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
-     * attempt to create a shared durable subscriber fails.
+     * attempt to create a shared durable subscriber fails if the link also doesn't identify as
+     * supporting shared subscriptions.
      *
      * @throws Exception if an unexpected exception occurs
      */
     @Test(timeout = 20000)
     public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception {
-        doSharedTopicSubscriberSupportedTestImpl(true);
+        doSharedSubscriptionNotSupportedTestImpl(true, false);
     }
 
     /**
      * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
-     * attempt to create a shared volatile subscriber fails.
+     * attempt to create a shared volatile subscriber fails if the link also doesn't identify as
+     * supporting shared subscriptions.
      *
      * @throws Exception if an unexpected exception occurs
      */
     @Test(timeout = 20000)
     public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception {
-        doSharedTopicSubscriberSupportedTestImpl(true);
+        doSharedSubscriptionNotSupportedTestImpl(false, false);
     }
 
-    private void doSharedTopicSubscriberSupportedTestImpl(boolean durable) throws Exception {
+    /**
+     * Verifies that when a durable shared subscriber creation fails because the connection and link
+     * both lack the capability indicating support, the subscriber is appropriately removed from
+     * record, releasing use of the link name such that attempting another subscriber creation
+     * uses the same initial link name.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedDurableTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
+        doSharedSubscriptionNotSupportedTestImpl(true, true);
+    }
+
+    /**
+     * Verifies that when a volatile shared subscriber creation fails because the connection and link
+     * both lack the capability indicating support, the subscriber is appropriately removed from
+     * record, releasing use of the link name such that attempting another subscriber creation
+     * uses the same initial link name.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
+        doSharedSubscriptionNotSupportedTestImpl(false, true);
+    }
+
+    private void doSharedSubscriptionNotSupportedTestImpl(boolean durable, boolean repeat) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             // DONT include server connection capability to indicate support for shared-subs
+            // This will cause the link capability to be desired, and we verify failure if not offered.
             Symbol[] serverCapabilities = new Symbol[]{};
 
             Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
@@ -600,20 +629,96 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
             Topic dest = session.createTopic(topicName);
             String subscriptionName = "mySubscription";
 
+            int iterations = repeat ? 2 : 1;
+
+            // Expect a shared receiver to attach, then detach due to the link also not
+            // reporting it offers the shared subs capability, i.e sharing not supported.
+            for (int i = 0; i < iterations; i++) {
+                try {
+                    if (durable) {
+                        Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+                        testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true, false, true, true, false);
+                        testPeer.expectDetach(false, true, false);
+
+                        session.createSharedDurableConsumer(dest, subscriptionName);
+                    } else {
+                        Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                        testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, volatileLinkNameMatcher, false, false, true, true, false);
+                        testPeer.expectDetach(true, true, true);
+
+                        session.createSharedConsumer(dest, subscriptionName);
+                    }
+
+                    fail("Expected an exception to be thrown");
+                } catch (JMSException jmse) {
+                    // expected
+                }
+            }
+
             testPeer.expectClose();
+            connection.close();
 
-            try {
-                if (durable) {
-                    session.createSharedDurableConsumer(dest, subscriptionName);
-                } else {
-                    session.createSharedConsumer(dest, subscriptionName);
-                }
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 
-                fail("Expected an exception to be thrown");
-            } catch (JMSException jmse) {
-                // expected
+    /**
+     * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+     * attempt to create a shared durable subscriber succeeds if the server offers link capability for
+     * shared subscriptions.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedDurableTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
+        doSharedSubscriptionLinkCapabilitySupportedTestImpl(true);
+    }
+
+    /**
+     * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+     * attempt to create a shared volatile subscriber succeeds if the server offers link capability for
+     * shared subscriptions.
+     *
+     * @throws Exception if an unexpected exception occurs
+     */
+    @Test(timeout = 20000)
+    public void testCreateSharedVolatileTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
+        doSharedSubscriptionLinkCapabilitySupportedTestImpl(false);
+    }
+
+    private void doSharedSubscriptionLinkCapabilitySupportedTestImpl(boolean durable) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // DONT include server connection capability to indicate support for shared-subs.
+            // This will cause the link capability to be desired, and we verify success if offered.
+            Symbol[] serverCapabilities = new Symbol[]{};
+
+            Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            // Expect a shared receiver to attach, and succeed due to the server offering
+            // the shared subs capability, i.e sharing is supported.
+            if (durable) {
+                Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+                testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true, false, true, true, true);
+                testPeer.expectLinkFlow();
+
+                session.createSharedDurableConsumer(dest, subscriptionName);
+            } else {
+                Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+                testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, volatileLinkNameMatcher, false, false, true, true, true);
+                testPeer.expectLinkFlow();
+
+                session.createSharedConsumer(dest, subscriptionName);
             }
 
+            testPeer.expectClose();
             connection.close();
 
             testPeer.waitForAllHandlersToComplete(1000);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 363c76d..0840254 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1065,11 +1065,12 @@ public class TestAmqpPeer implements AutoCloseable
     public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
                                      boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
     {
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null);
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null, null, null);
     }
 
-    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
-                                     boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride)
+    private void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+                                     boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride,
+                                     Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(linkNameMatcher)
@@ -1080,12 +1081,23 @@ public class TestAmqpPeer implements AutoCloseable
                 .withSource(sourceMatcher)
                 .withTarget(notNullValue());
 
+        if(desiredCapabilitiesMatcher != null) {
+            attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher);
+        }
+
         final AttachFrame attachResponse = new AttachFrame()
                             .setRole(Role.SENDER)
+                            .setOfferedCapabilities(offeredCapabilitiesResponse)
                             .setSndSettleMode(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
                             .setRcvSettleMode(ReceiverSettleMode.FIRST)
                             .setInitialDeliveryCount(UnsignedInteger.ZERO);
 
+        expectReceiverAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, responseSourceOverride);
+    }
+
+    private void expectReceiverAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach,
+                                     boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride)
+    {
         // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
         final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
         attachResponseSender.setValueProvider(new ValueProvider()
@@ -1155,18 +1167,19 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
-        expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, false, clientIdSet);
+        expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, false, clientIdSet, false, false);
     }
 
     public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
-        expectSharedSubscriberAttach(topicName, subscriptionName, false, linkNameMatcher, false, clientIdSet);
+        expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, false, false, clientIdSet, false, false);
     }
 
     public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) {
-        expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, refuseLink, clientIdSet);
+        expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, refuseLink, clientIdSet, false, false);
     }
 
-    private void expectSharedSubscriberAttach(String topicName, String subscriptionName, boolean durable, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet)
+    public void expectSharedSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean durable, boolean refuseLink,
+                                              boolean clientIdSet, boolean expectLinkCapability, boolean responseOffersLinkCapability)
     {
         Symbol[] sourceCapabilities;
         if(clientIdSet) {
@@ -1189,7 +1202,21 @@ public class TestAmqpPeer implements AutoCloseable
 
         sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities));
 
-        expectReceiverAttach(linkNameMatcher, sourceMatcher, refuseLink, false);
+        // If we don't have the connection capability set we expect a desired link capability
+        Matcher<?> linkDesiredCapabilitiesMatcher;
+        if(expectLinkCapability) {
+            linkDesiredCapabilitiesMatcher = arrayContaining(new Symbol[] { AmqpSupport.SHARED_SUBS });
+        } else {
+            linkDesiredCapabilitiesMatcher = nullValue();
+        }
+
+        // Generate offered capability response if supported
+        Symbol[] linkOfferedCapabilitiesResponse = null;
+        if(responseOffersLinkCapability) {
+             linkOfferedCapabilitiesResponse = new Symbol[] { AmqpSupport.SHARED_SUBS };
+        }
+
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, false, null, null, null, linkDesiredCapabilitiesMatcher, linkOfferedCapabilitiesResponse);
     }
 
     public void expectDurableSubscriberAttach(String topicName, String subscriptionName)
@@ -1235,7 +1262,7 @@ public class TestAmqpPeer implements AutoCloseable
             }
         }
 
-        expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride);
+        expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride, null, null);
     }
 
     public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org