You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/15 22:19:37 UTC
[2/2] activemq-artemis git commit: NO-JIRA Added test coverage for
broker paging with unlimited memory
NO-JIRA Added test coverage for broker paging with unlimited memory
This test starts 2 servers and sends messages to
a queue until it enters the paging state. Then
it changes the address max-size to -1, restarts
the 2 servers again and consumes all the messages.
It verifies that even if the max-size has changed
all the paged messages will be depaged and consumed.
No messages should remain stuck after restarting.
The test is there to guard against a case where messages
won't be depaged on server restart after the max-size
is changed to -1. This issue has been fixed in
master along with the fix for ARTEMIS-581, particularly
the changes to the method PagingStoreImpl.getMaxSize().
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/19c6f2d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/19c6f2d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/19c6f2d5
Branch: refs/heads/master
Commit: 19c6f2d5c4171518e0933a3a6abfc54453c95565
Parents: 7b60df5
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Nov 14 23:29:02 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 15 17:19:30 2017 -0500
----------------------------------------------------------------------
.../artemis/tests/util/ActiveMQTestBase.java | 14 +++
.../distribution/TwoWayTwoNodeClusterTest.java | 121 +++++++++++++++++++
2 files changed, 135 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19c6f2d5/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 85ad922..7cd225f 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -2118,6 +2118,20 @@ public abstract class ActiveMQTestBase extends Assert {
return message;
}
+ protected ClientMessage createTextMessage(final ClientSession session, final boolean durable, final int numChars) {
+ ClientMessage message = session.createMessage(Message.TEXT_TYPE,
+ durable,
+ 0,
+ System.currentTimeMillis(),
+ (byte)4);
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < numChars; i++) {
+ builder.append('a');
+ }
+ message.getBodyBuffer().writeString(builder.toString());
+ return message;
+ }
+
protected XidImpl newXID() {
return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19c6f2d5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
index 36f22eb..3798b92 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
@@ -16,11 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Before;
import org.junit.Test;
+import java.util.Map;
+
public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@@ -48,6 +59,116 @@ public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
return false;
}
+ /*
+ * This test starts 2 servers and sends messages to
+ * a queue until it enters into paging state. Then
+ * it changes the max-size to -1, restarts the 2 servers
+ * and consumes all the messages. It verifies that
+ * even if the max-size has changed all the paged
+ * messages will be depaged and consumed. No stuck
+ * messages after restarting.
+ */
+ @Test(timeout = 60000)
+ public void testClusterRestartWithConfigChanged() throws Exception {
+ Configuration config0 = servers[0].getConfiguration();
+ Configuration config1 = servers[1].getConfiguration();
+
+ configureBeforeStart(config0, config1);
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues", "queue0", null, true);
+ createQueue(1, "queues", "queue0", null, true);
+
+ waitForBindings(0, "queues", 1, 0, true);
+ waitForBindings(1, "queues", 1, 0, true);
+
+ waitForBindings(0, "queues", 1, 0, false);
+ waitForBindings(1, "queues", 1, 0, false);
+
+ ClientSessionFactory sf0 = sfs[0];
+ ClientSession session0 = sf0.createSession(false, false);
+ ClientProducer producer = session0.createProducer("queues");
+ final int numSent = 200;
+ for (int i = 0; i < numSent; i++) {
+ ClientMessage msg = createTextMessage(session0, true, 5000);
+ producer.send(msg);
+ if (i % 50 == 0) {
+ session0.commit();
+ }
+ }
+ session0.commit();
+ session0.close();
+
+ while (true) {
+ long msgCount0 = getMessageCount(servers[0], "queues");
+ long msgCount1 = getMessageCount(servers[1], "queues");
+
+ if (msgCount0 + msgCount1 >= numSent) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ Queue queue0 = servers[0].locateQueue(new SimpleString("queue0"));
+ assertTrue(queue0.getPageSubscription().isPaging());
+
+ closeAllSessionFactories();
+ stopServers(0, 1);
+
+ AddressSettings addressSettings0 = config0.getAddressesSettings().get("#");
+ AddressSettings addressSettings1 = config1.getAddressesSettings().get("#");
+
+ addressSettings0.setMaxSizeBytes(-1);
+ addressSettings1.setMaxSizeBytes(-1);
+
+ startServers(0, 1);
+
+ waitForBindings(0, "queues", 1, 0, true);
+ waitForBindings(1, "queues", 1, 0, true);
+
+ waitForBindings(0, "queues", 1, 0, false);
+ waitForBindings(1, "queues", 1, 0, false);
+
+ setupSessionFactory(0, isNetty());
+ addConsumer(0, 0, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+
+ for (int i = 0; i < numSent; i++) {
+ ClientMessage m = consumers[0].consumer.receive(5000);
+ assertNotNull("failed to receive message " + i, m);
+ }
+ }
+
+ private void configureBeforeStart(Configuration... serverConfigs) {
+ for (Configuration config : serverConfigs) {
+ config.setPersistenceEnabled(true);
+ config.setMessageCounterEnabled(true);
+ config.setJournalFileSize(20971520);
+ config.setJournalMinFiles(20);
+ config.setJournalCompactPercentage(50);
+
+ Map<String, AddressSettings> addressSettingsMap0 = config.getAddressesSettings();
+ AddressSettings addrSettings = addressSettingsMap0.get("#");
+ if (addrSettings == null) {
+ addrSettings = new AddressSettings();
+ addressSettingsMap0.put("#", addrSettings);
+ }
+ addrSettings.setDeadLetterAddress(new SimpleString("jms.queue.DLQ"));
+ addrSettings.setExpiryAddress(new SimpleString("jms.queue.ExpiryQueue"));
+ addrSettings.setRedeliveryDelay(30);
+ addrSettings.setMaxDeliveryAttempts(5);
+ addrSettings.setMaxSizeBytes(1048576);
+ addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addrSettings.setPageSizeBytes(524288);
+ addrSettings.setMessageCounterHistoryDayLimit(10);
+ addrSettings.setRedistributionDelay(1000);
+ }
+ }
+
@Test
public void testStartStop() throws Exception {