You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/10/21 09:56:12 UTC
[09/41] activemq-artemis git commit: ARTEMIS-799 Fix issues with the
AMQP Durable Topic Subscription model
ARTEMIS-799 Fix issues with the AMQP Durable Topic Subscription model
Fixes several issues found in the handling of durable topic
subscriptions (test cases added).
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/226f28ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/226f28ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/226f28ab
Branch: refs/heads/ARTEMIS-780
Commit: 226f28abf5468148c7dcbc397b90c383e72b3a3e
Parents: 9743043
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 12 17:33:08 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 14 03:54:35 2016 +0200
----------------------------------------------------------------------
.../amqp/proton/AMQPConnectionContext.java | 13 +-
.../amqp/proton/AmqpJmsSelectorFilter.java | 46 +++
.../protocol/amqp/proton/AmqpNoLocalFilter.java | 43 +++
.../amqp/proton/ProtonServerSenderContext.java | 41 ++-
.../amqp/proton/handler/EventHandler.java | 2 +-
.../protocol/amqp/proton/handler/Events.java | 5 +-
.../amqp/AmqpDurableReceiverTest.java | 350 +++++++++++++++++++
7 files changed, 483 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 70e4fd0..1f193eb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -46,6 +45,8 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
+import io.netty.buffer.ByteBuf;
+
public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -181,7 +182,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
protected void remoteLinkOpened(Link link) throws Exception {
- AMQPSessionContext protonSession = (AMQPSessionContext) getSessionExtension(link.getSession());
+ AMQPSessionContext protonSession = getSessionExtension(link.getSession());
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
@@ -321,6 +322,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
public void onRemoteClose(Connection connection) {
synchronized (getLock()) {
connection.close();
+ connection.free();
for (AMQPSessionContext protonSession : sessions.values()) {
protonSession.close();
}
@@ -352,6 +354,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
public void onRemoteClose(Session session) throws Exception {
synchronized (getLock()) {
session.close();
+ session.free();
}
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@@ -375,6 +378,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteClose(Link link) throws Exception {
link.close();
+ link.free();
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) {
linkContext.close(true);
@@ -384,10 +388,11 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override
public void onRemoteDetach(Link link) throws Exception {
link.detach();
+ link.free();
}
@Override
- public void onDetach(Link link) throws Exception {
+ public void onLocalDetach(Link link) throws Exception {
Object context = link.getContext();
if (context instanceof ProtonServerSenderContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@@ -402,10 +407,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.onMessage(delivery);
} else {
// TODO: logs
-
System.err.println("Handler is null, can't delivery " + delivery);
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
new file mode 100644
index 0000000..2e6ec2f
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpJmsSelectorFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS selector values.
+ */
+public class AmqpJmsSelectorFilter implements DescribedType {
+
+ private final String selector;
+
+ public AmqpJmsSelectorFilter(String selector) {
+ this.selector = selector;
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return AmqpSupport.JMS_SELECTOR_CODE;
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.selector;
+ }
+
+ @Override
+ public String toString() {
+ return "AmqpJmsSelectorType{" + selector + "}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
new file mode 100644
index 0000000..24f3ead
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpNoLocalFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.proton;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+
+/**
+ * A Described Type wrapper for JMS no local option for MessageConsumer.
+ */
+public class AmqpNoLocalFilter implements DescribedType {
+
+ public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
+
+ private final String noLocal;
+
+ public AmqpNoLocalFilter() {
+ this.noLocal = "NoLocalFilter{}";
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return AmqpSupport.NO_LOCAL_CODE;
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.noLocal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1ed5745..76279c5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -170,21 +170,46 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String clientId = connection.getRemoteContainer();
String pubId = sender.getName();
queue = createQueueName(clientId, pubId);
- boolean exists = sessionSPI.queueQuery(queue, false).isExists();
+ QueueQueryResult result = sessionSPI.queueQuery(queue, false);
// Once confirmed that the address exists we need to return a Source that reflects
// the lifetime policy and capabilities of the new subscription.
- //
- // TODO we are not applying selector or noLocal filters to the source we just
- // looked up which would violate expectations if the client checked that they
- // are present on subscription recovery (JMS Durable Re-subscribe) etc
- if (exists) {
+ if (result.isExists()) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue);
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY);
source.setCapabilities(TOPIC);
+
+ SimpleString filterString = result.getFilterString();
+ if (filterString != null) {
+ selector = filterString.toString();
+ boolean noLocal = false;
+
+ String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+ String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+
+ if (selector.endsWith(noLocalFilter)) {
+ if (selector.length() > noLocalFilter.length()) {
+ noLocalFilter = " AND " + noLocalFilter;
+ selector = selector.substring(0, selector.length() - noLocalFilter.length());
+ } else {
+ selector = null;
+ }
+
+ noLocal = true;
+ }
+
+ if (noLocal) {
+ supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
+ }
+
+ if (selector != null && !selector.trim().isEmpty()) {
+ supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
+ }
+ }
+
sender.setSource(source);
} else {
throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
@@ -228,7 +253,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
}
- source.setAddress(queue);
} else {
// otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString();
@@ -237,7 +261,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
- source.setAddress(queue);
}
} else {
queue = source.getAddress();
@@ -308,7 +331,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// any durable resources for say pub subs
if (remoteLinkClose) {
Source source = (Source) sender.getSource();
- if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
+ if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
String queueName = source.getAddress();
QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
if (result.isExists() && source.getDynamic()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
index 91c9a67..00bd27a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java
@@ -69,7 +69,7 @@ public interface EventHandler {
void onRemoteDetach(Link link) throws Exception;
- void onDetach(Link link) throws Exception;
+ void onLocalDetach(Link link) throws Exception;
void onDelivery(Delivery delivery) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
index 6552f64..405491a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/Events.java
@@ -85,7 +85,7 @@ public final class Events {
handler.onFinal(event.getLink());
break;
case LINK_LOCAL_DETACH:
- handler.onDetach(event.getLink());
+ handler.onLocalDetach(event.getLink());
break;
case LINK_REMOTE_DETACH:
handler.onRemoteDetach(event.getLink());
@@ -96,7 +96,8 @@ public final class Events {
case DELIVERY:
handler.onDelivery(event.getDelivery());
break;
+ default:
+ break;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/226f28ab/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
new file mode 100644
index 0000000..e0c6b6c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for broker side support of the Durable Subscription mapping for JMS.
+ */
+public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpDurableReceiverTest.class);
+
+ private final String SELECTOR_STRING = "color = red";
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDurableReceiver() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+ receiver.flow(1);
+
+ assertEquals(getTopicName(), lookupSubscription());
+
+ AmqpSender sender = session.createSender(getTopicName());
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("message:1");
+ sender.send(message);
+
+ message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+
+ connection.close();
+
+ assertEquals(getTopicName(), lookupSubscription());
+ }
+
+ @Test(timeout = 60000)
+ public void testDetachedDurableReceiverRemainsActive() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+ assertEquals(getTopicName(), lookupSubscription());
+
+ receiver.detach();
+
+ assertEquals(getTopicName(), lookupSubscription());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCloseDurableReceiverRemovesSubscription() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+ assertEquals(getTopicName(), lookupSubscription());
+
+ receiver.close();
+
+ assertNull(lookupSubscription());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReattachToDurableNode() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+ receiver.detach();
+
+ receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+ receiver.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupExistingSubscription() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
+
+ receiver.detach();
+
+ receiver = session.lookupSubscription(getSubscriptionName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ if (remoteSource.getFilter() != null) {
+ assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ }
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ receiver.close();
+
+ try {
+ receiver = session.lookupSubscription(getSubscriptionName());
+ fail("Should not be able to lookup the subscription");
+ } catch (Exception e) {
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupExistingSubscriptionWithSelector() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, false);
+
+ receiver.detach();
+
+ receiver = session.lookupSubscription(getSubscriptionName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertNotNull(remoteSource.getFilter());
+ assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
+ assertEquals(SELECTOR_STRING, selector);
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ receiver.close();
+
+ try {
+ receiver = session.lookupSubscription(getSubscriptionName());
+ fail("Should not be able to lookup the subscription");
+ } catch (Exception e) {
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), null, true);
+
+ receiver.detach();
+
+ receiver = session.lookupSubscription(getSubscriptionName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertNotNull(remoteSource.getFilter());
+ assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ receiver.close();
+
+ try {
+ receiver = session.lookupSubscription(getSubscriptionName());
+ fail("Should not be able to lookup the subscription");
+ } catch (Exception e) {
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, true);
+
+ receiver.detach();
+
+ receiver = session.lookupSubscription(getSubscriptionName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertNotNull(remoteSource.getFilter());
+ assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
+ assertEquals(SELECTOR_STRING, selector);
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ receiver.close();
+
+ try {
+ receiver = session.lookupSubscription(getSubscriptionName());
+ fail("Should not be able to lookup the subscription");
+ } catch (Exception e) {
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupNonExistingSubscription() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.createConnection());
+ connection.setContainerId(getContainerID());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+
+ try {
+ session.lookupSubscription(getSubscriptionName());
+ fail("Should throw an exception since there is not subscription");
+ } catch (Exception e) {
+ LOG.info("Error on lookup: {}", e.getMessage());
+ }
+
+ connection.close();
+ }
+
+ public String lookupSubscription() {
+ Binding binding = server.getPostOffice().getBinding(new SimpleString(getContainerID() + "." + getSubscriptionName()));
+ if (binding != null) {
+ return binding.getAddress().toString();
+ }
+
+ return null;
+ }
+
+ public String getContainerID() {
+ return "myContainerID";
+ }
+
+ public String getSubscriptionName() {
+ return "mySubscription";
+ }
+
+ public String getTopicName() {
+ return "jms.topic.myTopic";
+ }
+}