You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/11 17:50:09 UTC
[3/3] qpid-jms git commit: QPIDJMS-220,
QPIDJMS-207: initial work on support for shared topic subscriptions
QPIDJMS-220, QPIDJMS-207: initial work on support for shared topic subscriptions
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/952de60a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/952de60a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/952de60a
Branch: refs/heads/master
Commit: 952de60aec6abb295d5285c3e4474a04014b1038
Parents: 06a7216
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Nov 11 17:48:41 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Nov 11 17:48:41 2016 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 10 +-
.../org/apache/qpid/jms/JmsMessageConsumer.java | 8 +-
.../java/org/apache/qpid/jms/JmsSession.java | 22 +-
.../jms/JmsSharedDurableMessageConsumer.java | 36 +
.../qpid/jms/JmsSharedMessageConsumer.java | 36 +
.../apache/qpid/jms/meta/JmsConnectionInfo.java | 9 +-
.../apache/qpid/jms/meta/JmsConsumerInfo.java | 36 +-
.../qpid/jms/provider/amqp/AmqpConnection.java | 20 +-
.../provider/amqp/AmqpConnectionProperties.java | 23 +
.../provider/amqp/AmqpConnectionSession.java | 33 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 9 +
.../qpid/jms/provider/amqp/AmqpProvider.java | 56 +-
.../qpid/jms/provider/amqp/AmqpSession.java | 19 -
.../provider/amqp/AmqpSubscriptionTracker.java | 285 ++++
.../qpid/jms/provider/amqp/AmqpSupport.java | 6 +
.../amqp/builders/AmqpConsumerBuilder.java | 83 +-
.../amqp/builders/AmqpResourceBuilder.java | 15 +
.../org/apache/qpid/jms/JmsSessionTest.java | 51 +-
.../jms/integration/IntegrationTestFixture.java | 4 +
.../SubscriptionsIntegrationTest.java | 1480 ++++++++++++++++++
.../qpid/jms/meta/JmsConnectionInfoTest.java | 11 +-
.../qpid/jms/meta/JmsConsumerInfoTest.java | 37 +-
.../amqp/AmqpSubscriptionTrackerTest.java | 285 ++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 85 +-
24 files changed, 2555 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 33cb169..82bdeff 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private final ThreadPoolExecutor executor;
private volatile IOException firstFailureError;
- private boolean clientIdSet;
private ExceptionListener exceptionListener;
private JmsMessageFactory messageFactory;
private Provider provider;
@@ -295,7 +294,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
public synchronized void setClientID(String clientID) throws JMSException {
checkClosedOrFailed();
- if (clientIdSet) {
+ if (connectionInfo.isExplicitClientID()) {
throw new IllegalStateException("The clientID has already been set");
}
if (clientID == null || clientID.isEmpty()) {
@@ -305,8 +304,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
throw new IllegalStateException("Cannot set the client id once connected.");
}
- this.connectionInfo.setClientId(clientID);
- this.clientIdSet = true;
+ this.connectionInfo.setClientId(clientID, true);
// We weren't connected if we got this far, we should now connect to ensure the
// configured clientID is valid.
@@ -478,7 +476,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
- connectionInfo.setClientId(clientIdGenerator.generateId());
+ connectionInfo.setClientId(clientIdGenerator.generateId(), false);
}
createResource(connectionInfo);
@@ -560,7 +558,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
protected synchronized boolean isExplicitClientID() {
- return clientIdSet;
+ return connectionInfo.isExplicitClientID();
}
//----- Provider interface methods ---------------------------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index b224574..3d2fba9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -93,9 +93,11 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
consumerInfo = new JmsConsumerInfo(consumerId);
- consumerInfo.setClientId(connection.getClientID());
+ consumerInfo.setExplicitClientID(connection.isExplicitClientID());
consumerInfo.setSelector(selector);
+ consumerInfo.setDurable(isDurableSubscription());
consumerInfo.setSubscriptionName(name);
+ consumerInfo.setShared(isSharedSubscription());
consumerInfo.setDestination(destination);
consumerInfo.setAcknowledgementMode(acknowledgementMode);
consumerInfo.setNoLocal(noLocal);
@@ -639,6 +641,10 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
return false;
}
+ public boolean isSharedSubscription() {
+ return false;
+ }
+
public boolean isBrowser() {
return false;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index e6c85cc..a3af20d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -510,8 +510,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException {
checkClosed();
- // TODO Auto-generated method stub
- throw new JMSException("Not yet implemented");
+ return createSharedConsumer(topic, name, null);
}
/**
@@ -520,8 +519,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException {
checkClosed();
- // TODO Auto-generated method stub
- throw new JMSException("Not yet implemented");
+ checkDestination(topic);
+ selector = checkSelector(selector);
+ JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+ JmsMessageConsumer result = new JmsSharedMessageConsumer(getNextConsumerId(), this, dest, name, selector);
+ result.init();
+ return result;
}
/**
@@ -530,8 +533,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
checkClosed();
- // TODO Auto-generated method stub
- throw new JMSException("Not yet implemented");
+ return createSharedDurableConsumer(topic, name, null);
}
/**
@@ -540,8 +542,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException {
checkClosed();
- // TODO Auto-generated method stub
- throw new JMSException("Not yet implemented");
+ checkDestination(topic);
+ selector = checkSelector(selector);
+ JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+ JmsMessageConsumer result = new JmsSharedDurableMessageConsumer(getNextConsumerId(), this, dest, name, selector);
+ result.init();
+ return result;
}
//////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
new file mode 100644
index 0000000..8229ef5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedDurableMessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a JmsMessageConsumer that is part of a Shared Durable Subscription
+ */
+public class JmsSharedDurableMessageConsumer extends JmsSharedMessageConsumer implements AutoCloseable {
+
+ public JmsSharedDurableMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException {
+ super(id, s, destination, name, selector);
+ }
+
+ @Override
+ public boolean isDurableSubscription() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
new file mode 100644
index 0000000..7aa09db
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSharedMessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a JmsMessageConsumer that is part of a Shared Subscription
+ */
+public class JmsSharedMessageConsumer extends JmsMessageConsumer implements AutoCloseable {
+
+ public JmsSharedMessageConsumer(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, String selector) throws JMSException {
+ super(id, s, destination, name, selector, false);
+ }
+
+ @Override
+ public boolean isSharedSubscription() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index bc723c0..9768132 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -47,6 +47,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
private URI configuredURI;
private URI connectedURI;
private String clientId;
+ private boolean explicitClientID;
private String username;
private String password;
private boolean forceAsyncSend;
@@ -89,6 +90,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
private void copy(JmsConnectionInfo copy) {
copy.clientId = clientId;
+ copy.explicitClientID = explicitClientID;
copy.username = username;
copy.password = password;
copy.forceAsyncSend = forceAsyncSend;
@@ -148,8 +150,13 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
return clientId;
}
- public void setClientId(String clientId) {
+ public void setClientId(String clientId, boolean explicitClientID) {
this.clientId = clientId;
+ this.explicitClientID = explicitClientID;
+ }
+
+ public boolean isExplicitClientID() {
+ return explicitClientID;
}
public String getUsername() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index b22b3b5..e8f793f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -29,8 +29,10 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
private int prefetchSize;
private boolean browser;
private String selector;
- private String clientId;
+ private boolean explicitClientID;
private String subscriptionName;
+ private boolean durable;
+ private boolean shared;
private boolean noLocal;
private int acknowledgementMode;
private boolean localMessageExpiry;
@@ -69,9 +71,11 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
info.prefetchSize = prefetchSize;
info.browser = browser;
info.selector = selector;
- info.clientId = clientId;
+ info.explicitClientID = explicitClientID;
+ info.durable = durable;
info.subscriptionName = subscriptionName;
info.noLocal = noLocal;
+ info.shared = shared;
info.acknowledgementMode = acknowledgementMode;
info.lastDeliveredSequenceId = lastDeliveredSequenceId;
info.redeliveryPolicy = getRedeliveryPolicy().copy();
@@ -79,10 +83,6 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
info.listener = listener;
}
- public boolean isDurable() {
- return subscriptionName != null;
- }
-
@Override
public JmsConsumerId getId() {
return consumerId;
@@ -128,12 +128,20 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
this.selector = selector;
}
- public String getClientId() {
- return clientId;
+ public boolean isExplicitClientID() {
+ return explicitClientID;
+ }
+
+ public void setExplicitClientID(boolean explicitClientID) {
+ this.explicitClientID = explicitClientID;
+ }
+
+ public boolean isDurable() {
+ return durable;
}
- public void setClientId(String clientId) {
- this.clientId = clientId;
+ public void setDurable(boolean durable) {
+ this.durable = durable;
}
public String getSubscriptionName() {
@@ -144,6 +152,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
this.subscriptionName = durableSubscriptionId;
}
+ public boolean isShared() {
+ return shared;
+ }
+
+ public void setShared(boolean shared) {
+ this.shared = shared;
+ }
+
public boolean isNoLocal() {
return noLocal;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index cf4e441..e7c9b62 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -41,6 +41,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+ private AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker();
+
private final AmqpJmsMessageFactory amqpMessageFactory;
private final URI remoteURI;
@@ -80,15 +82,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
}
public void unsubscribe(String subscriptionName, AsyncResult request) {
-
- for (AmqpSession session : sessions.values()) {
- if (session.containsSubscription(subscriptionName)) {
- request.onFailure(new JMSException("Cannot remove an active durable subscription"));
- return;
- }
+ // Check if there is an active (i.e open subscriber) shared or exclusive durable subscription using this name
+ if(subTracker.isActiveDurableSub(subscriptionName)) {
+ request.onFailure(new JMSException("Can't remove an active durable subscription: " + subscriptionName));
+ return;
}
- connectionSession.unsubscribe(subscriptionName, request);
+ boolean hasClientID = getResourceInfo().isExplicitClientID();
+
+ connectionSession.unsubscribe(subscriptionName, hasClientID, request);
}
@Override
@@ -221,6 +223,10 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
return properties;
}
+ public AmqpSubscriptionTracker getSubTracker() {
+ return subTracker;
+ }
+
/**
* Allows a connection resource to schedule a task for future execution.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index c090853..815104a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -20,6 +20,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
import java.util.Arrays;
@@ -44,6 +45,7 @@ public class AmqpConnectionProperties {
private boolean delayedDeliverySupported = false;
private boolean anonymousRelaySupported = false;
+ private boolean sharedSubsSupported = false;
private boolean connectionOpenFailed = false;
/**
@@ -84,6 +86,10 @@ public class AmqpConnectionProperties {
if (list.contains(DELAYED_DELIVERY)) {
delayedDeliverySupported = true;
}
+
+ if (list.contains(SHARED_SUBS)) {
+ sharedSubsSupported = true;
+ }
}
protected void processProperties(Map<Symbol, Object> properties) {
@@ -110,6 +116,23 @@ public class AmqpConnectionProperties {
}
/**
+ * @return true if the connection supports shared subscriptions features.
+ */
+ public boolean isSharedSubsSupported() {
+ return sharedSubsSupported;
+ }
+
+ /**
+ * Sets if the connection supports shared subscriptions features.
+ *
+ * @param sharedSubsSupported
+ * true if the shared subscriptions features are supported.
+ */
+ public void setSharedSubsSupported(boolean sharedSubsSupported) {
+ this.sharedSubsSupported = sharedSubsSupported;
+ }
+
+ /**
* @return true if the connection supports sending message with delivery delays.
*/
public boolean isDelayedDeliverySupported() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 0a7c689..7ca27aa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -63,12 +63,17 @@ public class AmqpConnectionSession extends AmqpSession {
*
* @param subscriptionName
* the subscription name that is to be removed.
+ * @param hasClientID
+ * whether the connection has a clientID set.
* @param request
* the request that awaits the completion of this action.
*/
- public void unsubscribe(String subscriptionName, AsyncResult request) {
- DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), subscriptionName);
- DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(builder, request);
+ public void unsubscribe(String subscriptionName, boolean hasClientID, AsyncResult request) {
+ AmqpSubscriptionTracker subTracker = getConnection().getSubTracker();
+ String linkName = subTracker.getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+
+ DurableSubscriptionReattachBuilder builder = new DurableSubscriptionReattachBuilder(this, getResourceInfo(), linkName);
+ DurableSubscriptionReattachRequest subscribeRequest = new DurableSubscriptionReattachRequest(subscriptionName, builder, request);
pendingUnsubs.put(subscriptionName, subscribeRequest);
LOG.debug("Attempting remove of subscription: {}", subscriptionName);
@@ -81,24 +86,24 @@ public class AmqpConnectionSession extends AmqpSession {
super(resource, receiver, parent);
}
- public String getSubscriptionName() {
+ public String getLinkName() {
return getEndpoint().getName();
}
}
private final class DurableSubscriptionReattachBuilder extends AmqpResourceBuilder<DurableSubscriptionReattach, AmqpSession, JmsSessionInfo, Receiver> {
- private final String subscriptionName;
+ private final String linkName;
- public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String subscriptionName) {
+ public DurableSubscriptionReattachBuilder(AmqpSession parent, JmsSessionInfo resourceInfo, String linkName) {
super(parent, resourceInfo);
- this.subscriptionName = subscriptionName;
+ this.linkName = linkName;
}
@Override
protected Receiver createEndpoint(JmsSessionInfo resourceInfo) {
- Receiver receiver = getParent().getEndpoint().receiver(subscriptionName);
+ Receiver receiver = getParent().getEndpoint().receiver(linkName);
receiver.setTarget(new Target());
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
@@ -121,18 +126,20 @@ public class AmqpConnectionSession extends AmqpSession {
private final class DurableSubscriptionReattachRequest extends WrappedAsyncResult {
+ private final String subscriptionName;
private final DurableSubscriptionReattachBuilder subscriberBuilder;
- public DurableSubscriptionReattachRequest(DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) {
+ public DurableSubscriptionReattachRequest(String subscriptionName, DurableSubscriptionReattachBuilder subscriberBuilder, AsyncResult originalRequest) {
super(originalRequest);
+ this.subscriptionName = subscriptionName;
this.subscriberBuilder = subscriberBuilder;
}
@Override
public void onSuccess() {
DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
- LOG.trace("Reattached to subscription: {}", subscriber.getSubscriptionName());
- pendingUnsubs.remove(subscriber.getSubscriptionName());
+ LOG.trace("Reattached to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName());
+ pendingUnsubs.remove(subscriptionName);
if (subscriber.getEndpoint().getRemoteSource() != null) {
subscriber.close(getWrappedRequest());
} else {
@@ -144,8 +151,8 @@ public class AmqpConnectionSession extends AmqpSession {
@Override
public void onFailure(Throwable result) {
DurableSubscriptionReattach subscriber = subscriberBuilder.getResource();
- LOG.trace("Failed to reattach to subscription: {}", subscriber.getSubscriptionName());
- pendingUnsubs.remove(subscriber.getSubscriptionName());
+ LOG.trace("Failed to reattach to subscription '{}' using link name '{}'", subscriptionName, subscriber.getLinkName());
+ pendingUnsubs.remove(subscriptionName);
subscriber.resourceClosed();
super.onFailure(result);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index b60bb83..30e1cef 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -565,6 +565,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void postRollback() {
}
+ @Override
+ public void handleResourceClosure(AmqpProvider provider, Exception error) {
+ AmqpConnection connection = session.getConnection();
+ AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+ JmsConsumerInfo consumerInfo = getResourceInfo();
+
+ subTracker.consumerRemoved(consumerInfo);
+ }
+
//----- Inner classes used in message pull operations --------------------//
protected static final class ScheduledRequest implements AsyncResult {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index ca8a0e1..76dbc90 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -415,8 +415,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
- AmqpSession session = connection.getSession(sessionInfo.getId());
- session.close(request);
+ final AmqpSession session = connection.getSession(sessionInfo.getId());
+ session.close(new AsyncResult() {
+ // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
+ @Override
+ public void onSuccess() {
+ onComplete();
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ onComplete();
+ request.onFailure(result);
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+
+ void onComplete() {
+ // Mark the sessions resources closed, which in turn calls
+ // the subscription cleanup.
+ session.handleResourceClosure(AmqpProvider.this, null);
+ }
+ });
}
@Override
@@ -427,10 +451,32 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
@Override
- public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+ public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
AmqpSession session = connection.getSession(consumerInfo.getParentId());
- AmqpConsumer consumer = session.getConsumer(consumerInfo);
- consumer.close(request);
+ final AmqpConsumer consumer = session.getConsumer(consumerInfo);
+ consumer.close(new AsyncResult() {
+ // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
+ @Override
+ public void onSuccess() {
+ onComplete();
+ request.onSuccess();
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ onComplete();
+ request.onFailure(result);
+ }
+
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
+
+ void onComplete() {
+ connection.getSubTracker().consumerRemoved(consumerInfo);
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index efe52d6..5f441e9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -241,25 +241,6 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
}
/**
- * Query the Session to see if there are any registered consumer instances that have
- * a durable subscription with the given subscription name.
- *
- * @param subscriptionName
- * the name of the subscription being searched for.
- *
- * @return true if there is a consumer that has the given subscription.
- */
- public boolean containsSubscription(String subscriptionName) {
- for (AmqpConsumer consumer : consumers.values()) {
- if (subscriptionName.equals(consumer.getResourceInfo().getSubscriptionName())) {
- return true;
- }
- }
-
- return false;
- }
-
- /**
* Call to send an error that occurs outside of the normal asynchronous processing
* of a session resource such as a remote close etc.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
new file mode 100644
index 0000000..da6af22
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTracker.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.JMSRuntimeException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+
+/**
+ * Class used to track named subscriptions on a connection to allow verifying
+ * current usage and assigning appropriate link names.
+ */
+public class AmqpSubscriptionTracker {
+
+ Set<String> exclusiveDurableSubs = new HashSet<>();
+ Map<String, SubDetails> sharedDurableSubs = new HashMap<>();
+ Map<String, SubDetails> sharedVolatileSubs = new HashMap<>();
+
+ public String reserveNextSubscriptionLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+ validateSubscriptionName(subscriptionName);
+
+ if(consumerInfo == null) {
+ throw new IllegalArgumentException("Consumer info must not be null.");
+ }
+
+ if (consumerInfo.isShared()) {
+ if (consumerInfo.isDurable()) {
+ return getSharedDurableSubLinkName(subscriptionName, consumerInfo);
+ } else {
+ return getSharedVolatileSubLinkName(subscriptionName, consumerInfo);
+ }
+ } else if (consumerInfo.isDurable()) {
+ registerExclusiveDurableSub(subscriptionName);
+ return subscriptionName;
+ } else {
+ throw new IllegalStateException("Non-shared non-durable sub link naming is not handled by the tracker.");
+ }
+ }
+
+ private void validateSubscriptionName(String subscriptionName) {
+ if(subscriptionName == null) {
+ throw new IllegalArgumentException("Subscription name must not be null.");
+ }
+
+ if(subscriptionName.isEmpty()) {
+ throw new IllegalArgumentException("Subscription name must not be empty.");
+ }
+
+ if(subscriptionName.contains(SUB_NAME_DELIMITER)) {
+ throw new IllegalArgumentException("Subscription name must not contain '" + SUB_NAME_DELIMITER +"' character.");
+ }
+ }
+
+ private String getSharedDurableSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+ JmsDestination topic = consumerInfo.getDestination();
+ String selector = consumerInfo.getSelector();
+
+ SubDetails subDetails = null;
+ if(sharedDurableSubs.containsKey(subscriptionName)) {
+ subDetails = sharedDurableSubs.get(subscriptionName);
+
+ if(subDetails.matches(topic, selector)){
+ subDetails.addSubscriber(consumerInfo);
+ } else {
+ throw new JMSRuntimeException("Subscription details dont match existing subscriber.");
+ }
+ } else {
+ subDetails = new SubDetails(topic, selector, consumerInfo);
+ }
+
+ sharedDurableSubs.put(subscriptionName, subDetails);
+
+ int count = subDetails.totalSubscriberCount();
+
+ return getDurableSubscriptionLinkName(subscriptionName, consumerInfo.isExplicitClientID(), count);
+ }
+
+ private String getDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID, int count) {
+ String linkName = getFirstDurableSubscriptionLinkName(subscriptionName, hasClientID);
+ if(count > 1) {
+ if(hasClientID) {
+ linkName += SUB_NAME_DELIMITER + count;
+ } else {
+ linkName += count;
+ }
+ }
+
+ return linkName;
+ }
+
+ public String getFirstDurableSubscriptionLinkName(String subscriptionName, boolean hasClientID) {
+ validateSubscriptionName(subscriptionName);
+
+ String receiverLinkName = subscriptionName;
+ if(!hasClientID) {
+ receiverLinkName += SUB_NAME_DELIMITER + "global";
+ }
+
+ return receiverLinkName;
+ }
+
+ private String getSharedVolatileSubLinkName(String subscriptionName, JmsConsumerInfo consumerInfo) {
+ JmsDestination topic = consumerInfo.getDestination();
+ String selector = consumerInfo.getSelector();
+
+ SubDetails subDetails = null;
+ if(sharedVolatileSubs.containsKey(subscriptionName)) {
+ subDetails = sharedVolatileSubs.get(subscriptionName);
+
+ if(subDetails.matches(topic, selector)){
+ subDetails.addSubscriber(consumerInfo);
+ } else {
+ throw new JMSRuntimeException("Subscription details dont match existing subscriber");
+ }
+ } else {
+ subDetails = new SubDetails(topic, selector, consumerInfo);
+ }
+
+ sharedVolatileSubs.put(subscriptionName, subDetails);
+
+ String receiverLinkName = subscriptionName + SUB_NAME_DELIMITER;
+ int count = subDetails.totalSubscriberCount();
+
+ if (consumerInfo.isExplicitClientID()) {
+ receiverLinkName += "volatile" + count;
+ } else {
+ receiverLinkName += "global-volatile" + count;
+ }
+
+ return receiverLinkName;
+ }
+
+ private void registerExclusiveDurableSub(String subscriptionName) {
+ exclusiveDurableSubs.add(subscriptionName);
+ }
+
+ /**
+ * Checks if there is an exclusive durable subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is an exclusive durable sub with this name already active
+ */
+ public boolean isActiveExclusiveDurableSub(String subscriptionName) {
+ return exclusiveDurableSubs.contains(subscriptionName);
+ }
+
+ /**
+ * Checks if there is a shared durable subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a shared durable sub with this name already active
+ */
+ public boolean isActiveSharedDurableSub(String subscriptionName) {
+ return sharedDurableSubs.containsKey(subscriptionName);
+ }
+
+ /**
+ * Checks if there is either a shared or exclusive durable subscription
+ * already recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a durable sub with this name already active
+ */
+ public boolean isActiveDurableSub(String subscriptionName) {
+ return isActiveExclusiveDurableSub(subscriptionName) || isActiveSharedDurableSub(subscriptionName);
+ }
+
+ /**
+ * Checks if there is an shared volatile subscription already
+ * recorded as active with the given subscription name.
+ *
+ * @param subscriptionName name of subscription to check
+ * @return true if there is a shared volatile sub with this name already active
+ */
+ public boolean isActiveSharedVolatileSub(String subscriptionName) {
+ return sharedVolatileSubs.containsKey(subscriptionName);
+ }
+
+ public void consumerRemoved(JmsConsumerInfo consumerInfo) {
+ String subscriptionName = consumerInfo.getSubscriptionName();
+
+ if (subscriptionName != null && !subscriptionName.isEmpty()) {
+ if (consumerInfo.isShared()) {
+ if (consumerInfo.isDurable()) {
+ if(sharedDurableSubs.containsKey(subscriptionName)) {
+ SubDetails subDetails = sharedDurableSubs.get(subscriptionName);
+ subDetails.removeSubscriber(consumerInfo);
+
+ int count = subDetails.activeSubscribers();
+ if(count < 1) {
+ sharedDurableSubs.remove(subscriptionName);
+ }
+ }
+ } else {
+ if(sharedVolatileSubs.containsKey(subscriptionName)) {
+ SubDetails subDetails = sharedVolatileSubs.get(subscriptionName);
+ subDetails.removeSubscriber(consumerInfo);
+
+ int count = subDetails.activeSubscribers();
+ if(count < 1) {
+ sharedVolatileSubs.remove(subscriptionName);
+ }
+ }
+ }
+ } else if (consumerInfo.isDurable()) {
+ exclusiveDurableSubs.remove(subscriptionName);
+ }
+ }
+ }
+
+ private static class SubDetails {
+ private JmsDestination topic = null;
+ private String selector = null;
+ private Set<JmsConsumerInfo> subscribers = new HashSet<>();
+ private int totalSubscriberCount;
+
+ public SubDetails(JmsDestination topic, String selector, JmsConsumerInfo info) {
+ if(topic == null) {
+ throw new IllegalArgumentException("Topic destination must not be null");
+ }
+
+ this.topic = topic;
+ this.selector = selector;
+ addSubscriber(info);
+ }
+
+ public void addSubscriber(JmsConsumerInfo info) {
+ if(info == null) {
+ throw new IllegalArgumentException("Consumer info must not be null");
+ }
+
+ totalSubscriberCount++;
+ subscribers.add(info);
+ }
+
+ public void removeSubscriber(JmsConsumerInfo info) {
+ subscribers.remove(info);
+ }
+
+ public int activeSubscribers() {
+ return subscribers.size();
+ }
+
+ public int totalSubscriberCount() {
+ return totalSubscriberCount;
+ }
+
+ public boolean matches(JmsDestination newTopic, String newSelector) {
+ if(!topic.equals(newTopic)) {
+ return false;
+ }
+
+ if (selector == null) {
+ return newSelector == null;
+ } else {
+ return selector.equals(newSelector);
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 9738d68..adac112 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -44,6 +44,7 @@ public class AmqpSupport {
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+ public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
// Symbols used to announce connection error information
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
@@ -69,6 +70,8 @@ public class AmqpSupport {
public static final Symbol COPY = Symbol.getSymbol("copy");
public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+ public static final Symbol SHARED = Symbol.valueOf("shared");
+ public static final Symbol GLOBAL = Symbol.valueOf("global");
// Delivery states
public static final Rejected REJECTED = new Rejected();
@@ -80,6 +83,9 @@ public class AmqpSupport {
public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
+ // Subscription Name Delimiter
+ public static final String SUB_NAME_DELIMITER = "|";
+
//----- Static initializer -----------------------------------------------//
static {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
index 3bf8836..5cec9af 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java
@@ -22,14 +22,19 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import javax.jms.InvalidDestinationException;
+import javax.jms.JMSRuntimeException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
+import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
+import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsNoLocalType;
import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsSelectorType;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
@@ -59,22 +64,51 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
@Override
protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) {
JmsDestination destination = resourceInfo.getDestination();
- String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
+ String address = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
Source source = new Source();
- source.setAddress(subscription);
+ source.setAddress(address);
Target target = new Target();
configureSource(source);
- String receiverName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + subscription;
- if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
- // In the case of Durable Topic Subscriptions the client must use the same
- // receiver name which is derived from the subscription name property.
- receiverName = resourceInfo.getSubscriptionName();
+ String receiverLinkName = null;
+ String subscriptionName = resourceInfo.getSubscriptionName();
+ if (subscriptionName != null && !subscriptionName.isEmpty()) {
+ AmqpConnection connection = getParent().getConnection();
+
+ if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) {
+ // Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it).
+ throw new JMSRuntimeException("Remote peer does not support shared subscriptions");
+ }
+
+ AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+
+ // Validate subscriber type allowed given existing active subscriber types.
+ if (resourceInfo.isShared() && resourceInfo.isDurable()) {
+ if(subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
+ // Don't allow shared sub if there is already an active exclusive durable sub
+ throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+ }
+ } else if (!resourceInfo.isShared() && resourceInfo.isDurable()) {
+ if (subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
+ // Exclusive durable sub is already active
+ throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
+ } else if (subTracker.isActiveSharedDurableSub(subscriptionName)) {
+ // Don't allow exclusive durable sub if there is already an active shared durable sub
+ throw new JMSRuntimeException("A shared durable subscription is already active with name '" + subscriptionName + "'");
+ }
+ }
+
+ // Get the link name for the subscription. Throws if certain further validations fail.
+ receiverLinkName = subTracker.reserveNextSubscriptionLinkName(subscriptionName, resourceInfo);
}
- Receiver receiver = getParent().getEndpoint().receiver(receiverName);
+ if(receiverLinkName == null) {
+ receiverLinkName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + address;
+ }
+
+ Receiver receiver = getParent().getEndpoint().receiver(receiverLinkName);
receiver.setSource(source);
receiver.setTarget(target);
if (resourceInfo.isBrowser() || resourceInfo.isPresettle()) {
@@ -88,6 +122,15 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
}
@Override
+ protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) {
+ // If the resource being built is closed during the creation process
+ // then this is a failure, we need to ensure we don't track it.
+ AmqpConnection connection = getParent().getConnection();
+ AmqpSubscriptionTracker subTracker = connection.getSubTracker();
+ subTracker.consumerRemoved(info);
+ }
+
+ @Override
protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
return new AmqpConsumer(parent, resourceInfo, endpoint);
}
@@ -118,7 +161,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
- if (resourceInfo.getSubscriptionName() != null && !resourceInfo.getSubscriptionName().isEmpty()) {
+ if (resourceInfo.isDurable()) {
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setDistributionMode(COPY);
@@ -131,14 +174,32 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS
source.setDistributionMode(COPY);
}
+ // Capabilities
+ LinkedList<Symbol> capabilities = new LinkedList<>();
+
Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(resourceInfo.getDestination());
- if(typeCapability != null) {
- source.setCapabilities(typeCapability);
+ if(typeCapability != null){
+ capabilities.add(typeCapability);
+ }
+
+ if(resourceInfo.isShared()) {
+ capabilities.add(AmqpSupport.SHARED);
+ }
+
+ if(!resourceInfo.isExplicitClientID()) {
+ capabilities.add(AmqpSupport.GLOBAL);
+ }
+
+ if(!capabilities.isEmpty()) {
+ Symbol[] capArray = capabilities.toArray(new Symbol[capabilities.size()]);
+ source.setCapabilities(capArray);
}
+ //Outcomes
source.setOutcomes(outcomes);
source.setDefaultOutcome(MODIFIED_FAILED);
+ // Filters
if (resourceInfo.isNoLocal()) {
filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 5912f7f..a1a9aac 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -164,6 +164,9 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
// If the resource being built is closed during the creation process
// then this is always an error.
+ // Perform any post processing relating to closure during creation attempt
+ afterClosed(getResource(), getResourceInfo());
+
Throwable openError;
if (hasRemoteError()) {
openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
@@ -245,6 +248,18 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
// Nothing to do here.
}
+ /**
+ * Called if endpoint opening process fails in order to give the subclasses a
+ * place to perform any follow-on processing or teardown steps before the operation
+ * is deemed to have been completed and failure is signalled.
+ *
+ * @param resource the resource
+ * @param resourceInfo the resourceInfo
+ */
+ protected void afterClosed(TARGET resource, INFO resourceInfo) {
+ // Nothing to do here.
+ }
+
protected boolean hasRemoteError() {
return getEndpoint().getRemoteCondition().getCondition() != null;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index be3e668..27fc511 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -35,7 +35,10 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.qpid.jms.JmsConnectionTestSupport;
+import org.apache.qpid.jms.JmsMessageConsumer;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.JmsTemporaryQueue;
import org.junit.Before;
@@ -332,41 +335,53 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
} catch (RuntimeException re) {}
}
- //----- Not yet implemented, should all be cleared on implementation -----//
-
@Test(timeout = 10000)
public void testCreateSharedConsumer() throws Exception {
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- session.createSharedConsumer(session.createTopic("test"), "subscription");
- fail("Should fail until implemented.");
- } catch (JMSException ex) {}
+
+ Topic topic = session.createTopic("test");
+ JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription");
+
+ assertNotNull(consumer);
+ assertNull("unexpected selector", consumer.getMessageSelector());
+ assertEquals("unexpected topic", topic, consumer.getDestination());
}
@Test(timeout = 10000)
public void testCreateSharedConsumerWithSelector() throws Exception {
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- session.createSharedConsumer(session.createTopic("test"), "subscription", "a = b");
- fail("Should fail until implemented.");
- } catch (JMSException ex) {}
+
+ String selector = "a = b";
+ Topic topic = session.createTopic("test");
+ JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedConsumer(topic, "subscription", selector);
+
+ assertNotNull(consumer);
+ assertEquals("unexpected selector", selector, consumer.getMessageSelector());
+ assertEquals("unexpected topic", topic, consumer.getDestination());
}
@Test(timeout = 10000)
public void testCreateSharedDurableConsumer() throws Exception {
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- session.createSharedDurableConsumer(session.createTopic("test"), "subscription");
- fail("Should fail until implemented.");
- } catch (JMSException ex) {}
+
+ Topic topic = session.createTopic("test");
+ JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription");
+
+ assertNotNull(consumer);
+ assertNull("unexpected selector", consumer.getMessageSelector());
+ assertEquals("unexpected topic", topic, consumer.getDestination());
}
@Test(timeout = 10000)
public void testCreateSharedDurableConsumerWithSelector() throws Exception {
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- session.createSharedDurableConsumer(session.createTopic("test"), "subscription", "a = b");
- fail("Should fail until implemented.");
- } catch (JMSException ex) {}
+
+ String selector = "a = b";
+ Topic topic = session.createTopic("test");
+ JmsMessageConsumer consumer = (JmsMessageConsumer) session.createSharedDurableConsumer(topic, "subscription", selector);
+
+ assertNotNull(consumer);
+ assertEquals("unexpected selector", selector, consumer.getMessageSelector());
+ assertEquals("unexpected topic", topic, consumer.getDestination());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
index 7c33fc1..2ddf739 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
@@ -54,6 +54,10 @@ public class IntegrationTestFixture {
return establishConnecton(testPeer, false, optionsString, serverCapabilities, serverProperties, true);
}
+ Connection establishConnectonWithoutClientID(TestAmqpPeer testPeer, Symbol[] serverCapabilities) throws JMSException {
+ return establishConnecton(testPeer, false, null, serverCapabilities, null, false);
+ }
+
Connection establishConnecton(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException {
Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org