You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/15 01:05:13 UTC
activemq git commit: Add some additional tests for durable
subscription recovery and lookup.
Repository: activemq
Updated Branches:
refs/heads/master c5a1b8606 -> 29fb4a4b3
Add some additional tests for durable subscription recovery and lookup.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/29fb4a4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/29fb4a4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/29fb4a4b
Branch: refs/heads/master
Commit: 29fb4a4b3fbb7083fad2d720d1f638524da7c735
Parents: c5a1b86
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu May 14 18:56:34 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 14 18:56:34 2015 -0400
----------------------------------------------------------------------
.../amqp/interop/AmqpDurableReceiverTest.java | 99 ++++++++++++++++++++
1 file changed, 99 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/29fb4a4b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 028991d..31c8961 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -43,6 +43,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
return true;
}
+ @Override
+ protected boolean isPersistent() {
+ return true;
+ }
+
@Test(timeout = 60000)
public void testCreateDurableReceiver() throws Exception {
@@ -143,6 +148,49 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testReattachToDurableNodeAfterRestart() throws Exception {
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.createConnection();
+ connection.setContainerId(getTestName());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.detach();
+
+ connection.close();
+
+ restartBroker();
+
+ connection = client.createConnection();
+ connection.setContainerId(getTestName());
+ connection.connect();
+ session = connection.createSession();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.close();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testLookupExistingSubscription() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();
@@ -187,6 +235,57 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testLookupExistingSubscriptionAfterRestart() throws Exception {
+
+ final BrokerViewMBean brokerView = getProxyToBroker();
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.createConnection();
+ connection.setContainerId(getTestName());
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.detach();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ restartBroker();
+
+ connection = client.createConnection();
+ connection.setContainerId(getTestName());
+ connection.connect();
+
+ session = connection.createSession();
+ receiver = session.lookupSubscription(getTestName());
+
+ assertNotNull(receiver);
+
+ Receiver protonReceiver = receiver.getReceiver();
+ assertNotNull(protonReceiver.getRemoteSource());
+ Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+ assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+ assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+ assertEquals(COPY, remoteSource.getDistributionMode());
+
+ assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ receiver.close();
+
+ assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+ assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testLookupNonExistingSubscription() throws Exception {
final BrokerViewMBean brokerView = getProxyToBroker();