You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/06 14:08:06 UTC
[4/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk
usage, and global-max-size
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index c32ebc1..b80df4e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index f915a31..9e4a650 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -51,7 +51,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
-import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
new file mode 100644
index 0000000..f6003c5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AddressFullLoggingTest extends ActiveMQTestBase {
+ // Asserts that log codes AMQ222183 and AMQ221046 are captured once sends stall against a BLOCK-policy size limit.
+ @BeforeClass
+ public static void prepareLogger() {
+ AssertionLoggerHandler.startCapture(); // internalTest later searches the capture with findText
+ }
+ // Per-address variant: the "#" match sets a 10 KiB page size and a 20 KiB max size with the BLOCK policy.
+ @Test
+ public void testBlockLogging() throws Exception {
+ final int MAX_MESSAGES = 200;
+ final String MY_ADDRESS = "myAddress";
+ final String MY_QUEUE = "myQueue";
+
+ ActiveMQServer server = createServer(false);
+
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ server.start();
+
+ internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
+ }
+ // Global variant: the "#" match only sets the BLOCK policy; the limit comes from setGlobalMaxSize(20 * 1024).
+ @Test
+ public void testGlobalBlockLogging() throws Exception {
+ final int MAX_MESSAGES = 200;
+ final String MY_ADDRESS = "myAddress";
+ final String MY_QUEUE = "myQueue";
+
+ ActiveMQServer server = createServer(false);
+
+ AddressSettings defaultSetting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ server.getConfiguration().setGlobalMaxSize(20 * 1024);
+
+ server.start();
+
+
+ internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
+ }
+ // Shared body: sends up to MAX_MESSAGES messages until one send does not finish within 3 s, drains the queue, stops the server, then checks the captured log.
+ private void internalTest(int MAX_MESSAGES,
+ String MY_ADDRESS,
+ String MY_QUEUE,
+ ActiveMQServer server) throws Exception {
+ ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ ClientSessionFactory factory = createSessionFactory(locator);
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(MY_ADDRESS, MY_QUEUE, true);
+
+ final ClientProducer producer = session.createProducer(MY_ADDRESS);
+
+ final ClientMessage message = session.createMessage(false);
+ message.getBodyBuffer().writeBytes(new byte[1024]); // 1024-byte body; this same message instance is re-sent on every iteration
+ // Each send runs on a single worker thread so the test thread can time out on it instead of blocking itself.
+ ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
+ Callable<Object> sendMessageTask = new Callable<Object>() {
+ @Override
+ public Object call() throws ActiveMQException {
+ producer.send(message);
+ return null;
+ }
+ };
+
+ int sendCount = 0;
+ // sendCount counts only the sends that completed; the loop ends at the first send that times out.
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ Future<Object> future = executor.submit(sendMessageTask);
+ try {
+ future.get(3, TimeUnit.SECONDS);
+ sendCount++;
+ }
+ catch (TimeoutException ex) {
+ // message sending has been blocked
+ break;
+ }
+ finally {
+ future.cancel(true); // requests interruption of a send that is still running; has no effect on one that already completed
+ }
+ }
+
+ executor.shutdown();
+ session.close();
+ // Drain with a new session: receive at most sendCount messages, waiting up to 250 ms for each.
+ session = factory.createSession(false, true, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(MY_QUEUE);
+ for (int i = 0; i < sendCount; i++) {
+ ClientMessage msg = consumer.receive(250);
+ if (msg == null) // nothing arrived within the timeout; stop draining
+ break;
+ msg.acknowledge();
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+ // NOTE(review): the literal "myAddress" below duplicates MY_ADDRESS; both callers in this class pass that same value.
+ // Using the code only so the test doesn't fail just because someone edits the log text
+ Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
+ Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
+ }
+
+ @AfterClass
+ public static void clearLogger() {
+ AssertionLoggerHandler.stopCapture(); // counterpart of startCapture() in prepareLogger
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
new file mode 100644
index 0000000..ee975ca
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GlobalPagingTest extends PagingTest {
+ // Extends PagingTest, overriding createServer so the size limit is applied through setGlobalMaxSize instead of a per-address max size.
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp(); // NOTE(review): only delegates to the parent implementation
+ }
+ // Applies any supplied address settings, then uses maxAddressSize as the global max size; the "#" default is PAGE policy with max size -1.
+ @Override
+ protected ActiveMQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final long pageSize,
+ final long maxAddressSize,
+ final Map<String, AddressSettings> settings) {
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+
+ if (settings != null) {
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+ }
+
+ server.getConfiguration().setGlobalMaxSize(maxAddressSize);
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+
+ // Fakes a full disk through the server monitor and checks that a sender stalls until the disk is released, after which every message is delivered.
+ @Test
+ public void testPagingOverFullDisk() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+ server.getConfiguration().setGlobalMaxSize(-1); // NOTE(review): presumably undoes the global limit set by the createServer override - confirm the 4-arg overload delegates to it
+
+ server.start();
+
+ ActiveMQServerImpl serverImpl = (ActiveMQServerImpl)server;
+ serverImpl.getMonitor().stop(); // stop the scheduled executor, we will do it manually only
+ serverImpl.getMonitor().tick();
+
+ final int numberOfMessages = 500;
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ final ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ final ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null; // NOTE(review): never read in this method
+
+ final byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body); // bb writes through to body, so the loop below fills the payload
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+
+ Queue queue = server.locateQueue(ADDRESS);
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ // First batch is sent before the disk is marked full and is expected to complete normally.
+ sendFewMessages(numberOfMessages, session, producer, body);
+
+ serverImpl.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+
+ serverImpl.getMonitor().tick();
+ // Second batch goes out on its own thread so the test can observe whether that thread is still running.
+ Thread t = new Thread() {
+ public void run() { // NOTE(review): overrides Thread.run() without an @Override annotation
+ try {
+ sendFewMessages(numberOfMessages, session, producer, body);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ t.start();
+
+ t.join(1000);
+ Assert.assertTrue(t.isAlive()); // the sender must still be running one second after the disk was marked full
+
+ // releasing the disk
+ serverImpl.getMonitor().setMaxUsage(1).tick();
+ t.join(5000);
+ Assert.assertFalse(t.isAlive()); // the sender must have finished within five seconds of the release
+
+
+ session.start();
+
+ assertEquals(numberOfMessages * 2, getMessageCount(queue));
+
+ // The consumer has to be created after the getMessageCount(queue) assertion
+ // otherwise delivery could alter the messagecount and give us a false failure
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ ClientMessage msg = null;
+
+ for (int i = 0; i < numberOfMessages * 2; i++) {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ if (i % 500 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+
+ assertEquals(0, getMessageCount(queue));
+ }
+ // Sends numberOfMessages messages created with createMessage(true), each carrying body; commits when i is a multiple of 1000 and once more at the end.
+ protected void sendFewMessages(int numberOfMessages,
+ ClientSession session,
+ ClientProducer producer,
+ byte[] body) throws ActiveMQException {
+ ClientMessage message;
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
new file mode 100644
index 0000000..54c658f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Paging tests that focus specifically on message ordering and message counters. PagingTest
+ * already contains many tests, so these are kept in a separate, more specialized class.
+ */
+public class PagingOrderTest extends ActiveMQTestBase {
+
+ private static final int PAGE_MAX = 100 * 1024; // handed to createServer right after PAGE_SIZE by the tests in this class
+
+ private static final int PAGE_SIZE = 10 * 1024; // handed to createServer as the page size by the tests in this class
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); // used both as the address and as the main queue name
+
+ private Connection conn; // NOTE(review): not referenced by the test methods visible in this part of the diff
+
+ @Test
+ public void testOrder1() throws Throwable { // ids must arrive in send order, including after a second consumer takes over
+ boolean persistentMessages = true;
+ // Server is created with PAGE_SIZE / PAGE_MAX and an empty address-settings map.
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS); // NOTE(review): PagingTest.ADDRESS here, but the queue uses this class's ADDRESS - confirm they are equal
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body); // bb writes through to body, so the loop below fills the payload
+
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+ // Send numberOfMessages messages, each tagged with its index in the "id" property.
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0) { // with 500 messages this is only true for i == 0
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ // First pass: receive the first half in order, acknowledging only ids 0..99.
+ for (int i = 0; i < numberOfMessages / 2; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+
+ if (i < 100) {
+ // Only ids below 100 are acknowledged; the second consumer below is expected to start again at id 100
+ message.acknowledge();
+ }
+ }
+
+ session.close();
+
+ sf.close();
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(true, true, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+ // Second pass on a fresh session factory: ids 100..numberOfMessages-1 must arrive in order.
+ for (int i = 100; i < numberOfMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.close();
+ }
+
+ @Test
+ public void testPageCounter() throws Throwable { // message counters for one fully consumed queue and one untouched queue, before and after restart
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+ // Two queues on the same address: q1 is consumed by t1 below; q2 ("inactive") gets no consumer in this test.
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+ // Consumer thread: expects ids 0..numberOfMessages-1 in order and then nothing more; any Throwable is counted in errors.
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e) { // also catches assertion failures raised above, so they surface through the errors counter
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ t1.start();
+ // The producer runs while t1 is consuming, committing whenever i is a multiple of 20.
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.join(); // no timeout: waits until the consumer thread has ended
+
+ assertEquals(0, errors.get());
+ // q2 must still hold every message; q1 must have received every message and now be empty.
+ assertEquals(numberOfMessages, getMessageCount(q2));
+ assertEquals(numberOfMessages, getMessagesAdded(q2));
+ assertEquals(0, getMessageCount(q1));
+ assertEquals(numberOfMessages, getMessagesAdded(q1));
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ server.stop();
+ // Restart, then look both queues up again through the bindings of ADDRESS.
+ server.start();
+
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+ q1 = null;
+ q2 = null;
+
+ for (Binding bind : bindings.getBindings()) {
+ if (bind instanceof LocalQueueBinding) {
+ LocalQueueBinding qb = (LocalQueueBinding) bind;
+ if (qb.getQueue().getName().equals(ADDRESS)) {
+ q1 = qb.getQueue();
+ }
+
+ if (qb.getQueue().getName().equals(new SimpleString("inactive"))) {
+ q2 = qb.getQueue();
+ }
+ }
+ }
+
+ assertNotNull(q1);
+
+ assertNotNull(q2);
+
+ assertEquals("q2 msg count", numberOfMessages, getMessageCount(q2));
+ assertEquals("q2 msgs added", numberOfMessages, getMessagesAdded(q2));
+ assertEquals("q1 msg count", 0, getMessageCount(q1));
+ // 0, since nothing was sent to the queue after the server was restarted
+ assertEquals("q1 msgs added", 0, getMessagesAdded(q1));
+
+ }
+
+ @Test
+ public void testPageCounter2() throws Throwable { // message counters after only the first 100 messages of one queue are consumed
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+ // Two queues on the same address: q1 is consumed by t1 below; q2 ("inactive") gets no consumer in this test.
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+ // Consumer thread (started only after all sends are committed): acknowledges ids 0..99 in order; any Throwable is counted in errors.
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < 100; i++) {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e) { // also catches assertion failures raised above, so they surface through the errors counter
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.start();
+ t1.join(); // no timeout: waits until the consumer thread has ended
+
+ assertEquals(0, errors.get());
+ long timeout = System.currentTimeMillis() + 10000; // poll for up to 10 s until q1's count reaches numberOfMessages - 100
+ while (numberOfMessages - 100 != getMessageCount(q1) && System.currentTimeMillis() < timeout) {
+ Thread.sleep(500);
+
+ }
+ // q2 must still hold every message; q1 must be short by exactly the 100 acknowledged messages.
+ assertEquals(numberOfMessages, getMessageCount(q2));
+ assertEquals(numberOfMessages, getMessagesAdded(q2));
+ assertEquals(numberOfMessages - 100, getMessageCount(q1));
+ }
+
+ @Test
+ public void testOrderOverRollback() throws Throwable { // delivery order must be unchanged after a rolled-back consume
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 3000;
+
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+ // Send numberOfMessages messages tagged with "id"; with 3000 messages the in-loop commit runs at i = 0, 1000 and 2000.
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ // Receive and acknowledge the first half in order, then roll the session back instead of committing.
+ for (int i = 0; i < numberOfMessages / 2; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.rollback();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+ // A new session must receive every message again, in order, starting from id 0.
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+ }
+
+ @Test
+ public void testOrderOverRollback2() throws Throwable { // a rolled-back message is delivered again, both immediately and after a server restart
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 200;
+
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+ // Send numberOfMessages messages, each tagged with its index in the "id" property.
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0) { // with 200 messages this is only true for i == 0
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ // number of references without paging
+ int numberOfRefs = queue.getNumberOfReferences();
+
+ // consume all non-paged references
+ for (int ref = 0; ref < numberOfRefs; ref++) {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+ // Receive one more message, remember its id, acknowledge it, then roll back.
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ int msgIDRolledBack = msg.getIntProperty("id").intValue();
+ msg.acknowledge();
+
+ session.rollback();
+ // The next message on the same consumer must carry the id that was just rolled back.
+ msg = consumer.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msgIDRolledBack, msg.getIntProperty("id").intValue());
+
+ session.rollback();
+
+ session.close();
+
+ sf.close();
+ locator.close();
+ // Restart the server; delivery must resume at the rolled-back id and continue in order to the last message.
+ server.stop();
+
+ server.start();
+
+ locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
+
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(false, false, 0);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ for (int i = msgIDRolledBack; i < numberOfMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("id").intValue());
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ session.close();
+ }
+
+ @Test
+ public void testPagingOverCreatedDestinationTopics() throws Exception {
+ // Scenario: address settings added through the management control for a JMS topic must show up on its paging store and must still be returned by the settings repository once the server has been recreated.
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMNamingContext context = new InVMNamingContext();
+ jmsServer.setRegistry(new JndiBindingRegistry(context));
+ jmsServer.start();
+
+ jmsServer.createTopic(true, "tt", "/topic/TT");
+ // of the positional arguments, 1024 * 1024 and 1024 * 10 are what the assertions below expect as max size and page size, and "PAGE" as the address-full policy
+ server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
+
+ ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ Connection conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic) context.lookup("/topic/TT");
+ sess.createDurableSubscriber(topic, "t1");
+ // presumably the durable subscription plus one persistent send is what makes the server create a paging store for the address -- confirm
+ MessageProducer prod = sess.createProducer(topic);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage txt = sess.createTextMessage("TST");
+ prod.send(txt);
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.topic.TT"));
+
+ assertEquals(1024 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ // stop, then build a second server from the same Configuration; NOTE(review): conn stays open across this and is only closed at the end of the test
+ jmsServer.stop();
+
+ server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMNamingContext();
+ jmsServer.setRegistry(new JndiBindingRegistry(context));
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.topic.TT");
+
+ assertEquals(1024 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+ // NOTE(review): this looks up address "TT" rather than "jms.topic.TT" and the result is never asserted on -- confirm whether an assertion is missing
+ store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+
+ conn.close();
+
+ server.stop();
+
+ }
+
+ @Test
+ public void testPagingOverCreatedDestinationQueues() throws Exception {
+ // Scenario: queue-specific address settings (max size, page size, PAGE policy) added through the management control must win over the BLOCK policy set on the "#" match, on the paging store and in the settings repository, before and after the server is recreated.
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
+ server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMNamingContext context = new InVMNamingContext();
+ jmsServer.setRegistry(new JndiBindingRegistry(context));
+ jmsServer.start();
+ // of the positional arguments, 100 * 1024 and 10 * 1024 are what the assertions below expect as max size and page size, and "PAGE" as the address-full policy
+ server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
+
+ jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
+
+ ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ // conn is not declared in this method -- presumably a field of the enclosing test class (outside this view)
+ conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = (javax.jms.Queue) context.lookup("/queue/Q1");
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ BytesMessage bmt = sess.createBytesMessage();
+
+ bmt.writeBytes(new byte[1024]);
+ // the same BytesMessage (1024-byte body) is sent 500 times, i.e. more payload than the 100 * 1024 max size asserted below
+ for (int i = 0; i < 500; i++) {
+ prod.send(bmt);
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+ jmsServer.stop();
+ // second server built from the same Configuration, again with BLOCK on the "#" match
+ server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
+ server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMNamingContext();
+ jmsServer.setRegistry(new JndiBindingRegistry(context));
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.queue.Q1");
+
+ assertEquals(100 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+ // the paging store of the recreated server must report the same values
+ store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
new file mode 100644
index 0000000..bb94634
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Test;
+
+/**
+ * A PagingSyncTest.
+ * <br>
+ * Sends a batch of persistent messages over non-blocking sends and commits them in a single transaction.
+ */
+public class PagingSyncTest extends ActiveMQTestBase {
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ /**
+ * Sends 500 persistent messages with a 1024-byte body to {@link #ADDRESS} and commits them in one transaction.
+ * NOTE(review): nothing is consumed or asserted, so the test passes as long as send, commit and close do not throw.
+ * @throws Throwable if the server or the client fails
+ */
+ @Test
+ public void testOrder1() throws Throwable {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+ // presumably PAGE_SIZE and PAGE_MAX are the page size and max size of the address -- confirm against createServer
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+ server.start();
+
+ final int messageSize = 1024;
+ final int numberOfMessages = 500;
+
+ // all blockOn* flags are off: sends and acknowledgements are configured as non-blocking
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false).setConsumerWindowSize(1024 * 1024);
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ // no auto-commit of sends or acks (see ClientSessionFactory.createSession), hence the explicit commit at the end
+ ClientSession session = sf.createSession(false, false, false);
+
+ server.createQueue(ADDRESS, ADDRESS, null, true, false);
+ // produce on the address the queue was just bound to instead of reaching into PagingTest
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // payload bytes come from getSamplebyte, which is defined outside this file
+ byte[] body = new byte[messageSize];
+ ByteBuffer bb = ByteBuffer.wrap(body);
+ for (int j = 1; j <= messageSize; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(persistentMessages);
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+ bodyLocal.writeBytes(body);
+ // tag each message with its send index
+ message.putIntProperty(new SimpleString("id"), i);
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}