You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/10/21 13:57:31 UTC
[activemq-artemis] branch master updated: NO JIRA - add some
durable sub loadbalancing to the scenario test
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 69e5832 NO JIRA - add some durable sub loadbalancing to the scenario test
69e5832 is described below
commit 69e58322f1b99c742a2a9761d92d61e955e535a5
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Oct 21 14:57:12 2020 +0100
NO JIRA - add some durable sub loadbalancing to the scenario test
---
.../jms/cluster/TopicClusterPageStoreSizeTest.java | 112 ++++++++++++++++++++-
1 file changed, 109 insertions(+), 3 deletions(-)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
index b8bf8b8..9dc6eae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/TopicClusterPageStoreSizeTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.jms.cluster;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -58,9 +59,9 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
Topic topic1 = createTopic(TOPIC, true);
- Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer prod1 = session1.createProducer(null);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -75,9 +76,11 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
if (forcePaging) {
for (SimpleString psName : server1.getPagingManager().getStoreNames()) {
+ System.err.println("server1: force paging on:" + psName);
server1.getPagingManager().getPageStore(psName).startPaging();
}
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+ System.err.println("server2: force paging on:" + psName);
server2.getPagingManager().getPageStore(psName).startPaging();
}
}
@@ -86,10 +89,17 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
TextMessage m2 = (TextMessage) cons2.receive(5000);
assertNotNull(m2);
+ assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+ System.err.println("sub2 on 2, no ack, message txt:" + m2.getText());
+
+
TextMessage m1 = (TextMessage) cons1.receive(5000);
+ assertNotNull(m1);
assertTrue(m1.getJMSDestination().toString().contains(TOPIC));
+ System.err.println("message txt:" + m1.getText());
- assertNotNull(m1);
+ m1.acknowledge();
+ // leave m2 for reconnect on server1
conn1.close();
conn2.close();
@@ -99,7 +109,103 @@ public class TopicClusterPageStoreSizeTest extends JMSClusteredTestBase {
}
for (SimpleString psName : server2.getPagingManager().getStoreNames()) {
+ System.err.println("server2: size of pages store: " + psName + " :" + server2.getPagingManager().getPageStore(psName).getAddressSize());
assertTrue("non negative size: " + psName, server2.getPagingManager().getPageStore(psName).getAddressSize() >= 0);
}
+
+ if (forcePaging) {
+ // message in the store, should have getPagedSize or is there some such thing?
+ assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getNumberOfPages() > 0);
+ } else {
+ assertTrue("size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
+ }
+
+ // reconnect
+ // get message for someClient2 on server 1 (cf1)
+ conn1 = cf1.createConnection();
+
+ conn1.setClientID("someClient2");
+
+ conn1.start();
+
+ session1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons1 = session1.createDurableSubscriber(topic1, "sub2");
+
+ m2 = (TextMessage) cons1.receive(5000);
+ assertNotNull(m2);
+ assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+ System.err.println("sub2 on 1, message txt:" + m2.getText());
+
+ m2.acknowledge();
+
+ // publish another
+ prod1 = session1.createProducer(null);
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ prod1.send(topic1, session1.createTextMessage("someOtherMessage"));
+
+ // stays on server 1
+ assertTrue("some size on 1", server1.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() > 0);
+ assertTrue("no size on 2", server2.getPagingManager().getPageStore(SimpleString.toSimpleString(TOPIC)).getAddressSize() == 0);
+
+ // duplicate this sub on 2
+ conn2 = cf2.createConnection();
+
+ conn2.setClientID("someClient2");
+
+ conn2.start();
+
+ session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // the clientId unique guarantee is per broker, not per cluster
+ cons2 = session2.createDurableSubscriber(topic1, "sub2");
+
+ // should not be able to consume yet
+ m2 = (TextMessage) cons2.receiveNoWait();
+ assertNull("did not get message", m2);
+
+ // still available on cons1
+ m2 = (TextMessage) cons1.receive(5000);
+ assertNotNull("got message", m2);
+ assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+ System.err.println("sub2 on 1, message txt:" + m2.getText());
+ m2.acknowledge();
+
+ // if we send another, lb will kick in..
+ prod1.send(topic1, session1.createTextMessage("someOtherOtherMessage"));
+
+ m2 = (TextMessage) cons2.receive(5000);
+ assertNotNull("got message", m2);
+ assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+
+ System.err.println("sub2 on 2: message txt:" + m2.getText());
+ m2.acknowledge();
+
+ // no duplicate, not available on cons1
+ m2 = (TextMessage) cons1.receiveNoWait();
+ assertNull("non null message", m2);
+
+ conn1.close();
+ conn2.close();
+
+ // pick up sub1:someClient1 messages, one from each broker
+ for (ConnectionFactory cf : new ConnectionFactory[]{cf2, cf1}) {
+ conn2 = cf.createConnection();
+ conn2.setClientID("someClient1");
+ conn2.start();
+
+ session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ cons2 = session2.createDurableSubscriber(topic1, "sub1");
+
+ m2 = (TextMessage) cons2.receive(5000);
+ assertNotNull("got message", m2);
+ assertTrue(m2.getJMSDestination().toString().contains(TOPIC));
+ System.err.println("sub1 on: " + cf + " - message txt:" + m2.getText());
+ m2.acknowledge();
+
+ conn2.close();
+ }
}
}