You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/13 18:30:05 UTC
activemq git commit: NO-JIRA: Add some additional test variations and
add some more checks (cherry picked from commit
3e237ca73a9c9af18191ff4a23c456c1d427511e)
Repository: activemq
Updated Branches:
refs/heads/activemq-5.14.x 330deb2c8 -> 70728e97d
NO-JIRA: Add some additional test variations and add some more checks
(cherry picked from commit 3e237ca73a9c9af18191ff4a23c456c1d427511e)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/70728e97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/70728e97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/70728e97
Branch: refs/heads/activemq-5.14.x
Commit: 70728e97dabf29cccfc03794589b9515d8a7ce51
Parents: 330deb2
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 13 14:29:13 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 13 14:29:55 2016 -0400
----------------------------------------------------------------------
.../amqp/interop/AmqpDurableReceiverTest.java | 122 +++++++++++++++++--
1 file changed, 111 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/70728e97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index b016cc5..3db3301 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -31,6 +31,7 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@@ -42,6 +43,8 @@ import org.junit.Test;
*/
public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
+ private final String SELECTOR_STRING = "color = red";
+
@Override
protected boolean isUseOpenWireConnector() {
return true;
@@ -244,6 +247,103 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testLookupExistingSubscriptionWithSelector() throws Exception {
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = trackConnection(client.createConnection());
+ connection.setContainerId(getTestName());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING);
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.detach();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver = session.lookupSubscription(getTestName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertNotNull(remoteSource.getFilter());
+ assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.close();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = trackConnection(client.createConnection());
+ connection.setContainerId(getTestName());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), null, true);
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.detach();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver = session.lookupSubscription(getTestName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertNotNull(remoteSource.getFilter());
+ assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.close();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
@@ -254,7 +354,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.connect();
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true);
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true);
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
@@ -272,10 +372,10 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
- if (remoteSource.getFilter() != null) {
- assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
- assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
- }
+ assertNotNull(remoteSource.getFilter());
+ assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
@@ -303,7 +403,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
connection.connect();
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true);
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true);
assertEquals(1, brokerView.getDurableTopicSubscribers().length);
assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
@@ -328,10 +428,10 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
- if (remoteSource.getFilter() != null) {
- assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
- assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
- }
+ assertNotNull(remoteSource.getFilter());
+ assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
+ assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
+ assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed());
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());