You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/22 18:19:28 UTC
qpid-jms git commit: QPIDJMS-220: use a link capability in absence of
connection capability for shared subscription support detection
Repository: qpid-jms
Updated Branches:
refs/heads/master 90f9b1f51 -> ec2510d7c
QPIDJMS-220: use a link capability in absence of connection capability for shared subscription support detection
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ec2510d7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ec2510d7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ec2510d7
Branch: refs/heads/master
Commit: ec2510d7ca40d041775860d23c283b6781525bef
Parents: 90f9b1f
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Nov 22 18:17:40 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Nov 22 18:17:40 2016 +0000
----------------------------------------------------------------------
pom.xml | 2 +-
.../amqp/builders/AmqpConsumerBuilder.java | 50 ++++++-
.../SubscriptionsIntegrationTest.java | 133 +++++++++++++++++--
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 45 +++++--
4 files changed, 203 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b53d91c..240e1c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
<target-version>1.7</target-version>
<!-- Dependency Versions for this Project -->
- <proton-version>0.15.0</proton-version>
+ <proton-version>0.16.0-SNAPSHOT</proton-version>
<netty-version>4.1.5.Final</netty-version>
<slf4j-version>1.7.21</slf4j-version>
<geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 5cec9af..e9a54b8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -20,9 +20,12 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.COPY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_NO_LOCAL_SYMBOL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import javax.jms.InvalidDestinationException;
@@ -32,6 +35,7 @@ import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
+import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
@@ -57,6 +61,9 @@ import org.apache.qpid.proton.engine.Receiver;
*/
public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpSession, JmsConsumerInfo, Receiver> {
+ boolean validateSharedSubsLinkCapability;
+ boolean sharedSubsNotSupported;
+
public AmqpConsumerBuilder(AmqpSession parent, JmsConsumerInfo consumerInfo) {
super(parent, consumerInfo);
}
@@ -78,8 +85,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
AmqpConnection connection = getParent().getConnection();
if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) {
- // Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it).
- throw new JMSRuntimeException("Remote peer does not support shared subscriptions");
+ validateSharedSubsLinkCapability = true;
}
AmqpSubscriptionTracker subTracker = connection.getSubTracker();
@@ -118,10 +124,39 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
}
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ if(validateSharedSubsLinkCapability) {
+ receiver.setDesiredCapabilities(new Symbol[] { AmqpSupport.SHARED_SUBS });
+ }
+
return receiver;
}
@Override
+ protected void afterOpened() {
+ if(validateSharedSubsLinkCapability) {
+ Symbol[] remoteOfferedCapabilities = endpoint.getRemoteOfferedCapabilities();
+
+ boolean supported = false;
+ if(remoteOfferedCapabilities != null) {
+ List<Symbol> list = Arrays.asList(remoteOfferedCapabilities);
+ if (list.contains(SHARED_SUBS)) {
+ supported = true;
+ }
+ }
+
+ if(!supported) {
+ sharedSubsNotSupported = true;
+
+ if(resourceInfo.isDurable()) {
+ endpoint.detach();
+ } else {
+ endpoint.close();
+ }
+ }
+ }
+ }
+
+ @Override
protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) {
// If the resource being built is closed during the creation process
// then this is a failure, we need to ensure we don't track it.
@@ -131,12 +166,21 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
}
@Override
+ public void processRemoteDetach(AmqpProvider provider) {
+ handleClosed(provider, null);
+ }
+
+ @Override
protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
return new AmqpConsumer(parent, resourceInfo, endpoint);
}
@Override
protected Exception getOpenAbortException() {
+ if(sharedSubsNotSupported) {
+ return new JMSRuntimeException("Remote peer does not support shared subscriptions");
+ }
+
// Verify the attach response contained a non-null Source
org.apache.qpid.proton.amqp.transport.Source source = endpoint.getRemoteSource();
if (source != null) {
@@ -151,7 +195,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
protected boolean isClosePending() {
// When no link terminus was created, the peer will now detach/close us otherwise
// we need to validate the returned remote source prior to open completion.
- return endpoint.getRemoteSource() == null;
+ return sharedSubsNotSupported || endpoint.getRemoteSource() == null;
}
//----- Internal implementation ------------------------------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
index 3e08b56..6877a20 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java
@@ -565,29 +565,58 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
/**
* Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
- * attempt to create a shared durable subscriber fails.
+ * attempt to create a shared durable subscriber fails if the link also doesn't identify as
+ * supporting shared subscriptions.
*
* @throws Exception if an unexpected exception occurs
*/
@Test(timeout = 20000)
public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception {
- doSharedTopicSubscriberSupportedTestImpl(true);
+ doSharedSubscriptionNotSupportedTestImpl(true, false);
}
/**
* Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
- * attempt to create a shared volatile subscriber fails.
+ * attempt to create a shared volatile subscriber fails if the link also doesn't identify as
+ * supporting shared subscriptions.
*
* @throws Exception if an unexpected exception occurs
*/
@Test(timeout = 20000)
public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception {
- doSharedTopicSubscriberSupportedTestImpl(true);
+ doSharedSubscriptionNotSupportedTestImpl(false, false);
}
- private void doSharedTopicSubscriberSupportedTestImpl(boolean durable) throws Exception {
+ /**
+ * Verifies that when a durable shared subscriber creation fails because the connection and link
+ * both lack the capability indicating support, the subscriber is appropriately removed from
+ * record, releasing use of the link name such that attempting another subscriber creation
+ * uses the same initial link name.
+ *
+ * @throws Exception if an unexpected exception occurs
+ */
+ @Test(timeout = 20000)
+ public void testCreateSharedDurableTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
+ doSharedSubscriptionNotSupportedTestImpl(true, true);
+ }
+
+ /**
+ * Verifies that when a volatile shared subscriber creation fails because the connection and link
+ * both lack the capability indicating support, the subscriber is appropriately removed from
+ * record, releasing use of the link name such that attempting another subscriber creation
+ * uses the same initial link name.
+ *
+ * @throws Exception if an unexpected exception occurs
+ */
+ @Test(timeout = 20000)
+ public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
+ doSharedSubscriptionNotSupportedTestImpl(false, true);
+ }
+
+ private void doSharedSubscriptionNotSupportedTestImpl(boolean durable, boolean repeat) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT include server connection capability to indicate support for shared-subs
+ // This will cause the link capability to be desired, and we verify failure if not offered.
Symbol[] serverCapabilities = new Symbol[]{};
Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
@@ -600,20 +629,96 @@ public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
Topic dest = session.createTopic(topicName);
String subscriptionName = "mySubscription";
+ int iterations = repeat ? 2 : 1;
+
+ // Expect a shared receiver to attach, then detach due to the link also not
+ // reporting it offers the shared subs capability, i.e sharing not supported.
+ for (int i = 0; i < iterations; i++) {
+ try {
+ if (durable) {
+ Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+ testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true, false, true, true, false);
+ testPeer.expectDetach(false, true, false);
+
+ session.createSharedDurableConsumer(dest, subscriptionName);
+ } else {
+ Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+ testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, volatileLinkNameMatcher, false, false, true, true, false);
+ testPeer.expectDetach(true, true, true);
+
+ session.createSharedConsumer(dest, subscriptionName);
+ }
+
+ fail("Expected an exception to be thrown");
+ } catch (JMSException jmse) {
+ // expected
+ }
+ }
+
testPeer.expectClose();
+ connection.close();
- try {
- if (durable) {
- session.createSharedDurableConsumer(dest, subscriptionName);
- } else {
- session.createSharedConsumer(dest, subscriptionName);
- }
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
- fail("Expected an exception to be thrown");
- } catch (JMSException jmse) {
- // expected
+ /**
+ * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+ * attempt to create a shared durable subscriber succeeds if the server offers link capability for
+ * shared subscriptions.
+ *
+ * @throws Exception if an unexpected exception occurs
+ */
+ @Test(timeout = 20000)
+ public void testCreateSharedDurableTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
+ doSharedSubscriptionLinkCapabilitySupportedTestImpl(true);
+ }
+
+ /**
+ * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the
+ * attempt to create a shared volatile subscriber succeeds if the server offers link capability for
+ * shared subscriptions.
+ *
+ * @throws Exception if an unexpected exception occurs
+ */
+ @Test(timeout = 20000)
+ public void testCreateSharedVolatileTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
+ doSharedSubscriptionLinkCapabilitySupportedTestImpl(false);
+ }
+
+ private void doSharedSubscriptionLinkCapabilitySupportedTestImpl(boolean durable) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // DONT include server connection capability to indicate support for shared-subs.
+ // This will cause the link capability to be desired, and we verify success if offered.
+ Symbol[] serverCapabilities = new Symbol[]{};
+
+ Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+ String subscriptionName = "mySubscription";
+
+ // Expect a shared receiver to attach, and succeed due to the server offering
+ // the shared subs capability, i.e sharing is supported.
+ if (durable) {
+ Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
+ testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true, false, true, true, true);
+ testPeer.expectLinkFlow();
+
+ session.createSharedDurableConsumer(dest, subscriptionName);
+ } else {
+ Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1");
+ testPeer.expectSharedSubscriberAttach(topicName, subscriptionName, volatileLinkNameMatcher, false, false, true, true, true);
+ testPeer.expectLinkFlow();
+
+ session.createSharedConsumer(dest, subscriptionName);
}
+ testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec2510d7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 363c76d..0840254 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1065,11 +1065,12 @@ public class TestAmqpPeer implements AutoCloseable
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
{
- expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null);
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null, null, null);
}
- public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
- boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride)
+ private void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+ boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride,
+ Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse)
{
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(linkNameMatcher)
@@ -1080,12 +1081,23 @@ public class TestAmqpPeer implements AutoCloseable
.withSource(sourceMatcher)
.withTarget(notNullValue());
+ if(desiredCapabilitiesMatcher != null) {
+ attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher);
+ }
+
final AttachFrame attachResponse = new AttachFrame()
.setRole(Role.SENDER)
+ .setOfferedCapabilities(offeredCapabilitiesResponse)
.setSndSettleMode(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
.setRcvSettleMode(ReceiverSettleMode.FIRST)
.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ expectReceiverAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, responseSourceOverride);
+ }
+
+ private void expectReceiverAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach,
+ boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride)
+ {
// The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
attachResponseSender.setValueProvider(new ValueProvider()
@@ -1155,18 +1167,19 @@ public class TestAmqpPeer implements AutoCloseable
}
public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
- expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, false, clientIdSet);
+ expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, false, clientIdSet, false, false);
}
public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) {
- expectSharedSubscriberAttach(topicName, subscriptionName, false, linkNameMatcher, false, clientIdSet);
+ expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, false, false, clientIdSet, false, false);
}
public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) {
- expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, refuseLink, clientIdSet);
+ expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, refuseLink, clientIdSet, false, false);
}
- private void expectSharedSubscriberAttach(String topicName, String subscriptionName, boolean durable, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet)
+ public void expectSharedSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean durable, boolean refuseLink,
+ boolean clientIdSet, boolean expectLinkCapability, boolean responseOffersLinkCapability)
{
Symbol[] sourceCapabilities;
if(clientIdSet) {
@@ -1189,7 +1202,21 @@ public class TestAmqpPeer implements AutoCloseable
sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities));
- expectReceiverAttach(linkNameMatcher, sourceMatcher, refuseLink, false);
+ // If we don't have the connection capability set we expect a desired link capability
+ Matcher<?> linkDesiredCapabilitiesMatcher;
+ if(expectLinkCapability) {
+ linkDesiredCapabilitiesMatcher = arrayContaining(new Symbol[] { AmqpSupport.SHARED_SUBS });
+ } else {
+ linkDesiredCapabilitiesMatcher = nullValue();
+ }
+
+ // Generate offered capability response if supported
+ Symbol[] linkOfferedCapabilitiesResponse = null;
+ if(responseOffersLinkCapability) {
+ linkOfferedCapabilitiesResponse = new Symbol[] { AmqpSupport.SHARED_SUBS };
+ }
+
+ expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, false, null, null, null, linkDesiredCapabilitiesMatcher, linkOfferedCapabilitiesResponse);
}
public void expectDurableSubscriberAttach(String topicName, String subscriptionName)
@@ -1235,7 +1262,7 @@ public class TestAmqpPeer implements AutoCloseable
}
}
- expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride);
+ expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride, null, null);
}
public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org