You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/10/21 13:57:31 UTC

[activemq-artemis] branch master updated: NO JIRA - add some durable sub loadbalancing to the scenario test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e5832  NO JIRA - add some durable sub loadbalancing to the scenario test
69e5832 is described below

commit 69e58322f1b99c742a2a9761d92d61e955e535a5
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Oct 21 14:57:12 2020 +0100

    NO JIRA - add some durable sub loadbalancing to the scenario test
---
 .../jms/cluster/TopicClusterPageStoreSizeTest.java | 112 ++++++++++++++++++++-
 1 file changed, 109 insertions(+), 3 deletions(-)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
index b8bf8b8..9dc6eae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.jms.cluster;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -58,9 +59,9 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
 
       Topic topic1 = createTopic(TOPIC, true);
 
-      Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
-      Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
       MessageProducer prod1 = session1.createProducer(null);
       prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -75,9 +76,11 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
 
       if (forcePaging) {
          for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+            System.err.println("server1: force paging on:" + psName);
             server1.getPagingManager().getPageStore(psName).startPaging();
          }
          for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+            System.err.println("server2: force paging on:" + psName);
             server2.getPagingManager().getPageStore(psName).startPaging();
          }
       }
@@ -86,10 +89,17 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
 
       TextMessage m2 = (TextMessage) cons2.receive(5000);
       assertNotNull(m2);
+      assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+      System.err.println("sub2 on 2, no ack, message txt:" + m2.getText());
+
+
       TextMessage m1 = (TextMessage) cons1.receive(5000);
+      assertNotNull(m1);
       assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
+      System.err.println("message txt:" + m1.getText());
 
-      assertNotNull(m1);
+      m1.acknowledge();
+      // leave m2 for reconnect on server1
 
       conn1.close();
       conn2.close();
@@ -99,7 +109,103 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
       }
 
       for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+         System.err.println("server2: size of pages store: " + psName + " :" + server2.getPagingManager().getPageStore(psName).getAddressSize());
          assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
       }
+
+      if (forcePaging) {
+         // the message is in the page store; TODO: assert on a paged-size metric if one exists (getNumberOfPages is used as a proxy here)
+         assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getNumberOfPages() > 0);
+      } else {
+         assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
+      }
+
+      // reconnect
+      // get message for someClient2 on server 1 (cf1)
+      conn1 = cf1.createConnection();
+
+      conn1.setClientID("someClient2");
+
+      conn1.start();
+
+      session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      cons1 = session1.createDurableSubscriber(topic1, "sub2");
+
+      m2 = (TextMessage) cons1.receive(5000);
+      assertNotNull(m2);
+      assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+      System.err.println("sub2 on 1, message txt:" + m2.getText());
+
+      m2.acknowledge();
+
+      // publish another
+      prod1 = session1.createProducer(null);
+      prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      prod1.send(topic1, session1.createTextMessage("someOtherMessage"));
+
+      // stays on server 1
+      assertTrue("some size on 1", server1.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
+      assertTrue("no size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() == 0);
+
+      // duplicate this sub on 2
+      conn2 = cf2.createConnection();
+
+      conn2.setClientID("someClient2");
+
+      conn2.start();
+
+      session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      // the clientId uniqueness guarantee is per broker, not per cluster
+      cons2 = session2.createDurableSubscriber(topic1, "sub2");
+
+      // should not be able to consume yet
+      m2 = (TextMessage) cons2.receiveNoWait();
+      assertNull("did not get message", m2);
+
+      // still available on cons1
+      m2 = (TextMessage) cons1.receive(5000);
+      assertNotNull("got message", m2);
+      assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+      System.err.println("sub2 on 1, message txt:" + m2.getText());
+      m2.acknowledge();
+
+      // if we send another message, load balancing will kick in
+      prod1.send(topic1, session1.createTextMessage("someOtherOtherMessage"));
+
+      m2 = (TextMessage) cons2.receive(5000);
+      assertNotNull("got message", m2);
+      assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+
+      System.err.println("sub2 on 2: message txt:" + m2.getText());
+      m2.acknowledge();
+
+      // no duplicate, not available on cons1
+      m2 = (TextMessage) cons1.receiveNoWait();
+      assertNull("non null message", m2);
+
+      conn1.close();
+      conn2.close();
+
+      // pick up sub1:someClient1 messages, one from each broker
+      for (ConnectionFactory cf : new ConnectionFactory[]{cf2, cf1}) {
+         conn2 = cf.createConnection();
+         conn2.setClientID("someClient1");
+         conn2.start();
+
+         session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         cons2 = session2.createDurableSubscriber(topic1, "sub1");
+
+         m2 = (TextMessage) cons2.receive(5000);
+         assertNotNull("got message", m2);
+         assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+         System.err.println("sub1 on: " + cf + " - message txt:" + m2.getText());
+         m2.acknowledge();
+
+         conn2.close();
+      }
    }
 }