You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/15 01:05:13 UTC

activemq git commit: Add some additional tests for durable subscription recovery and lookup.

Repository: activemq
Updated Branches:
  refs/heads/master c5a1b8606 -> 29fb4a4b3


Add some additional tests for durable subscription recovery and lookup.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/29fb4a4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/29fb4a4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/29fb4a4b

Branch: refs/heads/master
Commit: 29fb4a4b3fbb7083fad2d720d1f638524da7c735
Parents: c5a1b86
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu May 14 18:56:34 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 14 18:56:34 2015 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpDurableReceiverTest.java   | 99 ++++++++++++++++++++
 1 file changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/29fb4a4b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
index 028991d..31c8961 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java
@@ -43,6 +43,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
         return true;
     }
 
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
     @Test(timeout = 60000)
     public void testCreateDurableReceiver() throws Exception {
 
@@ -143,6 +148,49 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReattachToDurableNodeAfterRestart() throws Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.detach();
+
+        connection.close();
+
+        restartBroker();
+
+        connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+        session = connection.createSession();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.close();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testLookupExistingSubscription() throws Exception {
 
         final BrokerViewMBean brokerView = getProxyToBroker();
@@ -187,6 +235,57 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testLookupExistingSubscriptionAfterRestart() throws Exception {
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        AmqpSession session = connection.createSession();
+        AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.detach();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        restartBroker();
+
+        connection = client.createConnection();
+        connection.setContainerId(getTestName());
+        connection.connect();
+
+        session = connection.createSession();
+        receiver = session.lookupSubscription(getTestName());
+
+        assertNotNull(receiver);
+
+        Receiver protonReceiver = receiver.getReceiver();
+        assertNotNull(protonReceiver.getRemoteSource());
+        Source remoteSource = (Source) protonReceiver.getRemoteSource();
+
+        assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
+        assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
+        assertEquals(COPY, remoteSource.getDistributionMode());
+
+        assertEquals(1, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        receiver.close();
+
+        assertEquals(0, brokerView.getDurableTopicSubscribers().length);
+        assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length);
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testLookupNonExistingSubscription() throws Exception {
 
         final BrokerViewMBean brokerView = getProxyToBroker();