You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/06 14:08:06 UTC

[4/7] activemq-artemis git commit: ARTEMIS-581 Implement max disk usage, and global-max-size

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index c32ebc1..b80df4e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
 import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
 import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
 import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index f915a31..9e4a650 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -51,7 +51,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
-import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
+import org.apache.activemq.artemis.core.server.files.FileMoveManager;
 import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
new file mode 100644
index 0000000..f6003c5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AddressFullLoggingTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AddressFullLoggingTest extends ActiveMQTestBase {
+
+   @BeforeClass
+   public static void prepareLogger() {
+      AssertionLoggerHandler.startCapture();
+   }
+
+   @Test
+   public void testBlockLogging() throws Exception {
+      final int MAX_MESSAGES = 200;
+      final String MY_ADDRESS = "myAddress";
+      final String MY_QUEUE = "myQueue";
+
+      ActiveMQServer server = createServer(false);
+
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+      server.start();
+
+      internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
+   }
+
+   @Test
+   public void testGlobalBlockLogging() throws Exception {
+      final int MAX_MESSAGES = 200;
+      final String MY_ADDRESS = "myAddress";
+      final String MY_QUEUE = "myQueue";
+
+      ActiveMQServer server = createServer(false);
+
+      AddressSettings defaultSetting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+      server.getConfiguration().setGlobalMaxSize(20 * 1024);
+
+      server.start();
+
+
+      internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
+   }
+
+   private void internalTest(int MAX_MESSAGES,
+                             String MY_ADDRESS,
+                             String MY_QUEUE,
+                             ActiveMQServer server) throws Exception {
+      ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue(MY_ADDRESS, MY_QUEUE, true);
+
+      final ClientProducer producer = session.createProducer(MY_ADDRESS);
+
+      final ClientMessage message = session.createMessage(false);
+      message.getBodyBuffer().writeBytes(new byte[1024]);
+
+      ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
+      Callable<Object> sendMessageTask = new Callable<Object>() {
+         @Override
+         public Object call() throws ActiveMQException {
+            producer.send(message);
+            return null;
+         }
+      };
+
+      int sendCount = 0;
+
+      for (int i = 0; i < MAX_MESSAGES; i++) {
+         Future<Object> future = executor.submit(sendMessageTask);
+         try {
+            future.get(3, TimeUnit.SECONDS);
+            sendCount++;
+         }
+         catch (TimeoutException ex) {
+            // message sending has been blocked
+            break;
+         }
+         finally {
+            future.cancel(true); // may or may not desire this
+         }
+      }
+
+      executor.shutdown();
+      session.close();
+
+      session = factory.createSession(false, true, true);
+      session.start();
+      ClientConsumer consumer = session.createConsumer(MY_QUEUE);
+      for (int i = 0; i < sendCount; i++) {
+         ClientMessage msg = consumer.receive(250);
+         if (msg == null)
+            break;
+         msg.acknowledge();
+      }
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      // Using the code only so the test doesn't fail just because someone edits the log text
+      Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
+      Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
+   }
+
+   @AfterClass
+   public static void clearLogger() {
+      AssertionLoggerHandler.stopCapture();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
new file mode 100644
index 0000000..ee975ca
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GlobalPagingTest extends PagingTest {
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected ActiveMQServer createServer(final boolean realFiles,
+                                               final Configuration configuration,
+                                               final long pageSize,
+                                               final long maxAddressSize,
+                                               final Map<String, AddressSettings> settings) {
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+
+      if (settings != null) {
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+      }
+
+      server.getConfiguration().setGlobalMaxSize(maxAddressSize);
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+
+
+   @Test
+   public void testPagingOverFullDisk() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+      server.getConfiguration().setGlobalMaxSize(-1);
+
+      server.start();
+
+      ActiveMQServerImpl serverImpl = (ActiveMQServerImpl)server;
+      serverImpl.getMonitor().stop(); // stop the scheduled executor, we will do it manually only
+      serverImpl.getMonitor().tick();
+
+      final int numberOfMessages = 500;
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      final ClientSession session = sf.createSession(false, false, false);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message = null;
+
+      final byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+
+      Queue queue = server.locateQueue(ADDRESS);
+      queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+      sendFewMessages(numberOfMessages, session, producer, body);
+
+      serverImpl.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+
+      serverImpl.getMonitor().tick();
+
+      Thread t = new Thread() {
+         public void run() {
+            try {
+               sendFewMessages(numberOfMessages, session, producer, body);
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      t.start();
+
+      t.join(1000);
+      Assert.assertTrue(t.isAlive());
+
+      // releasing the disk
+      serverImpl.getMonitor().setMaxUsage(1).tick();
+      t.join(5000);
+      Assert.assertFalse(t.isAlive());
+
+
+      session.start();
+
+      assertEquals(numberOfMessages * 2, getMessageCount(queue));
+
+      // The consumer has to be created after the getMessageCount(queue) assertion
+      // otherwise delivery could alter the messagecount and give us a false failure
+      ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+      ClientMessage msg = null;
+
+      for (int i = 0; i < numberOfMessages * 2; i++) {
+         msg = consumer.receive(1000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         if (i % 500 == 0) {
+            session.commit();
+         }
+      }
+      session.commit();
+
+      assertEquals(0, getMessageCount(queue));
+   }
+
+   protected void sendFewMessages(int numberOfMessages,
+                                  ClientSession session,
+                                  ClientProducer producer,
+                                  byte[] body) throws ActiveMQException {
+      ClientMessage message;
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+      session.commit();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
new file mode 100644
index 0000000..54c658f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingOrderTest.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A PagingOrderTest. PagingTest has a lot of tests already. I decided to create a newer one more
+ * specialized on Ordering and counters
+ */
+public class PagingOrderTest extends ActiveMQTestBase {
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   private Connection conn;
+
+   @Test
+   public void testOrder1() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      session.close();
+
+      session = sf.createSession(true, true, 0);
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      for (int i = 0; i < numberOfMessages / 2; i++) {
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("id").intValue());
+
+         if (i < 100) {
+            // Do not consume the last one so we could restart
+            message.acknowledge();
+         }
+      }
+
+      session.close();
+
+      sf.close();
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(true, true, 0);
+
+      session.start();
+
+      consumer = session.createConsumer(ADDRESS);
+
+      for (int i = 100; i < numberOfMessages; i++) {
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("id").intValue());
+         message.acknowledge();
+      }
+
+      session.close();
+   }
+
+   @Test
+   public void testPageCounter() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      Thread t1 = new Thread() {
+         @Override
+         public void run() {
+            try {
+               ServerLocator sl = createInVMNonHALocator();
+               ClientSessionFactory sf = sl.createSessionFactory();
+               ClientSession sess = sf.createSession(true, true, 0);
+               sess.start();
+               ClientConsumer cons = sess.createConsumer(ADDRESS);
+               for (int i = 0; i < numberOfMessages; i++) {
+                  ClientMessage msg = cons.receive(5000);
+                  assertNotNull(msg);
+                  assertEquals(i, msg.getIntProperty("id").intValue());
+                  msg.acknowledge();
+               }
+
+               assertNull(cons.receiveImmediate());
+               sess.close();
+               sl.close();
+            }
+            catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+
+         }
+      };
+
+      t1.start();
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 20 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      t1.join();
+
+      assertEquals(0, errors.get());
+
+      assertEquals(numberOfMessages, getMessageCount(q2));
+      assertEquals(numberOfMessages, getMessagesAdded(q2));
+      assertEquals(0, getMessageCount(q1));
+      assertEquals(numberOfMessages, getMessagesAdded(q1));
+
+      session.close();
+      sf.close();
+      locator.close();
+
+      server.stop();
+
+      server.start();
+
+      Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+      q1 = null;
+      q2 = null;
+
+      for (Binding bind : bindings.getBindings()) {
+         if (bind instanceof LocalQueueBinding) {
+            LocalQueueBinding qb = (LocalQueueBinding) bind;
+            if (qb.getQueue().getName().equals(ADDRESS)) {
+               q1 = qb.getQueue();
+            }
+
+            if (qb.getQueue().getName().equals(new SimpleString("inactive"))) {
+               q2 = qb.getQueue();
+            }
+         }
+      }
+
+      assertNotNull(q1);
+
+      assertNotNull(q2);
+
+      assertEquals("q2 msg count", numberOfMessages, getMessageCount(q2));
+      assertEquals("q2 msgs added", numberOfMessages, getMessagesAdded(q2));
+      assertEquals("q1 msg count", 0, getMessageCount(q1));
+      // 0, since nothing was sent to the queue after the server was restarted
+      assertEquals("q1 msgs added", 0, getMessagesAdded(q1));
+
+   }
+
+   @Test
+   public void testPageCounter2() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      Thread t1 = new Thread() {
+         @Override
+         public void run() {
+            try {
+               ServerLocator sl = createInVMNonHALocator();
+               ClientSessionFactory sf = sl.createSessionFactory();
+               ClientSession sess = sf.createSession(true, true, 0);
+               sess.start();
+               ClientConsumer cons = sess.createConsumer(ADDRESS);
+               for (int i = 0; i < 100; i++) {
+                  ClientMessage msg = cons.receive(5000);
+                  assertNotNull(msg);
+                  assertEquals(i, msg.getIntProperty("id").intValue());
+                  msg.acknowledge();
+               }
+               sess.close();
+               sl.close();
+            }
+            catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+
+         }
+      };
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 20 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      t1.start();
+      t1.join();
+
+      assertEquals(0, errors.get());
+      long timeout = System.currentTimeMillis() + 10000;
+      while (numberOfMessages - 100 != getMessageCount(q1) && System.currentTimeMillis() < timeout) {
+         Thread.sleep(500);
+
+      }
+
+      assertEquals(numberOfMessages, getMessageCount(q2));
+      assertEquals(numberOfMessages, getMessagesAdded(q2));
+      assertEquals(numberOfMessages - 100, getMessageCount(q1));
+   }
+
+   @Test
+   public void testOrderOverRollback() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 3000;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(1024 * 1024);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      session.close();
+
+      session = sf.createSession(false, false, 0);
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      for (int i = 0; i < numberOfMessages / 2; i++) {
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("id").intValue());
+         message.acknowledge();
+      }
+
+      session.rollback();
+
+      session.close();
+
+      session = sf.createSession(false, false, 0);
+
+      session.start();
+
+      consumer = session.createConsumer(ADDRESS);
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("id").intValue());
+         message.acknowledge();
+      }
+
+      session.commit();
+   }
+
+   @Test
+   public void testOrderOverRollback2() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 200;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      QueueImpl queue = (QueueImpl) server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      session.close();
+
+      session = sf.createSession(false, false, 0);
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      // number of references without paging
+      int numberOfRefs = queue.getNumberOfReferences();
+
+      // consume all non-paged references
+      for (int ref = 0; ref < numberOfRefs; ref++) {
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      session.commit();
+
+      session.close();
+
+      session = sf.createSession(false, false, 0);
+
+      session.start();
+
+      consumer = session.createConsumer(ADDRESS);
+
+      ClientMessage msg = consumer.receive(5000);
+      assertNotNull(msg);
+      int msgIDRolledBack = msg.getIntProperty("id").intValue();
+      msg.acknowledge();
+
+      session.rollback();
+
+      msg = consumer.receive(5000);
+
+      assertNotNull(msg);
+
+      assertEquals(msgIDRolledBack, msg.getIntProperty("id").intValue());
+
+      session.rollback();
+
+      session.close();
+
+      sf.close();
+      locator.close();
+
+      server.stop();
+
+      server.start();
+
+      locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setConsumerWindowSize(0);
+
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, false, 0);
+
+      session.start();
+
+      consumer = session.createConsumer(ADDRESS);
+
+      for (int i = msgIDRolledBack; i < numberOfMessages; i++) {
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("id").intValue());
+         message.acknowledge();
+      }
+
+      session.commit();
+
+      session.close();
+   }
+
+   @Test
+   public void testPagingOverCreatedDestinationTopics() throws Exception {
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+      JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+      InVMNamingContext context = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(context));
+      jmsServer.start();
+
+      jmsServer.createTopic(true, "tt", "/topic/TT");
+
+      server.getActiveMQServerControl().addAddressSettings("jms.topic.TT", "DLQ", "DLQ", -1, false, 5, 1024 * 1024, 1024 * 10, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
+
+      ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      Connection conn = cf.createConnection();
+      conn.setClientID("tst");
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic topic = (Topic) context.lookup("/topic/TT");
+      sess.createDurableSubscriber(topic, "t1");
+
+      MessageProducer prod = sess.createProducer(topic);
+      prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+      TextMessage txt = sess.createTextMessage("TST");
+      prod.send(txt);
+
+      PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.topic.TT"));
+
+      assertEquals(1024 * 1024, store.getMaxSize());
+      assertEquals(10 * 1024, store.getPageSizeBytes());
+
+      jmsServer.stop();
+
+      server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+      jmsServer = new JMSServerManagerImpl(server);
+      context = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(context));
+      jmsServer.start();
+
+      AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.topic.TT");
+
+      assertEquals(1024 * 1024, settings.getMaxSizeBytes());
+      assertEquals(10 * 1024, settings.getPageSizeBytes());
+      assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+      store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+
+      conn.close();
+
+      server.stop();
+
+   }
+
+   @Test
+   public void testPagingOverCreatedDestinationQueues() throws Exception {
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
+      server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+      JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+      InVMNamingContext context = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(context));
+      jmsServer.start();
+
+      server.getActiveMQServerControl().addAddressSettings("jms.queue.Q1", "DLQ", "DLQ", -1, false, 5, 100 * 1024, 10 * 1024, 5, 5, 1, 1000, 0, false, "PAGE", -1, 10, "KILL", true, true, true, true);
+
+      jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
+
+      ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      conn = cf.createConnection();
+      conn.setClientID("tst");
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = (javax.jms.Queue) context.lookup("/queue/Q1");
+
+      MessageProducer prod = sess.createProducer(queue);
+      prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+      BytesMessage bmt = sess.createBytesMessage();
+
+      bmt.writeBytes(new byte[1024]);
+
+      for (int i = 0; i < 500; i++) {
+         prod.send(bmt);
+      }
+
+      PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+
+      assertEquals(100 * 1024, store.getMaxSize());
+      assertEquals(10 * 1024, store.getPageSizeBytes());
+      assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+      jmsServer.stop();
+
+      server = createServer(true, config, -1, -1, new HashMap<String, AddressSettings>());
+      server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+      jmsServer = new JMSServerManagerImpl(server);
+      context = new InVMNamingContext();
+      jmsServer.setRegistry(new JndiBindingRegistry(context));
+      jmsServer.start();
+
+      AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.queue.Q1");
+
+      assertEquals(100 * 1024, settings.getMaxSizeBytes());
+      assertEquals(10 * 1024, settings.getPageSizeBytes());
+      assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+      store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+      assertEquals(100 * 1024, store.getMaxSize());
+      assertEquals(10 * 1024, store.getPageSizeBytes());
+      assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4472aa0e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
new file mode 100644
index 0000000..bb94634
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSyncTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Test;
+
+/**
+ * A PagingOrderTest.
+ * <br>
+ * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
+ */
+public class PagingSyncTest extends ActiveMQTestBase {
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   @Test
+   public void testOrder1() throws Throwable {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(1000).setConnectionTTL(2000).setReconnectAttempts(0).setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false).setConsumerWindowSize(1024 * 1024);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[messageSize];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= messageSize; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(persistentMessages);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+      }
+
+      session.commit();
+
+      session.close();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}