You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/06 14:08:08 UTC
[6/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk
usage, and global-max-size
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
new file mode 100644
index 0000000..7353b47
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.files;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FileMoveManagerTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder;
+
+ @Rule
+ public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
+
+ private File dataLocation;
+ private FileMoveManager manager;
+
+
+ @Before
+ public void setUp() {
+ dataLocation = new File(temporaryFolder.getRoot(), "data");
+ dataLocation.mkdirs();
+ manager = new FileMoveManager(dataLocation, 10);
+ }
+
+
+ public FileMoveManagerTest() {
+ File parent = new File("./target/tmp");
+ parent.mkdirs();
+ temporaryFolder = new TemporaryFolder(parent);
+ }
+
+ @Test
+ public void testBackupFiles() {
+ int[] originalFiles = new int[12];
+ int count = 0;
+
+ // It will fake folders creation
+ for (int i = 0; i < 12; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ Assert.assertEquals(12, manager.getFolders().length);
+ Assert.assertEquals(12, manager.getNumberOfFolders());
+
+
+ assertIDs(originalFiles, manager.getIDlist());
+ }
+
+ @Test
+ public void testMinMax() {
+ int[] originalFiles = new int[12];
+ int count = 0;
+
+ // It will fake folders creation
+ for (int i = 0; i < 5; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ // simulates a hole where someone removed a folder by hand
+
+ // It will fake folders creation
+ for (int i = 7; i < 14; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ Assert.assertEquals(12, manager.getFolders().length);
+ Assert.assertEquals(12, manager.getNumberOfFolders());
+
+ int[] ids = manager.getIDlist();
+
+ assertIDs(originalFiles, ids);
+
+ Assert.assertEquals(0, manager.getMinID());
+ Assert.assertEquals(13, manager.getMaxID());
+
+ manager.setMaxFolders(3).checkOldFolders();
+
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+ Assert.assertEquals(13, manager.getMaxID());
+ Assert.assertEquals(11, manager.getMinID());
+
+ }
+
+ @Test
+ public void testGarbageCreated() {
+ // I'm pretending an admin created a folder here
+ File garbage = new File(dataLocation, "bkp.zzz");
+ garbage.mkdirs();
+
+ testMinMax();
+
+ resetTmp();
+ // the admin renamed a folder maybe
+ garbage = new File(dataLocation, "bkp.001.old");
+ garbage.mkdirs();
+
+ resetTmp();
+
+ // the admin renamed a folder maybe
+ garbage = new File(dataLocation, "bkp.1.5");
+ garbage.mkdirs();
+
+ testMinMax();
+ }
+
+
+ @Test
+ public void testNoFolders() {
+ Assert.assertEquals(0, manager.getFolders().length);
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+
+ Assert.assertTrue(dataLocation.delete());
+
+ Assert.assertEquals(0, manager.getFolders().length);
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+
+ @Test
+ public void testNoFiles() throws Exception {
+ // nothing to be moved, so why to do a backup
+ manager.doMove();
+
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+ @Test
+ public void testMoveFiles() throws Exception {
+ manager.setMaxFolders(3);
+
+ for (int bkp = 1; bkp <= 10; bkp++) {
+ for (int i = 0; i < 100; i++) {
+ createFile(dataLocation, i);
+ }
+
+ manager.doMove();
+
+ // We will always have maximum of 3 folders
+ Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
+
+ File bkpFolder = manager.getFolder(bkp);
+
+ FileMoveManager bkp1Manager = new FileMoveManager(bkpFolder, 10);
+ String[] filesAfterMove = bkp1Manager.getFiles();
+
+ for (String file : filesAfterMove) {
+ checkFile(bkpFolder, file);
+ }
+ }
+
+ Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
+
+ manager.setMaxFolders(0).checkOldFolders();
+
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+
+ manager.setMaxFolders(1).checkOldFolders();
+ Assert.assertEquals(1, manager.getNumberOfFolders());
+
+
+ Assert.assertEquals(10, manager.getMaxID());
+ Assert.assertEquals(10, manager.getMinID());
+ }
+
+
+ @Test
+ public void testMoveFolders() throws Exception {
+ manager.setMaxFolders(3);
+
+ int NUMBER_OF_FOLDERS = 10;
+ int FILES_PER_FOLDER = 10;
+
+ for (int bkp = 1; bkp <= 10; bkp++) {
+ for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
+ File folderF = new File(dataLocation, "folder" + f);
+ folderF.mkdirs();
+
+ // FILES_PER_FOLDER + f, I'm just creating more files as f grows.
+ // this is just to make each folder unique somehow
+ for (int i = 0; i < FILES_PER_FOLDER + f; i++) {
+ createFile(folderF, i);
+ }
+ }
+
+ manager.doMove();
+
+ // We will always have maximum of 3 folders
+ Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
+
+ File bkpFolder = manager.getFolder(bkp);
+
+ for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
+ File fileTmp = new File(bkpFolder, "folder" + f);
+
+ String[] filesOnFolder = fileTmp.list();
+
+ Assert.assertEquals(FILES_PER_FOLDER + f, filesOnFolder.length);
+
+ for (String file : filesOnFolder) {
+ checkFile(fileTmp, file);
+ }
+ }
+
+ }
+
+ Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
+
+ manager.setMaxFolders(0).checkOldFolders();
+
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+
+ manager.setMaxFolders(1).checkOldFolders();
+ Assert.assertEquals(1, manager.getNumberOfFolders());
+
+
+ Assert.assertEquals(10, manager.getMaxID());
+ Assert.assertEquals(10, manager.getMinID());
+ }
+
+ @Test
+ public void testMoveOverPaging() throws Exception {
+ AssertionLoggerHandler.startCapture();
+
+ ExecutorService threadPool = Executors.newCachedThreadPool();
+ try {
+ manager.setMaxFolders(3);
+ for (int i = 1; i <= 10; i++) {
+ HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<>();
+ AddressSettings settings = new AddressSettings();
+ settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setDefault(settings);
+
+ final StorageManager storageManager = new NullStorageManager();
+
+ PagingStoreFactoryNIO storeFactory =
+ new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null,
+ new OrderedExecutorFactory(threadPool), true, null);
+
+ PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1);
+
+ managerImpl.start();
+
+ PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
+
+ store.startPaging();
+
+ store.stop();
+
+ managerImpl.stop();
+
+ manager.doMove();
+
+ Assert.assertEquals(Math.min(i, manager.getMaxFolders()), manager.getNumberOfFolders());
+ }
+
+ Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt"));
+ }
+ finally {
+ AssertionLoggerHandler.stopCapture();
+ threadPool.shutdown();
+ }
+
+
+ }
+
+
+ private void assertIDs(int[] originalFiles, int[] ids) {
+ Assert.assertEquals(originalFiles.length, ids.length);
+ for (int i = 0; i < ids.length; i++) {
+ Assert.assertEquals(originalFiles[i], ids[i]);
+ }
+ }
+
+ private void resetTmp() {
+ temporaryFolder.delete();
+ temporaryFolder.getRoot().mkdirs();
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+ private void createFile(File folder, int i) throws FileNotFoundException {
+ File dataFile = new File(folder, i + ".jrn");
+ PrintWriter outData = new PrintWriter(new FileOutputStream(dataFile));
+ outData.print(i);
+ outData.close();
+ }
+
+ private void checkFile(File bkpFolder, String file) throws IOException {
+ File fileRead = new File(bkpFolder, file);
+ InputStreamReader stream = new InputStreamReader(new FileInputStream(fileRead));
+ BufferedReader reader = new BufferedReader(stream);
+ String valueRead = reader.readLine();
+ int id = Integer.parseInt(file.substring(0, file.indexOf('.')));
+ Assert.assertEquals("content of the file wasn't the expected", id, Integer.parseInt(valueRead));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
new file mode 100644
index 0000000..9a47d05
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.files;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.FileStore;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStoreMonitorTest extends ActiveMQTestBase {
+
+ private ScheduledExecutorService scheduledExecutorService;
+
+ @Before
+ public void startScheduled() {
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
+ }
+
+ @After
+ public void stopScheduled() {
+ scheduledExecutorService.shutdown();
+ scheduledExecutorService = null;
+ }
+
+ @Test
+ public void testSimpleTick() throws Exception {
+ File garbageFile = new File(getTestDirfile(), "garbage.bin");
+ FileOutputStream garbage = new FileOutputStream(garbageFile);
+ BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage);
+ PrintStream out = new PrintStream(bufferedOutputStream);
+
+ // This is just to make sure there is at least something on the device.
+ // If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail.
+ for (int i = 0; i < 100; i++) {
+ out.println("Garbage " + i);
+ }
+
+ bufferedOutputStream.close();
+
+ final AtomicInteger over = new AtomicInteger(0);
+ final AtomicInteger under = new AtomicInteger(0);
+ final AtomicInteger tick = new AtomicInteger(0);
+
+ FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() {
+ @Override
+ public void tick(FileStore store, double usage) {
+ tick.incrementAndGet();
+ System.out.println("tick:: " + store + " usage::" + usage);
+ }
+
+ @Override
+ public void over(FileStore store, double usage) {
+ over.incrementAndGet();
+ System.out.println("over:: " + store + " usage::" + usage);
+ }
+
+ @Override
+ public void under(FileStore store, double usage) {
+ under.incrementAndGet();
+ System.out.println("under:: " + store + " usage::" + usage);
+ }
+ };
+
+ final AtomicBoolean fakeReturn = new AtomicBoolean(false);
+ FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) {
+ @Override
+ protected double calculateUsage(FileStore store) throws IOException {
+ if (fakeReturn.get()) {
+ return 1f;
+ }
+ else {
+ return super.calculateUsage(store);
+ }
+ }
+ };
+ storeMonitor.addCallback(callback);
+ storeMonitor.addStore(getTestDirfile());
+
+ storeMonitor.tick();
+
+ Assert.assertEquals(0, over.get());
+ Assert.assertEquals(1, tick.get());
+ Assert.assertEquals(1, under.get());
+
+ fakeReturn.set(true);
+
+ storeMonitor.tick();
+
+ Assert.assertEquals(1, over.get());
+ Assert.assertEquals(2, tick.get());
+ Assert.assertEquals(1, under.get());
+ }
+
+ @Test
+ public void testScheduler() throws Exception {
+
+ FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9);
+
+ final ReusableLatch latch = new ReusableLatch(5);
+ storeMonitor.addStore(getTestDirfile());
+ storeMonitor.addCallback(new FileStoreMonitor.Callback() {
+ @Override
+ public void tick(FileStore store, double usage) {
+ System.out.println("TickS::" + usage);
+ latch.countDown();
+ }
+
+ @Override
+ public void over(FileStore store, double usage) {
+
+ }
+
+ @Override
+ public void under(FileStore store, double usage) {
+
+ }
+ });
+ storeMonitor.start();
+
+
+ Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
+
+ storeMonitor.stop();
+
+ latch.setCount(1);
+
+ Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
+
+// FileStoreMonitor monitor = new FileStoreMonitor()
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
deleted file mode 100644
index 0935c38..0000000
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server.impl;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
-import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
-import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
-import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
-import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class FileMoveManagerTest {
-
- @Rule
- public TemporaryFolder temporaryFolder;
-
- @Rule
- public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
-
- private File dataLocation;
- private FileMoveManager manager;
-
-
- @Before
- public void setUp() {
- dataLocation = new File(temporaryFolder.getRoot(), "data");
- dataLocation.mkdirs();
- manager = new FileMoveManager(dataLocation, 10);
- }
-
-
- public FileMoveManagerTest() {
- File parent = new File("./target/tmp");
- parent.mkdirs();
- temporaryFolder = new TemporaryFolder(parent);
- }
-
- @Test
- public void testBackupFiles() {
- int[] originalFiles = new int[12];
- int count = 0;
-
- // It will fake folders creation
- for (int i = 0; i < 12; i++) {
- originalFiles[count++] = i;
- File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
- bkp.mkdirs();
- }
-
- Assert.assertEquals(12, manager.getFolders().length);
- Assert.assertEquals(12, manager.getNumberOfFolders());
-
-
- assertIDs(originalFiles, manager.getIDlist());
- }
-
- @Test
- public void testMinMax() {
- int[] originalFiles = new int[12];
- int count = 0;
-
- // It will fake folders creation
- for (int i = 0; i < 5; i++) {
- originalFiles[count++] = i;
- File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
- bkp.mkdirs();
- }
-
- // simulates a hole where someone removed a folder by hand
-
- // It will fake folders creation
- for (int i = 7; i < 14; i++) {
- originalFiles[count++] = i;
- File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
- bkp.mkdirs();
- }
-
- Assert.assertEquals(12, manager.getFolders().length);
- Assert.assertEquals(12, manager.getNumberOfFolders());
-
- int[] ids = manager.getIDlist();
-
- assertIDs(originalFiles, ids);
-
- Assert.assertEquals(0, manager.getMinID());
- Assert.assertEquals(13, manager.getMaxID());
-
- manager.setMaxFolders(3).checkOldFolders();
-
- Assert.assertEquals(3, manager.getNumberOfFolders());
- Assert.assertEquals(13, manager.getMaxID());
- Assert.assertEquals(11, manager.getMinID());
-
- }
-
- @Test
- public void testGarbageCreated() {
- // I'm pretending an admin created a folder here
- File garbage = new File(dataLocation, "bkp.zzz");
- garbage.mkdirs();
-
- testMinMax();
-
- resetTmp();
- // the admin renamed a folder maybe
- garbage = new File(dataLocation, "bkp.001.old");
- garbage.mkdirs();
-
- resetTmp();
-
- // the admin renamed a folder maybe
- garbage = new File(dataLocation, "bkp.1.5");
- garbage.mkdirs();
-
- testMinMax();
- }
-
-
- @Test
- public void testNoFolders() {
- Assert.assertEquals(0, manager.getFolders().length);
- Assert.assertEquals(0, manager.getNumberOfFolders());
-
- Assert.assertTrue(dataLocation.delete());
-
- Assert.assertEquals(0, manager.getFolders().length);
- Assert.assertEquals(0, manager.getNumberOfFolders());
- }
-
-
- @Test
- public void testNoFiles() throws Exception {
- // nothing to be moved, so why to do a backup
- manager.doMove();
-
- Assert.assertEquals(0, manager.getNumberOfFolders());
- }
-
- @Test
- public void testMoveFiles() throws Exception {
- manager.setMaxFolders(3);
-
- for (int bkp = 1; bkp <= 10; bkp++) {
- for (int i = 0; i < 100; i++) {
- createFile(dataLocation, i);
- }
-
- manager.doMove();
-
- // We will always have maximum of 3 folders
- Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
-
- File bkpFolder = manager.getFolder(bkp);
-
- FileMoveManager bkp1Manager = new FileMoveManager(bkpFolder, 10);
- String[] filesAfterMove = bkp1Manager.getFiles();
-
- for (String file : filesAfterMove) {
- checkFile(bkpFolder, file);
- }
- }
-
- Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
-
- manager.setMaxFolders(0).checkOldFolders();
-
- Assert.assertEquals(3, manager.getNumberOfFolders());
-
- manager.setMaxFolders(1).checkOldFolders();
- Assert.assertEquals(1, manager.getNumberOfFolders());
-
-
- Assert.assertEquals(10, manager.getMaxID());
- Assert.assertEquals(10, manager.getMinID());
- }
-
-
- @Test
- public void testMoveFolders() throws Exception {
- manager.setMaxFolders(3);
-
- int NUMBER_OF_FOLDERS = 10;
- int FILES_PER_FOLDER = 10;
-
- for (int bkp = 1; bkp <= 10; bkp++) {
- for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
- File folderF = new File(dataLocation, "folder" + f);
- folderF.mkdirs();
-
- // FILES_PER_FOLDER + f, I'm just creating more files as f grows.
- // this is just to make each folder unique somehow
- for (int i = 0; i < FILES_PER_FOLDER + f; i++) {
- createFile(folderF, i);
- }
- }
-
- manager.doMove();
-
- // We will always have maximum of 3 folders
- Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders());
-
- File bkpFolder = manager.getFolder(bkp);
-
- for (int f = 0; f < NUMBER_OF_FOLDERS; f++) {
- File fileTmp = new File(bkpFolder, "folder" + f);
-
- String[] filesOnFolder = fileTmp.list();
-
- Assert.assertEquals(FILES_PER_FOLDER + f, filesOnFolder.length);
-
- for (String file : filesOnFolder) {
- checkFile(fileTmp, file);
- }
- }
-
- }
-
- Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
-
- manager.setMaxFolders(0).checkOldFolders();
-
- Assert.assertEquals(3, manager.getNumberOfFolders());
-
- manager.setMaxFolders(1).checkOldFolders();
- Assert.assertEquals(1, manager.getNumberOfFolders());
-
-
- Assert.assertEquals(10, manager.getMaxID());
- Assert.assertEquals(10, manager.getMinID());
- }
-
- @Test
- public void testMoveOverPaging() throws Exception {
- AssertionLoggerHandler.startCapture();
-
- ExecutorService threadPool = Executors.newCachedThreadPool();
- try {
- manager.setMaxFolders(3);
- for (int i = 1; i <= 10; i++) {
- HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<>();
- AddressSettings settings = new AddressSettings();
- settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- addressSettings.setDefault(settings);
-
- final StorageManager storageManager = new NullStorageManager();
-
- PagingStoreFactoryNIO storeFactory =
- new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null,
- new OrderedExecutorFactory(threadPool), true, null);
-
- PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings);
-
- managerImpl.start();
-
- PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
-
- store.startPaging();
-
- store.stop();
-
- managerImpl.stop();
-
- manager.doMove();
-
- Assert.assertEquals(Math.min(i, manager.getMaxFolders()), manager.getNumberOfFolders());
- }
-
- Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt"));
- }
- finally {
- AssertionLoggerHandler.stopCapture();
- threadPool.shutdown();
- }
-
-
- }
-
-
- private void assertIDs(int[] originalFiles, int[] ids) {
- Assert.assertEquals(originalFiles.length, ids.length);
- for (int i = 0; i < ids.length; i++) {
- Assert.assertEquals(originalFiles[i], ids[i]);
- }
- }
-
- private void resetTmp() {
- temporaryFolder.delete();
- temporaryFolder.getRoot().mkdirs();
- Assert.assertEquals(0, manager.getNumberOfFolders());
- }
-
- private void createFile(File folder, int i) throws FileNotFoundException {
- File dataFile = new File(folder, i + ".jrn");
- PrintWriter outData = new PrintWriter(new FileOutputStream(dataFile));
- outData.print(i);
- outData.close();
- }
-
- private void checkFile(File bkpFolder, String file) throws IOException {
- File fileRead = new File(bkpFolder, file);
- InputStreamReader stream = new InputStreamReader(new FileInputStream(fileRead));
- BufferedReader reader = new BufferedReader(stream);
- String valueRead = reader.readLine();
- int id = Integer.parseInt(file.substring(0, file.indexOf('.')));
- Assert.assertEquals("content of the file wasn't the expected", id, Integer.parseInt(valueRead));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 9a66610..2a6837f 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -300,6 +301,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+
+ }
+
+ @Override
public void confirmPendingLargeMessage(long recordID) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 8cfc31b..b5fe7b0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1420,10 +1420,16 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
}
-
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
+ final long maxAddressSize) {
+ return createServer(realFiles, configuration, pageSize, maxAddressSize, (Map<String, AddressSettings>)null);
+ }
+
+ protected ActiveMQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final long pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
@@ -1458,20 +1464,20 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected final ActiveMQServer createServer(final boolean realFiles, final boolean netty) throws Exception {
- return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+ return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) {
- return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+ return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected final ActiveMQServer createServer(final Configuration configuration) {
- return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+ return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createServer(final boolean realFiles, boolean isNetty, StoreConfiguration.StoreType storeType) throws Exception {
Configuration configuration = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty) : createDefaultConfig(isNetty);
- return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
+ return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
@@ -1559,7 +1565,7 @@ public abstract class ActiveMQTestBase extends Assert {
final boolean realFiles,
final Map<String, Object> params) throws Exception {
String acceptor = isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY;
- return createServer(realFiles, createDefaultConfig(index, params, acceptor), -1, -1, new HashMap<String, AddressSettings>());
+ return createServer(realFiles, createDefaultConfig(index, params, acceptor), -1, -1);
}
protected ActiveMQServer createClusteredServerWithParams(final boolean isNetty,
@@ -1568,7 +1574,7 @@ public abstract class ActiveMQTestBase extends Assert {
final int pageSize,
final int maxAddressSize,
final Map<String, Object> params) throws Exception {
- return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)), pageSize, maxAddressSize, new HashMap<String, AddressSettings>());
+ return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)), pageSize, maxAddressSize);
}
protected ServerLocator createFactory(final boolean isNetty) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 78d55c2..cc11029 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -53,6 +53,9 @@
<populate-validated-user>true</populate-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
+ <global-max-size>1234567</global-max-size>
+ <max-disk-usage>37</max-disk-usage>
+ <disk-scan-period>123</disk-scan-period>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 6660b5a..b9a49fb 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -41,7 +41,9 @@ Name | Description
[create-bindings-dir](persistence.md "Configuring the bindings journal") | true means that the server will create the bindings directory on start up. Default=true
[create-journal-dir](persistence.md) | true means that the journal directory will be created. Default=true
[discovery-groups](clusters.md "Clusters") | [a list of discovery-group](#discovery-group-type)
+[disk-scan-period](paging.md#max-disk-usage) | The interval where the disk is scanned for percentual usage. Default=5000 ms.
[diverts](diverts.md "Diverting and Splitting Message Flows") | [a list of diverts to use](#divert-type)
+[global-max-size](paging.md#global-max-size) | The amount in bytes before all addresses are considered full
[graceful-shutdown-enabled](graceful-shutdown.md "Graceful Server Shutdown") | true means that graceful shutdown is enabled. Default=true
[graceful-shutdown-timeout](graceful-shutdown.md "Graceful Server Shutdown") | Timeout on waitin for clients to disconnect before server shutdown. Default=-1
[grouping-handler](message-grouping.md "Message Grouping") | Message Group configuration
@@ -65,6 +67,7 @@ Name | Description
[management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications
[mask-password](configuration-index.md "Using Masked Passwords in Configuration Files") | This option controls whether passwords in server configuration need be masked. If set to "true" the passwords are masked. Default=false
[max-saved-replicated-journals-size]() | This specifies how many times a replicated backup server can restart after moving its files on start. Once there are this number of backup journal files the server will stop permanently after if fails back. Default=2
+[max-disk-usage](paging.md#max-disk-usage) | The max percentage of data we should use from disks. The System will block while the disk is full. Default=100
[memory-measure-interval](perf-tuning.md) | frequency to sample JVM memory in ms (or -1 to disable memory sampling). Default=-1
[memory-warning-threshold](perf-tuning.md) | Percentage of available memory which will trigger a warning log. Default=25
[message-counter-enabled](management.md "Configuring Message Counters") | true means that message counters are enabled. Default=false
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/docs/user-manual/en/paging.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/paging.md b/docs/user-manual/en/paging.md
index 6b5ddb5..a001a6f 100644
--- a/docs/user-manual/en/paging.md
+++ b/docs/user-manual/en/paging.md
@@ -11,8 +11,7 @@ a low memory footprint.
Apache ActiveMQ Artemis will start paging messages to disk, when the size of all
messages in memory for an address exceeds a configured maximum size.
-By default, Apache ActiveMQ Artemis does not page messages - this must be explicitly
-configured to activate it.
+The default configuration from Artemis has destinations with paging.
## Page Files
@@ -121,6 +120,12 @@ This is the list of available parameters on the address settings.
</tbody>
</table>
+## Global Max Size
+
+Beyond the max-size-bytes on the address you can also set the global-max-size on the main configuration. If you set max-size-bytes = -1 on paging the global-max-size can still be used.
+
+When you have more messages than what is configured global-max-size any new produced message will make that destination to go through its paging policy.
+
## Dropping messages
Instead of paging messages when the max size is reached, an address can
@@ -181,6 +186,12 @@ In this example all the other 9 queues will be consuming messages from
the page system. This may cause performance issues if this is an
undesirable state.
+## Max Disk Usage
+
+The System will perform scans on the disk to determine if the disk is beyond a configured limit.
+These are configured through 'max-disk-usage' in percentage. Once that limit is reached any
+message will be blocked. (unless the protocol doesn't support flow control on which case there will be an exception thrown and the connection for those clients dropped).
+
## Example
See the [examples](examples.md) chapter for an example which shows how to use paging with Apache ActiveMQ Artemis.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index bbccbf1..89432d9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -588,7 +588,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
SessionCallback callback,
OperationContext context,
boolean autoCreateQueue) throws Exception {
- return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
+ return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null, getPagingManager());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index abcf195..cd9978f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -57,7 +57,7 @@ import org.junit.Test;
public class LargeMessageTest extends LargeMessageTestBase {
- static final int RECEIVE_WAIT_TIME = 10000;
+ private static final int RECEIVE_WAIT_TIME = 10000;
private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java
deleted file mode 100644
index 8f87803..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingOrderTest.java
+++ /dev/null
@@ -1,714 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.client;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
-import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.QueueImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.Test;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A PagingOrderTest. PagingTest has a lot of tests already. I decided to create a newer one more
- * specialized on Ordering and counters
- */
-public class PagingOrderTest extends ActiveMQTestBase {
-
- private static final int PAGE_MAX = 100 * 1024;
-
- private static final int PAGE_SIZE = 10 * 1024;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- private Connection conn;
-
- @Test
- public void testOrder1() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 500;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- if (i % 1000 == 0) {
- session.commit();
- }
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession(true, true, 0);
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages / 2; i++) {
- ClientMessage message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty("id").intValue());
-
- if (i < 100) {
- // Do not consume the last one so we could restart
- message.acknowledge();
- }
- }
-
- session.close();
-
- sf.close();
- sf = createSessionFactory(locator);
-
- session = sf.createSession(true, true, 0);
-
- session.start();
-
- consumer = session.createConsumer(ADDRESS);
-
- for (int i = 100; i < numberOfMessages; i++) {
- ClientMessage message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty("id").intValue());
- message.acknowledge();
- }
-
- session.close();
- }
-
- @Test
- public void testPageCounter() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 500;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
- ServerLocator sl = createInVMNonHALocator();
- ClientSessionFactory sf = sl.createSessionFactory();
- ClientSession sess = sf.createSession(true, true, 0);
- sess.start();
- ClientConsumer cons = sess.createConsumer(ADDRESS);
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage msg = cons.receive(5000);
- assertNotNull(msg);
- assertEquals(i, msg.getIntProperty("id").intValue());
- msg.acknowledge();
- }
-
- assertNull(cons.receiveImmediate());
- sess.close();
- sl.close();
- }
- catch (Throwable e) {
- e.printStackTrace();
- errors.incrementAndGet();
- }
-
- }
- };
-
- t1.start();
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- if (i % 20 == 0) {
- session.commit();
- }
- }
-
- session.commit();
-
- t1.join();
-
- assertEquals(0, errors.get());
-
- assertEquals(numberOfMessages, getMessageCount(q2));
- assertEquals(numberOfMessages, getMessagesAdded(q2));
- assertEquals(0, getMessageCount(q1));
- assertEquals(numberOfMessages, getMessagesAdded(q1));
-
- session.close();
- sf.close();
- locator.close();
-
- server.stop();
-
- server.start();
-
- Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
-
- q1 = null;
- q2 = null;
-
- for (Binding bind : bindings.getBindings()) {
- if (bind instanceof LocalQueueBinding) {
- LocalQueueBinding qb = (LocalQueueBinding) bind;
- if (qb.getQueue().getName().equals(ADDRESS)) {
- q1 = qb.getQueue();
- }
-
- if (qb.getQueue().getName().equals(new SimpleString("inactive"))) {
- q2 = qb.getQueue();
- }
- }
- }
-
- assertNotNull(q1);
-
- assertNotNull(q2);
-
- assertEquals("q2 msg count", numberOfMessages, getMessageCount(q2));
- assertEquals("q2 msgs added", numberOfMessages, getMessagesAdded(q2));
- assertEquals("q1 msg count", 0, getMessageCount(q1));
- // 0, since nothing was sent to the queue after the server was restarted
- assertEquals("q1 msgs added", 0, getMessagesAdded(q1));
-
- }
-
- @Test
- public void testPageCounter2() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 500;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- final AtomicInteger errors = new AtomicInteger(0);
-
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
- ServerLocator sl = createInVMNonHALocator();
- ClientSessionFactory sf = sl.createSessionFactory();
- ClientSession sess = sf.createSession(true, true, 0);
- sess.start();
- ClientConsumer cons = sess.createConsumer(ADDRESS);
- for (int i = 0; i < 100; i++) {
- ClientMessage msg = cons.receive(5000);
- assertNotNull(msg);
- assertEquals(i, msg.getIntProperty("id").intValue());
- msg.acknowledge();
- }
- sess.close();
- sl.close();
- }
- catch (Throwable e) {
- e.printStackTrace();
- errors.incrementAndGet();
- }
-
- }
- };
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- if (i % 20 == 0) {
- session.commit();
- }
- }
-
- session.commit();
-
- t1.start();
- t1.join();
-
- assertEquals(0, errors.get());
- long timeout = System.currentTimeMillis() + 10000;
- while (numberOfMessages - 100 != getMessageCount(q1) && System.currentTimeMillis() < timeout) {
- Thread.sleep(500);
-
- }
-
- assertEquals(numberOfMessages, getMessageCount(q2));
- assertEquals(numberOfMessages, getMessagesAdded(q2));
- assertEquals(numberOfMessages - 100, getMessageCount(q1));
- }
-
- @Test
- public void testOrderOverRollback() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 3000;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- if (i % 1000 == 0) {
- session.commit();
- }
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession(false, false, 0);
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages / 2; i++) {
- ClientMessage message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty("id").intValue());
- message.acknowledge();
- }
-
- session.rollback();
-
- session.close();
-
- session = sf.createSession(false, false, 0);
-
- session.start();
-
- consumer = session.createConsumer(ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty("id").intValue());
- message.acknowledge();
- }
-
- session.commit();
- }
-
- @Test
- public void testOrderOverRollback2() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 200;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- if (i % 1000 == 0) {
- session.commit();
- }
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession(false, false, 0);
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- // number of references without paging
- int numberOfRefs = queue.getNumberOfReferences();
-
- // consume all non-paged references
- for (int ref = 0; ref < numberOfRefs; ref++) {
- ClientMessage msg = consumer.receive(5000);
- assertNotNull(msg);
- msg.acknowledge();
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession(false, false, 0);
-
- session.start();
-
- consumer = session.createConsumer(ADDRESS);
-
- ClientMessage msg = consumer.receive(5000);
- assertNotNull(msg);
- int msgIDRolledBack = msg.getIntProperty("id").intValue();
- msg.acknowledge();
-
- session.rollback();
-
- msg = consumer.receive(5000);
-
- assertNotNull(msg);
-
- assertEquals(msgIDRolledBack, msg.getIntProperty("id").intValue());
-
- session.rollback();
-
- session.close();
-
- sf.close();
- locator.close();
-
- server.stop();
-
- server.start();
-
- locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
-
- sf = createSessionFactory(locator);
-
- session = sf.createSession(false, false, 0);
-
- session.start();
-
- consumer = session.createConsumer(ADDRESS);
-
- for (int i = msgIDRolledBack; i < numberOfMessages; i++) {
- ClientMessage message = consumer.receive(5000);
- assertNotNull(message);
- assertEquals(i, message.getIntProperty("id").intValue());
- message.acknowledge();
- }
-
- session.commit();
-
- session.close();
- }
-
- @Test
- public void testPagingOverCreatedDestinationTopics() throws Exception {
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
-
- JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
- InVMNamingContext context = new InVMNamingContext();
- jmsServer.setRegistry(new JndiBindingRegistry(context));
- jmsServer.start();
-
- jmsServer.createTopic(true, "tt", "/topic/TT");
-
- server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
-
- ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
- Connection conn = cf.createConnection();
- conn.setClientID("tst");
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = (Topic) context.lookup("/topic/TT");
- sess.createDurableSubscriber(topic, "t1");
-
- MessageProducer prod = sess.createProducer(topic);
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- TextMessage txt = sess.createTextMessage("TST");
- prod.send(txt);
-
- PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.topic.TT"));
-
- assertEquals(1024 * 1024, store.getMaxSize());
- assertEquals(10 * 1024, store.getPageSizeBytes());
-
- jmsServer.stop();
-
- server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
-
- jmsServer = new JMSServerManagerImpl(server);
- context = new InVMNamingContext();
- jmsServer.setRegistry(new JndiBindingRegistry(context));
- jmsServer.start();
-
- AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.topic.TT");
-
- assertEquals(1024 * 1024, settings.getMaxSizeBytes());
- assertEquals(10 * 1024, settings.getPageSizeBytes());
- assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
-
- store = server.getPagingManager().getPageStore(new SimpleString("TT"));
-
- conn.close();
-
- server.stop();
-
- }
-
- @Test
- public void testPagingOverCreatedDestinationQueues() throws Exception {
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
- server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-
- JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
- InVMNamingContext context = new InVMNamingContext();
- jmsServer.setRegistry(new JndiBindingRegistry(context));
- jmsServer.start();
-
- server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
-
- jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
-
- ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
- conn = cf.createConnection();
- conn.setClientID("tst");
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Queue queue = (javax.jms.Queue) context.lookup("/queue/Q1");
-
- MessageProducer prod = sess.createProducer(queue);
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
- BytesMessage bmt = sess.createBytesMessage();
-
- bmt.writeBytes(new byte[1024]);
-
- for (int i = 0; i < 500; i++) {
- prod.send(bmt);
- }
-
- PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
-
- assertEquals(100 * 1024, store.getMaxSize());
- assertEquals(10 * 1024, store.getPageSizeBytes());
- assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
-
- jmsServer.stop();
-
- server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
- server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-
- jmsServer = new JMSServerManagerImpl(server);
- context = new InVMNamingContext();
- jmsServer.setRegistry(new JndiBindingRegistry(context));
- jmsServer.start();
-
- AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.queue.Q1");
-
- assertEquals(100 * 1024, settings.getMaxSizeBytes());
- assertEquals(10 * 1024, settings.getPageSizeBytes());
- assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
-
- store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
- assertEquals(100 * 1024, store.getMaxSize());
- assertEquals(10 * 1024, store.getPageSizeBytes());
- assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingSyncTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingSyncTest.java
deleted file mode 100644
index 635a911..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingSyncTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.client;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.junit.Test;
-
-/**
- * A PagingOrderTest.
- * <br>
- * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
- */
-public class PagingSyncTest extends ActiveMQTestBase {
-
- private static final int PAGE_MAX = 100 * 1024;
-
- private static final int PAGE_SIZE = 10 * 1024;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- @Test
- public void testOrder1() throws Throwable {
- boolean persistentMessages = true;
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 500;
-
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false).setConsumerWindowSize(1024 * 1024);
-
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = sf.createSession(false, false, false);
-
- server.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- byte[] body = new byte[messageSize];
-
- ByteBuffer bb = ByteBuffer.wrap(body);
-
- for (int j = 1; j <= messageSize; j++) {
- bb.put(getSamplebyte(j));
- }
-
- for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- message.putIntProperty(new SimpleString("id"), i);
-
- producer.send(message);
- }
-
- session.commit();
-
- session.close();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}