You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/08/10 23:29:36 UTC

[activemq-artemis] branch main updated (12b81e7a25 -> 252e5b0b14)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


    from 12b81e7a25 ARTEMIS-3928 Adding SoakTest for Paging with in many destinations
     new 568eb70fcd ARTEMIS-3929 Improving OpenWire clientIDSet
     new 252e5b0b14 ARTEMIS-3928 Limiting parallel IO in paging which would allow multiple destinations running all at once

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/cli/commands/tools/DBOption.java       |   4 +-
 .../artemis/cli/commands/tools/PrintData.java      |   2 +-
 .../artemis/utils/actors/ArtemisExecutor.java      |  18 ++++
 .../artemis/utils/actors/OrderedExecutor.java      |  17 ++++
 .../utils/actors/OrderedExecutorFactory.java       |  13 ++-
 .../artemis/utils/actors/ProcessorBase.java        |  36 ++++---
 .../utils/actors/OrderedExecutorSanityTest.java    |  63 +++++++++++-
 .../protocol/openwire/OpenWireProtocolManager.java |  68 ++++++-------
 .../deployers/impl/FileConfigurationParser.java    |   4 +
 .../paging/cursor/impl/PageCursorProviderImpl.java |  19 +---
 .../paging/impl/PagingStoreFactoryDatabase.java    |  24 ++---
 .../core/paging/impl/PagingStoreFactoryNIO.java    |  22 ++---
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  14 ---
 .../artemis/core/persistence/StorageManager.java   |  25 -----
 .../journal/AbstractJournalStorageManager.java     |  26 -----
 .../impl/journal/JournalStorageManager.java        |   8 --
 .../impl/nullpm/NullStorageManager.java            |  14 ---
 .../artemis/core/server/ServiceRegistry.java       |   6 ++
 .../core/server/impl/ActiveMQServerImpl.java       |  48 ++++++++-
 .../artemis/core/server/impl/QueueImpl.java        |  17 +++-
 .../core/server/impl/ServiceRegistryImpl.java      |  12 +++
 .../core/settings/impl/AddressSettings.java        |  33 ++++++-
 .../resources/schema/artemis-configuration.xsd     |   8 ++
 .../config/impl/FileConfigurationParserTest.java   |  24 +++++
 .../impl/journal/JournalStorageManagerTest.java    |  14 ---
 .../core/server/files/FileMoveManagerTest.java     |   2 +-
 .../core/transaction/impl/TransactionImplTest.java |  16 ---
 .../transport/amqp/client/AmqpReceiver.java        |   4 +
 .../amqp/paging/AmqpMaxReadPagingTest.java         | 110 +++++++++++++++++++++
 .../integration/amqp/paging/AmqpPagingTest.java    |  11 +--
 .../tests/integration/client/SendAckFailTest.java  |  15 ---
 .../failover/LiveCrashOnBackupSyncTest.java        |   8 +-
 .../tests/integration/paging/PagingTest.java       |  26 +++--
 .../integration/replication/ReplicationTest.java   |   2 +-
 .../SharedNothingReplicationFlowControlTest.java   |   2 +-
 .../resources/servers/horizontalPaging/broker.xml  |   1 +
 .../core/paging/impl/PagingManagerImplTest.java    |   2 +-
 .../unit/core/paging/impl/PagingStoreImplTest.java |  40 ++++----
 38 files changed, 485 insertions(+), 293 deletions(-)
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpMaxReadPagingTest.java


[activemq-artemis] 02/02: ARTEMIS-3928 Limiting parallel IO in paging which would allow multiple destinations running all at once

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 252e5b0b14231d80de6829e4cc5d5fc1a2df1ac1
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Aug 8 10:11:25 2022 -0400

    ARTEMIS-3928 Limiting parallel IO in paging which would allow multiple destinations running all at once
    
    Running HorizontalPagingTest with these variables would make the test fail unless these changes are applied.
    
    export TEST_HORIZONTAL_SERVER_START_TIMEOUT=300000
    export TEST_HORIZONTAL_TIMEOUT_MINUTES=120
    export TEST_HORIZONTAL_PROTOCOL_LIST=OPENWIRE
    
    export TEST_HORIZONTAL_OPENWIRE_DESTINATIONS=200
    export TEST_HORIZONTAL_OPENWIRE_MESSAGES=1000
    export TEST_HORIZONTAL_OPENWIRE_COMMIT_INTERVAL=100
    export TEST_HORIZONTAL_OPENWIRE_RECEIVE_COMMIT_INTERVAL=0
    export TEST_HORIZONTAL_OPENWIRE_MESSAGE_SIZE=20000
    export TEST_HORIZONTAL_OPENWIRE_PARALLEL_SENDS=10
---
 .../artemis/cli/commands/tools/DBOption.java       |   4 +-
 .../artemis/cli/commands/tools/PrintData.java      |   2 +-
 .../artemis/utils/actors/ArtemisExecutor.java      |  18 ++++
 .../artemis/utils/actors/OrderedExecutor.java      |  17 ++++
 .../utils/actors/OrderedExecutorFactory.java       |  13 ++-
 .../artemis/utils/actors/ProcessorBase.java        |  36 ++++---
 .../utils/actors/OrderedExecutorSanityTest.java    |  63 +++++++++++-
 .../deployers/impl/FileConfigurationParser.java    |   4 +
 .../paging/cursor/impl/PageCursorProviderImpl.java |  19 +---
 .../paging/impl/PagingStoreFactoryDatabase.java    |  24 ++---
 .../core/paging/impl/PagingStoreFactoryNIO.java    |  22 ++---
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  14 ---
 .../artemis/core/persistence/StorageManager.java   |  25 -----
 .../journal/AbstractJournalStorageManager.java     |  26 -----
 .../impl/journal/JournalStorageManager.java        |   8 --
 .../impl/nullpm/NullStorageManager.java            |  14 ---
 .../artemis/core/server/ServiceRegistry.java       |   6 ++
 .../core/server/impl/ActiveMQServerImpl.java       |  48 ++++++++-
 .../artemis/core/server/impl/QueueImpl.java        |  17 +++-
 .../core/server/impl/ServiceRegistryImpl.java      |  12 +++
 .../core/settings/impl/AddressSettings.java        |  33 ++++++-
 .../resources/schema/artemis-configuration.xsd     |   8 ++
 .../config/impl/FileConfigurationParserTest.java   |  24 +++++
 .../impl/journal/JournalStorageManagerTest.java    |  14 ---
 .../core/server/files/FileMoveManagerTest.java     |   2 +-
 .../core/transaction/impl/TransactionImplTest.java |  16 ---
 .../transport/amqp/client/AmqpReceiver.java        |   4 +
 .../amqp/paging/AmqpMaxReadPagingTest.java         | 110 +++++++++++++++++++++
 .../integration/amqp/paging/AmqpPagingTest.java    |  11 +--
 .../tests/integration/client/SendAckFailTest.java  |  15 ---
 .../failover/LiveCrashOnBackupSyncTest.java        |   8 +-
 .../tests/integration/paging/PagingTest.java       |  26 +++--
 .../integration/replication/ReplicationTest.java   |   2 +-
 .../SharedNothingReplicationFlowControlTest.java   |   2 +-
 .../resources/servers/horizontalPaging/broker.xml  |   1 +
 .../core/paging/impl/PagingManagerImplTest.java    |   2 +-
 .../unit/core/paging/impl/PagingStoreImplTest.java |  40 ++++----
 37 files changed, 453 insertions(+), 257 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
index 2f2d8e9140..037f7b8167 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java
@@ -239,12 +239,12 @@ public class DBOption extends OptionalLocking {
 
          PagingStoreFactory pageStoreFactory = new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) configuration.getStoreConfiguration(),
                                                                               storageManager, 1000L,
-                                                                              scheduledExecutorService, executorFactory,
+                                                                              scheduledExecutorService, executorFactory, executorFactory,
                                                                              false, null);
          pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
       } else {
          storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
-         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, executorFactory, true, null);
          pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
 
       }
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index e6f1303001..448c3b247c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -238,7 +238,7 @@ public class PrintData extends DBOption {
       try {
 
          final StorageManager sm = new NullStorageManager();
-         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, false, null);
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, execfactory, false, null);
          HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<>();
          addressSettingsRepository.setDefault(new AddressSettings());
          PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index b07e02ecb3..26a4cdf918 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -80,6 +80,24 @@ public interface ArtemisExecutor extends Executor {
    default void shutdown() {
    }
 
+   /**
+    * It will give up the executor loop, giving a chance to other OrderedExecutors to run
+    */
+   default void yield() {
+   }
+
+
+
+   default boolean isFair() {
+      return false;
+   }
+
+   /** If this OrderedExecutor is fair, it will yield for another executors after each task ran */
+   default ArtemisExecutor setFair(boolean fair) {
+      return this;
+   }
+
+
 
    /**
     * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
index 8a02497b00..754861b476 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutor.java
@@ -36,10 +36,27 @@ public class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisE
 
    private static final Logger logger = Logger.getLogger(OrderedExecutor.class);
 
+   private boolean fair;
+
+   @Override
+   public boolean isFair() {
+      return fair;
+   }
+
+   @Override
+   /** If this OrderedExecutor is fair, it will yield for another executors after each task ran */
+   public OrderedExecutor setFair(boolean fair) {
+      this.fair = fair;
+      return this;
+   }
+
    @Override
    protected final void doTask(Runnable task) {
       try {
          task.run();
+         if (fair) {
+            this.yield();
+         }
       } catch (ActiveMQInterruptedException e) {
          // This could happen during shutdowns. Nothing to be concerned about here
          logger.debug("Interrupted Thread", e);
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
index 07e1e7bedb..6ce3cc9ccb 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorFactory.java
@@ -28,6 +28,8 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
 
    final Executor parent;
 
+   private boolean fair;
+
    public static boolean flushExecutor(Executor executor) {
       return flushExecutor(executor, 30, TimeUnit.SECONDS);
    }
@@ -58,12 +60,21 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
     */
    @Override
    public ArtemisExecutor getExecutor() {
-      return new OrderedExecutor(parent);
+      return new OrderedExecutor(parent).setFair(fair);
    }
 
    /** I couldn't figure out how to make a new method to return a generic Actor with a given type */
    public Executor getParent() {
       return parent;
    }
+
+   public boolean isFair() {
+      return fair;
+   }
+
+   public OrderedExecutorFactory setFair(boolean fair) {
+      this.fair = fair;
+      return this;
+   }
 }
 
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 67dcb5cdb3..b631f52906 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -40,15 +40,19 @@ public abstract class ProcessorBase<T> extends HandlerBase {
     * Using a method reference instead of an inner classes allows the caller to reduce the pointer chasing
     * when accessing ProcessorBase.this fields/methods.
     */
-   private final Runnable task = this::executePendingTasks;
+   private final Runnable mainTask = this::executePendingTasks;
 
    // used by stateUpdater
    @SuppressWarnings("unused")
    private volatile int state = STATE_NOT_RUNNING;
-   // Request of forced shutdown
-   private volatile boolean requestedForcedShutdown = false;
-   // Request of educated shutdown:
-   private volatile boolean requestedShutdown = false;
+
+   private enum request {
+      keepRunning,
+      shutdown,
+      yield
+   }
+
+   private volatile request loopRequest = request.keepRunning;
 
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
@@ -61,7 +65,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
                T task;
                //while the queue is not empty we process in order:
                //if requestedForcedShutdown==true than no new tasks will be drained from the tasks q.
-               while (!requestedForcedShutdown && (task = tasks.poll()) != null) {
+               while (loopRequest == request.keepRunning && (task = tasks.poll()) != null) {
                   doTask(task);
                }
             } finally {
@@ -79,7 +83,12 @@ public abstract class ProcessorBase<T> extends HandlerBase {
          //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
          //this check fixes the issue
       }
-      while (!tasks.isEmpty() && !requestedShutdown);
+      while (!tasks.isEmpty() && loopRequest == request.keepRunning);
+
+      if (loopRequest == request.yield) {
+         loopRequest = request.keepRunning;
+         delegate.execute(mainTask);
+      }
    }
 
    /**
@@ -90,7 +99,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    public void shutdown(long timeout, TimeUnit unit) {
-      requestedShutdown = true;
+      loopRequest = request.shutdown;
 
       if (!inHandler()) {
          // if it's in handler.. we just return
@@ -98,11 +107,14 @@ public abstract class ProcessorBase<T> extends HandlerBase {
       }
    }
 
+   public void yield() {
+      this.loopRequest = request.yield;
+   }
+
    /** It will shutdown the executor however it will not wait for finishing tasks*/
    public int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit) {
       //alert anyone that has been requested (at least) an immediate shutdown
-      requestedForcedShutdown = true;
-      requestedShutdown = true;
+      loopRequest = request.shutdown;
 
       if (!inHandler()) {
          // We don't have an option where we could do an immediate timeout
@@ -162,7 +174,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    }
 
    protected void task(T command) {
-      if (requestedShutdown) {
+      if (loopRequest == request.shutdown) {
          logAddOnShutdown();
          return;
       }
@@ -183,7 +195,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    private void onAddedTaskIfNotRunning(int state) {
       if (state == STATE_NOT_RUNNING) {
          //startPoller could be deleted but is maintained because is inherited
-         delegate.execute(task);
+         delegate.execute(mainTask);
       }
    }
 
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 86cdf81a31..a42a4fa227 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -184,4 +184,65 @@ public class OrderedExecutorSanityTest {
       }
    }
 
-}
+
+   @Test
+   public void testFair() throws InterruptedException {
+      AtomicInteger errors = new AtomicInteger(0);
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
+      try {
+         final ArtemisExecutor executor = new OrderedExecutor(executorService).setFair(true);
+         final ArtemisExecutor executor2 = new OrderedExecutor(executorService).setFair(true);
+
+         CountDownLatch latchDone1 = new CountDownLatch(1);
+         CountDownLatch latchBlock1 = new CountDownLatch(1);
+         CountDownLatch latchDone2 = new CountDownLatch(1);
+         CountDownLatch latchDone3 = new CountDownLatch(1);
+         CountDownLatch latchBlock3 = new CountDownLatch(1);
+         executor.execute(() -> {
+            try {
+               log.info("Exec 1");
+               latchDone1.countDown();
+               latchBlock1.await(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         });
+
+         Assert.assertTrue(latchDone1.await(10, TimeUnit.SECONDS));
+
+         executor.execute(() -> {
+            try {
+               // Exec 2 is supposed to yield to Exec3, so Exec3 will happen first
+               log.info("Exec 2");
+               latchDone2.countDown();
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         });
+
+         executor2.execute(() -> {
+            try {
+               log.info("Exec 3");
+               latchDone3.countDown();
+               latchBlock3.await(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         });
+
+         latchBlock1.countDown();
+         Assert.assertTrue(latchDone3.await(10, TimeUnit.SECONDS));
+         Assert.assertFalse(latchDone2.await(1, TimeUnit.MILLISECONDS));
+         latchBlock3.countDown();
+         Assert.assertTrue(latchDone2.await(10, TimeUnit.SECONDS));
+         Assert.assertEquals(0, errors.get());
+      } finally {
+         executorService.shutdownNow();
+      }
+   }
+
+
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index a8fa75d771..e5a6fd0e18 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -222,6 +222,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages";
 
+   private static final String PAGE_FLOW_CONTROL_NAME = "page-flow-control";
+
    private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
 
    private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";
@@ -1259,6 +1261,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
          } else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setMaxSizeBytes(ByteUtil.convertTextBytes(getTrimmedTextContent(child)));
+         } else if (PAGE_FLOW_CONTROL_NAME.equalsIgnoreCase(name)) {
+            addressSettings.setPageFlowControl(XMLUtil.parseBoolean(child));
          } else if (MAX_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setMaxSizeMessages(XMLUtil.parseInt(child));
          } else if (MAX_SIZE_BYTES_REJECT_THRESHOLD_NODE_NAME.equalsIgnoreCase(name)) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 0a939e8fd9..0df9b32f89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
-import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 import org.apache.activemq.artemis.utils.collections.LinkedList;
 import org.apache.activemq.artemis.utils.collections.LongHashSet;
@@ -72,17 +71,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
 
    public PageCursorProviderImpl(final PagingStore pagingStore,
-                                 final StorageManager storageManager,
-                                 final ArtemisExecutor executor,
-                                 final int maxCacheSize) {
-      this(pagingStore, storageManager, executor, maxCacheSize, false);
-   }
-
-   public PageCursorProviderImpl(final PagingStore pagingStore,
-                                 final StorageManager storageManager,
-                                 final ArtemisExecutor executor,
-                                 final int maxCacheSize,
-                                 final boolean readWholePage) {
+                                 final StorageManager storageManager) {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
    }
@@ -420,10 +409,6 @@ public class PageCursorProviderImpl implements PageCursorProvider {
       logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
       try {
          for (Page depagedPage : depagedPages) {
-            PagedMessage[] pgdMessages;
-
-            storageManager.beforePageRead();
-
             LinkedList<PagedMessage> pgdMessagesList = null;
             try {
                depagedPage.open(false);
@@ -433,8 +418,6 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                   depagedPage.close(false, false);
                } catch (Exception e) {
                }
-
-               storageManager.afterPageRead();
             }
 
             depagedPage.delete(pgdMessagesList);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 7336ac0634..d55e175e51 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -73,9 +73,9 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
    private ExecutorFactory executorFactory;
 
-   private JDBCSequentialFileFactory pagingFactoryFileFactory;
+   private ExecutorFactory ioExecutorFactory;
 
-   private final boolean readWholePage;
+   private JDBCSequentialFileFactory pagingFactoryFileFactory;
 
    @Override
    public ScheduledExecutorService getScheduledExecutor() {
@@ -98,28 +98,18 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                      final long syncTimeout,
                                      final ScheduledExecutorService scheduledExecutor,
                                      final ExecutorFactory executorFactory,
+                                     final ExecutorFactory ioExecutorFactory,
                                      final boolean syncNonTransactional,
                                      final IOCriticalErrorListener criticalErrorListener) throws Exception {
-      this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, criticalErrorListener, false);
-   }
-
-   public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
-                                     final StorageManager storageManager,
-                                     final long syncTimeout,
-                                     final ScheduledExecutorService scheduledExecutor,
-                                     final ExecutorFactory executorFactory,
-                                     final boolean syncNonTransactional,
-                                     final IOCriticalErrorListener criticalErrorListener,
-                                     final boolean readWholePage) throws Exception {
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
+      this.ioExecutorFactory = ioExecutorFactory;
       this.syncNonTransactional = syncNonTransactional;
       this.scheduledExecutor = scheduledExecutor;
       this.syncTimeout = syncTimeout;
       this.dbConf = dbConf;
       this.criticalErrorListener = criticalErrorListener;
       this.factoryToTableName = new HashMap<>();
-      this.readWholePage = readWholePage;
       start();
    }
 
@@ -157,13 +147,13 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
                                                ArtemisExecutor executor) {
-      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
+      return new PageCursorProviderImpl(store, storageManager);
    }
 
    @Override
    public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
 
-      return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+      return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
    }
 
    @Override
@@ -243,7 +233,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
          AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
 
-         PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
+         PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
 
          storesReturn.add(store);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 1e70fd2e6d..cdb7f76c8e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -62,6 +62,8 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
 
    private final ExecutorFactory executorFactory;
 
+   private final ExecutorFactory ioExecutorFactory;
+
    private final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
@@ -74,8 +76,6 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
 
    private final IOCriticalErrorListener critialErrorListener;
 
-   private final boolean readWholePage;
-
    public File getDirectory() {
       return directory;
    }
@@ -109,27 +109,17 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
                                 final long syncTimeout,
                                 final ScheduledExecutorService scheduledExecutor,
                                 final ExecutorFactory executorFactory,
+                                final ExecutorFactory ioExecutorFactory,
                                 final boolean syncNonTransactional,
                                 final IOCriticalErrorListener critialErrorListener) {
-      this(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
-   }
-
-   public PagingStoreFactoryNIO(final StorageManager storageManager,
-                                final File directory,
-                                final long syncTimeout,
-                                final ScheduledExecutorService scheduledExecutor,
-                                final ExecutorFactory executorFactory,
-                                final boolean syncNonTransactional,
-                                final IOCriticalErrorListener critialErrorListener,
-                                final boolean readWholePage) {
       this.storageManager = storageManager;
       this.directory = directory;
       this.executorFactory = executorFactory;
+      this.ioExecutorFactory = ioExecutorFactory;
       this.syncNonTransactional = syncNonTransactional;
       this.scheduledExecutor = scheduledExecutor;
       this.syncTimeout = syncTimeout;
       this.critialErrorListener = critialErrorListener;
-      this.readWholePage = readWholePage;
    }
 
 
@@ -157,13 +147,13 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
                                                ArtemisExecutor executor) {
-      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
+      return new PageCursorProviderImpl(store, storageManager);
    }
 
    @Override
    public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
 
-      return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
+      return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor().setFair(true), ioExecutorFactory.getExecutor(), syncNonTransactional);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index d60aa8cab2..6bae2a9a03 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -138,20 +138,6 @@ public class PagingStoreImpl implements PagingStore {
 
    private long rejectThreshold;
 
-   public PagingStoreImpl(final SimpleString address,
-                          final ScheduledExecutorService scheduledExecutor,
-                          final long syncTimeout,
-                          final PagingManager pagingManager,
-                          final StorageManager storageManager,
-                          final SequentialFileFactory fileFactory,
-                          final PagingStoreFactory storeFactory,
-                          final SimpleString storeName,
-                          final AddressSettings addressSettings,
-                          final ArtemisExecutor executor,
-                          final boolean syncNonTransactional) {
-      this(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, executor, syncNonTransactional);
-   }
-
    public PagingStoreImpl(final SimpleString address,
                           final ScheduledExecutorService scheduledExecutor,
                           final long syncTimeout,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 8a0cf7eb8d..259d6fdb57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -145,30 +144,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     */
    void waitOnOperations() throws Exception;
 
-   /**
-    * We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
-    * the system may become unresponsive if too many destinations are reading all the same time.
-    * This is called before we read, so we can limit concurrent reads
-    *
-    * @throws Exception
-    */
-   void beforePageRead() throws Exception;
-
-   /**
-    * Like {@link #beforePageRead()} but return {@code true} if acquired within {@code timeout},
-    * {@code false} otherwise.
-    */
-   boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException;
-
-   /**
-    * We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise
-    * the system may become unresponsive if too many destinations are reading all the same time.
-    * This is called after we read, so we can limit concurrent reads
-    *
-    * @throws Exception
-    */
-   void afterPageRead() throws Exception;
-
    /**
     * AIO has an optimized buffer which has a method to release it
     * instead of the way NIO will release data based on GC.
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 636bef5d13..86a7a1123f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
@@ -166,8 +165,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
    private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
 
-   protected Semaphore pageMaxConcurrentIO;
-
    protected BatchingIDGenerator idGenerator;
 
    protected final ExecutorFactory ioExecutorFactory;
@@ -1708,29 +1705,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       }
    }
 
-   @Override
-   public void beforePageRead() throws Exception {
-      if (pageMaxConcurrentIO != null) {
-         pageMaxConcurrentIO.acquire();
-      }
-   }
-
-   @Override
-   public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
-      final Semaphore pageMaxConcurrentIO = this.pageMaxConcurrentIO;
-      if (pageMaxConcurrentIO == null) {
-         return true;
-      }
-      return pageMaxConcurrentIO.tryAcquire(timeout, unit);
-   }
-
-   @Override
-   public void afterPageRead() throws Exception {
-      if (pageMaxConcurrentIO != null) {
-         pageMaxConcurrentIO.release();
-      }
-   }
-
    @Override
    public Journal getMessageJournal() {
       return messageJournal;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 0ec47a0c15..5454774e97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -183,13 +182,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       largeMessagesDirectory = config.getLargeMessagesDirectory();
 
       largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener, 1);
-
-      // it doesn't make sense to limit paging concurrency < 0
-      if (config.getPageMaxConcurrentIO() >= 0) {
-         pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
-      } else {
-         pageMaxConcurrentIO = null;
-      }
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 8ead36005a..0c479b12f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -669,19 +668,6 @@ public class NullStorageManager implements StorageManager {
 
    }
 
-   @Override
-   public void beforePageRead() throws Exception {
-   }
-
-   @Override
-   public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
-      return true;
-   }
-
-   @Override
-   public void afterPageRead() throws Exception {
-   }
-
    @Override
    public ByteBuffer allocateDirectBuffer(final int size) {
       return ByteBuffer.allocateDirect(size);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
index 32d2fcc0b7..c567abd068 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
@@ -33,6 +33,12 @@ import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
  */
 public interface ServiceRegistry {
 
+   ExecutorService getPageExecutorService();
+
+   /** Notice that if you want to provide your own PageExecutor, you should limit the number of threads to the number of
+    *  parallel reads you want to perform on paging */
+   void setPageExecutorService(ExecutorService executorService);
+
    ExecutorService getExecutorService();
 
    void setExecutorService(ExecutorService executorService);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 59c5b049e3..b7ca50308c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -276,6 +276,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     */
    protected volatile ExecutorFactory ioExecutorFactory;
 
+   /**
+    * This is a thread pool for paging tasks only.
+    * This is because we have to limit parallel reads on paging.
+    */
+   protected volatile ExecutorFactory pageExecutorFactory;
+
+   protected volatile ExecutorService pageExecutorPool;
+
    private final NetworkHealthCheck networkHealthCheck = new NetworkHealthCheck(ActiveMQDefaultConfiguration.getDefaultNetworkCheckNic(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckPeriod(), ActiveMQDefaultConfiguration.getDefaultNetworkCheckTimeout());
 
    private final HierarchicalRepository<Set<Role>> securityRepository;
@@ -1356,6 +1364,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          shutdownPool(ioExecutorPool);
       }
 
+      if (pageExecutorPool != null) {
+         shutdownPool(pageExecutorPool);
+      }
+
       if (!scheduledPoolSupplied)
          scheduledPool = null;
 
@@ -2958,9 +2970,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    protected PagingStoreFactory getPagingStoreFactory() throws Exception {
       if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
          DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
-         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage());
+         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, pageExecutorFactory, ioExecutorFactory, false, ioCriticalErrorListener);
+      } else {
+         return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, pageExecutorFactory, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener);
       }
-      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage());
    }
 
    /**
@@ -3049,6 +3062,22 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       }
       this.executorFactory = new OrderedExecutorFactory(threadPool);
 
+      if (serviceRegistry.getIOExecutorService() != null) {
+         this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
+      } else {
+         ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-IO-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         });
+
+         // Perhaps getPageMaxConcurrentIO should be deprecated and a new value added
+         int maxIO = configuration.getPageMaxConcurrentIO() <= 0 ? Integer.MAX_VALUE : configuration.getPageMaxConcurrentIO();
+         this.ioExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIO, 60L, TimeUnit.SECONDS, tFactory);
+         this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool);
+      }
+
       if (serviceRegistry.getIOExecutorService() != null) {
          this.ioExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getIOExecutorService());
       } else {
@@ -3063,6 +3092,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          this.ioExecutorFactory = new OrderedExecutorFactory(ioExecutorPool);
       }
 
+      if (serviceRegistry.getPageExecutorService() != null) {
+         this.pageExecutorFactory = new OrderedExecutorFactory(serviceRegistry.getPageExecutorService()).setFair(true);
+      } else {
+         ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
+            @Override
+            public ThreadFactory run() {
+               return new ActiveMQThreadFactory("ActiveMQ-PageExecutor-server-" + this.toString(), false, ClientSessionFactoryImpl.class.getClassLoader());
+            }
+         });
+
+         int maxIO = configuration.getPageMaxConcurrentIO() <= 0 ? Integer.MAX_VALUE : configuration.getPageMaxConcurrentIO();
+         this.pageExecutorPool = new ActiveMQThreadPoolExecutor(0, maxIO, 60L, TimeUnit.SECONDS, tFactory);
+         this.pageExecutorFactory = new OrderedExecutorFactory(pageExecutorPool);
+      }
+
        /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry.  If so we use this
        * Scheduled ExecutorService otherwise we create a new one.
        */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 767db5b1f2..61f9a4655c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -209,7 +209,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // The quantity of pagedReferences on messageReferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
 
-
    final SizeAwareMetric queueMemorySize = new SizeAwareMetric();
 
    protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
@@ -3171,8 +3170,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     * @return
     */
    private boolean needsDepage() {
-      return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxPageReadBytes() &&
-             queueMemorySize.getElements() < pageSubscription.getPagingStore().getMaxPageReadMessages();
+      AddressSettings thisSettings = this.addressSettings;
+      if (thisSettings != null && thisSettings.isPageFlowControl()) {
+         // if pageFlowControl is set, we will also count the deliveringMetrics to decide on depaging
+         // this is particularly needed when paging is used with a client that does not have flow control
+         // (OpenWire has it usually off by default)
+         return (queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < pageSubscription.getPagingStore().getMaxPageReadBytes() && (queueMemorySize.getElements() + deliveringMetrics.getMessageCount()) < pageSubscription.getPagingStore().getMaxPageReadMessages();
+      } else {
+         return queueMemorySize.getSize() < pageSubscription.getPagingStore().getMaxPageReadBytes() && queueMemorySize.getElements() < pageSubscription.getPagingStore().getMaxPageReadMessages();
+      }
    }
 
    private SimpleString extractGroupID(MessageReference ref) {
@@ -4429,6 +4435,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    public void decDelivering(final MessageReference reference) {
       deliveringMetrics.decrementMetrics(reference);
+      AddressSettings theSettings = this.addressSettings;
+      if (theSettings != null && theSettings.isPageFlowControl()) {
+         deliverAsync(); // we check for async delivery after acks
+                         // in case paging stopped for lack of space
+      }
    }
 
    private long getPersistentSize(final MessageReference reference) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
index 1597e08061..d71ee619d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
@@ -43,6 +43,8 @@ public class ServiceRegistryImpl implements ServiceRegistry {
 
    private ExecutorService executorService;
 
+   private ExecutorService pageExecutorService;
+
    private ExecutorService ioExecutorService;
 
    private ScheduledExecutorService scheduledExecutorService;
@@ -74,6 +76,16 @@ public class ServiceRegistryImpl implements ServiceRegistry {
       this.acceptorFactories = new ConcurrentHashMap<>();
    }
 
+   @Override
+   public ExecutorService getPageExecutorService() {
+      return pageExecutorService;
+   }
+
+   @Override
+   public void setPageExecutorService(ExecutorService executorService) {
+      this.pageExecutorService = executorService;
+   }
+
    @Override
    public ExecutorService getExecutorService() {
       return executorService;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 628c77f5c1..96463f0909 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -88,6 +88,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final boolean DEFAULT_AUTO_CREATE_ADDRESSES = true;
 
+   public static final boolean DEFAULT_PAGE_FLOW_CONTROL = false;
+
    public static final boolean DEFAULT_AUTO_DELETE_ADDRESSES = true;
 
    public static final long DEFAULT_AUTO_DELETE_ADDRESSES_DELAY = 0;
@@ -147,6 +149,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Integer maxReadPageMessages = null;
 
+   private Boolean pageFlowControl = null;
+
    private Long maxSizeMessages = null;
 
    private Integer pageSizeBytes = null;
@@ -351,6 +355,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
       this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
       this.enableIngressTimestamp = other.enableIngressTimestamp;
+      this.pageFlowControl = other.pageFlowControl;
    }
 
    public AddressSettings() {
@@ -446,6 +451,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public boolean isPageFlowControl() {
+      return pageFlowControl != null ? pageFlowControl : AddressSettings.DEFAULT_PAGE_FLOW_CONTROL;
+   }
+
+   public AddressSettings setPageFlowControl(Boolean pageFlowControl) {
+      this.pageFlowControl = pageFlowControl;
+      return this;
+   }
 
    public DeletionPolicy getConfigDeleteQueues() {
       return configDeleteQueues != null ? configDeleteQueues : AddressSettings.DEFAULT_CONFIG_DELETE_QUEUES;
@@ -1224,6 +1237,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (enableIngressTimestamp == null) {
          enableIngressTimestamp = merged.enableIngressTimestamp;
       }
+      if (pageFlowControl == null) {
+         pageFlowControl = merged.pageFlowControl;
+      }
    }
 
    @Override
@@ -1473,6 +1489,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          maxReadPageMessages = BufferHelper.readNullableInteger(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         pageFlowControl = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -1543,7 +1563,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp) +
          BufferHelper.sizeOfNullableLong(maxSizeMessages) +
          BufferHelper.sizeOfNullableInteger(maxReadPageMessages) +
-         BufferHelper.sizeOfNullableInteger(maxReadPageBytes);
+         BufferHelper.sizeOfNullableInteger(maxReadPageBytes) +
+         BufferHelper.sizeOfNullableBoolean(pageFlowControl);
    }
 
    @Override
@@ -1683,6 +1704,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableInteger(buffer, maxReadPageBytes);
 
       BufferHelper.writeNullableInteger(buffer, maxReadPageMessages);
+
+      BufferHelper.writeNullableBoolean(buffer, pageFlowControl);
    }
 
    /* (non-Javadoc)
@@ -1759,6 +1782,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
       result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
       result = prime * result + ((maxSizeMessages == null) ? 0 : maxSizeMessages.hashCode());
+      result = prime * result + ((pageFlowControl == null) ? 0 : pageFlowControl.hashCode());
       return result;
    }
 
@@ -2131,6 +2155,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       } else if (!maxSizeMessages.equals(other.maxSizeMessages))
          return false;
 
+      if (pageFlowControl == null) {
+         if (other.pageFlowControl != null)
+            return false;
+      } else if (!pageFlowControl.equals(other.pageFlowControl))
+         return false;
+
       return true;
    }
 
@@ -2268,6 +2298,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          enableMetrics +
          ", enableIngressTime=" +
          enableIngressTimestamp +
+         ", deliveringControl=" + pageFlowControl +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 0095e05ce2..82d45a6d5f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -4006,6 +4006,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="page-flow-control" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     If this is true the system will check the delivering statistics before issuing more depaging. This is particularly useful when you either want to ensure enough memory to read from paging
+                     or if you are using a client that does not allow flow control.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
 
             <xsd:element name="address-full-policy" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index b867f76fbe..77f7b61325 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -299,6 +299,30 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
       assertEquals(ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(), bconfig.getProducerWindowSize());
    }
 
+   @Test
+   public void testParseAddressSettingsFlowControl() throws Exception {
+      FileConfigurationParser parser = new FileConfigurationParser();
+
+      String configStr = "<core xmlns=\"urn:activemq:core\">\n" +
+         "      <address-settings>\n" +
+         "         <!-- if you define auto-create on certain queues, management has to be auto-create -->\n" +
+         "         <address-setting match=\"hello\">\n" +
+         "                 <page-flow-control>true</page-flow-control>\n" +
+         "         </address-setting>\n" +
+         "      </address-settings>\n" +
+         "</core>";
+      ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration config = parser.parseMainConfig(input);
+
+      Map<String, AddressSettings> addressSettings = config.getAddressSettings();
+      assertEquals(1, addressSettings.size());
+
+      AddressSettings settings = addressSettings.get("hello");
+      Assert.assertNotNull(settings);
+      Assert.assertTrue(settings.isPageFlowControl());
+   }
+
    @Test
    public void testParsingOverflowPageSize() throws Exception {
       testParsingOverFlow("<address-settings>" + "\n" + "<address-setting match=\"#\">" + "\n" + "<page-size-bytes>2147483648</page-size-bytes>\n" + "</address-setting>" + "\n" + "</address-settings>" + "\n");
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index 3f65e885dd..a0580941be 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -91,20 +91,6 @@ public class JournalStorageManagerTest extends ActiveMQTestBase {
       testExecutor.shutdownNow();
    }
 
-   @Test
-   public void testDisablePageConcurrentMax() throws Exception {
-      if (journalType == JournalType.ASYNCIO) {
-         assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
-      }
-      final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
-      configuration.setPageMaxConcurrentIO(-1);
-      final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
-      final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
-      final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
-      // if -1 is being set it means that we should first call afterPageRead to acuire the permit to read a page
-      Assert.assertTrue(manager.beforePageRead(0, TimeUnit.NANOSECONDS));
-   }
-
    /**
     * Test of fixJournalFileSize method, of class JournalStorageManager.
     */
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index d7e0920870..2f337439fb 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -307,7 +307,7 @@ public class FileMoveManagerTest {
 
             final StorageManager storageManager = new NullStorageManager();
 
-            PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null);
+            PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool),  new OrderedExecutorFactory(threadPool), true, null);
 
             PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b18387d406..d00841ce1e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -313,21 +312,6 @@ public class TransactionImplTest extends ActiveMQTestBase {
 
       }
 
-      @Override
-      public void beforePageRead() throws Exception {
-
-      }
-
-      @Override
-      public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
-         return true;
-      }
-
-      @Override
-      public void afterPageRead() throws Exception {
-
-      }
-
       @Override
       public ByteBuffer allocateDirectBuffer(int size) {
          return null;
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 2f2ad7195f..20ecba968c 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -66,6 +66,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>();
 
+   public int getPrefetchSize() { // number of messages currently sitting in this receiver's prefetch queue
+      return prefetch.size();
+   }
+
    private final AmqpSession session;
    private final String address;
    private final String receiverId;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpMaxReadPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpMaxReadPagingTest.java
new file mode 100644
index 0000000000..0a12172359
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpMaxReadPagingTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.paging;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AmqpMaxReadPagingTest extends AmqpClientTestSupport {
+   // Checks that an AMQP receiver on a paging queue does not buffer more messages than the configured max-read-page-messages.
+   private static final Logger logger = Logger.getLogger(AmqpMaxReadPagingTest.class);
+
+   public AmqpMaxReadPagingTest() {
+   }
+   // Tunes the "#" address-settings entry: max-size-messages 1, max-read-page-messages 10, max-read-page-bytes 10 MiB, page flow control on.
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      super.addConfiguration(server);
+      final Map<String, AddressSettings> addressesSettings = server.getConfiguration()
+         .getAddressSettings();
+      addressesSettings.get("#").setMaxSizeMessages(1)
+         .setMaxSizeBytes(100000)
+         .setPageSizeBytes(10000).setMaxReadPageMessages(10).setMaxReadPageBytes(10 * 1024 * 1024).setPageFlowControl(true);
+      // NOTE(review): -1 presumably disables the expiry scan so it cannot touch the queue during the test; confirm
+      server.getConfiguration().setMessageExpiryScanPeriod(-1);
+   }
+   // Sends 100 durable 1000-character messages to a paging queue, grants credit for all of them, and asserts at most 10 are buffered client-side.
+   @Test(timeout = 60000)
+   public void testMaxReadPage() throws Exception {
+      final int MSG_SIZE = 1000;
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < MSG_SIZE; i++) {
+         builder.append('0');
+      }
+      final String data = builder.toString();
+      final int MSG_COUNT = 100;
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      Queue queue = server.locateQueue(getQueueName());
+      Assert.assertNotNull(queue);
+      queue.getPagingStore().startPaging(); // paging is switched on explicitly before any message is sent
+
+      AmqpSender sender = session.createSender(getQueueName(), true);
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      receiver.setPresettle(true);
+      for (int i = 0; i < MSG_COUNT; i++) {
+         AmqpMessage message = new AmqpMessage();
+         message.setText(data);
+         message.setDurable(true);
+         sender.send(message);
+      }
+      sender.close();
+      Wait.assertEquals(MSG_COUNT, queue::getMessageCount);
+      receiver.flow(MSG_COUNT); // credit for every message; the assertion below expects the broker to deliver no more than the read limit
+      Assert.assertNotNull(receiver.receive(10, TimeUnit.SECONDS)); // waits up to 10 seconds for the first message, so the buffer check runs after delivery has started
+      if (receiver.getPrefetchSize() > 10) {
+         logger.warn("Receiver has an unexpected size of " + receiver.getPrefetchSize() + " elements on the client buffer");
+      }
+      PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
+      Assert.assertTrue(pagingStore.isPaging());
+      Assert.assertTrue(receiver.getPrefetchSize() <= 10); // the client buffer must not exceed max-read-page-messages (10)
+
+      Thread.sleep(500); // it is important to have some quiet period to make sure all previous tasks were emptied
+      for (int i = 0; i < MSG_COUNT - 1; i++) { // one message was already received above
+         AmqpMessage message = receiver.receive(10, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         System.out.println("Received " + i);
+         message.accept();
+      }
+
+      receiver.close();
+      connection.close();
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
index 0319d347b7..177081878a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java
@@ -43,27 +43,22 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class AmqpPagingTest extends AmqpClientTestSupport {
 
-   @Parameterized.Parameters(name = "durability={0}, readWholePage={1}")
+   @Parameterized.Parameters(name = "durability={0}")
    public static Collection getParams() {
       return Arrays.asList(new Object[][]{
-         {Boolean.TRUE, true}, {Boolean.TRUE, false},
-         {Boolean.FALSE, true}, {Boolean.FALSE, false},
-         {null, true}, {null, false}});
+         {Boolean.TRUE}, {Boolean.FALSE}});
    }
 
    private final Boolean durable;
-   private final boolean readWholePage;
 
-   public AmqpPagingTest(Boolean durable, boolean readWholePage) {
+   public AmqpPagingTest(Boolean durable) {
       this.durable = durable;
-      this.readWholePage = readWholePage;
    }
 
    @Override
    protected void addConfiguration(ActiveMQServer server) {
       super.addConfiguration(server);
       final Map<String, AddressSettings> addressesSettings = server.getConfiguration()
-         .setReadWholePage(readWholePage)
          .getAddressSettings();
       addressesSettings.get("#")
          .setMaxSizeBytes(100000)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 5f47270949..84f91b93ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -389,21 +389,6 @@ public class SendAckFailTest extends SpawnedTestBase {
          manager.waitOnOperations();
       }
 
-      @Override
-      public void beforePageRead() throws Exception {
-         manager.beforePageRead();
-      }
-
-      @Override
-      public boolean beforePageRead(long timeout, TimeUnit unit) throws InterruptedException {
-         return manager.beforePageRead(timeout, unit);
-      }
-
-      @Override
-      public void afterPageRead() throws Exception {
-         manager.afterPageRead();
-      }
-
       @Override
       public ByteBuffer allocateDirectBuffer(int size) {
          return manager.allocateDirectBuffer(size);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
index 426aea6c0f..b1a001b569 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
@@ -210,10 +210,10 @@ public class LiveCrashOnBackupSyncTest extends ActiveMQTestBase {
          ActiveMQServer liveServer = new ActiveMQServerImpl(liveConfiguration, ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) {
             @Override
             protected PagingStoreFactoryNIO getPagingStoreFactory() {
-               return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+               return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(),  this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
                   @Override
                   public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
-                     return new DelayPagingStoreImpl(address, this.getScheduledExecutor(), liveConfiguration.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
+                     return new DelayPagingStoreImpl(address, this.getScheduledExecutor(), liveConfiguration.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
                   }
                };
             }
@@ -252,9 +252,9 @@ class DelayPagingStoreImpl extends PagingStoreImpl {
                                PagingStoreFactory storeFactory,
                                SimpleString storeName,
                                AddressSettings addressSettings,
-                               ArtemisExecutor executor,
+                               ArtemisExecutor executor, ArtemisExecutor ioExecutor,
                                boolean syncNonTransactional) {
-      super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
+      super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, ioExecutor, syncNonTransactional);
    }
 
    @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 1214df64f1..36d7fc3ca2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -2853,10 +2853,8 @@ public class PagingTest extends ActiveMQTestBase {
       class InterruptedCursorProvider extends PageCursorProviderImpl {
 
          InterruptedCursorProvider(PagingStore pagingStore,
-                                   StorageManager storageManager,
-                                   ArtemisExecutor executor,
-                                   int maxCacheSize) {
-            super(pagingStore, storageManager, executor, maxCacheSize);
+                                   StorageManager storageManager) {
+            super(pagingStore, storageManager);
          }
 
          @Override
@@ -2875,13 +2873,13 @@ public class PagingTest extends ActiveMQTestBase {
       server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
          @Override
          protected PagingStoreFactoryNIO getPagingStoreFactory() {
-            return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+            return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
                @Override
                public PageCursorProvider newCursorProvider(PagingStore store,
                                                            StorageManager storageManager,
                                                            AddressSettings addressSettings,
                                                            ArtemisExecutor executor) {
-                  return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+                  return new InterruptedCursorProvider(store, storageManager);
                }
             };
          }
@@ -4539,10 +4537,8 @@ public class PagingTest extends ActiveMQTestBase {
       class InterruptedCursorProvider extends PageCursorProviderImpl {
 
          InterruptedCursorProvider(PagingStore pagingStore,
-                                   StorageManager storageManager,
-                                   ArtemisExecutor executor,
-                                   int maxCacheSize) {
-            super(pagingStore, storageManager, executor, maxCacheSize);
+                                   StorageManager storageManager) {
+            super(pagingStore, storageManager);
          }
 
          @Override
@@ -4561,13 +4557,13 @@ public class PagingTest extends ActiveMQTestBase {
       server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
          @Override
          protected PagingStoreFactoryNIO getPagingStoreFactory() {
-            return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+            return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
                @Override
                public PageCursorProvider newCursorProvider(PagingStore store,
                                                            StorageManager storageManager,
                                                            AddressSettings addressSettings,
                                                            ArtemisExecutor executor) {
-                  return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+                  return new InterruptedCursorProvider(store, storageManager);
                }
             };
          }
@@ -7327,7 +7323,7 @@ public class PagingTest extends ActiveMQTestBase {
                                      AddressSettings addressSettings,
                                      ArtemisExecutor executor,
                                      boolean syncNonTransactional) {
-            super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
+            super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, executor, syncNonTransactional);
          }
 
          /**
@@ -7344,7 +7340,7 @@ public class PagingTest extends ActiveMQTestBase {
          server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
             @Override
             protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception {
-               return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+               return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
                   @Override
                   public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
                      return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.syncNonTransactional);
@@ -7356,7 +7352,7 @@ public class PagingTest extends ActiveMQTestBase {
          server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
             @Override
             protected PagingStoreFactoryNIO getPagingStoreFactory() {
-               return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+               return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
                   @Override
                   public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
                      return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 2e0aecde59..f29ad471d5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -648,7 +648,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
                                              final ExecutorFactory executorFactory,
                                              final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
 
-      PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
+      PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
 
       paging.start();
       return paging;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
index 1d923731d6..1e74207fb4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
@@ -289,7 +289,7 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
                                       ExecutorFactory executorFactory,
                                       boolean syncNonTransactional,
                                       IOCriticalErrorListener critialErrorListener) {
-         super(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener);
+         super(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, executorFactory, syncNonTransactional, critialErrorListener);
       }
 
       PageStoreFactoryTestable(PagingStoreFactoryNIO other) {
diff --git a/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml b/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
index 92d3bfa96a..0d4b5d16c4 100644
--- a/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
+++ b/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
@@ -229,6 +229,7 @@ under the License.
             <page-size-bytes>5M</page-size-bytes>
             <!-- how many messages are kept in memory from paging. The system will stop reading whenever this or max-read-page-bytes hits the max first. -->
             <max-read-page-messages>50</max-read-page-messages>
+            <page-flow-control>true</page-flow-control>
             <!-- how many bytes equivalent of messages are kept in memory from paging (based on memory estimate). The system will stop reading whenever this or max-read-page-messages hits the max first. -->
             <max-read-page-bytes>500K</max-read-page-bytes>
             <message-counter-history-day-limit>10</message-counter-history-day-limit>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index a272247f7a..713ce21d44 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -55,7 +55,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
 
       final StorageManager storageManager = new NullStorageManager();
 
-      PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, getPageDirFile(), 100, null, getOrderedExecutor(), true, null);
+      PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, getPageDirFile(), 100, null, getOrderedExecutor(), getOrderedExecutor(), true, null);
 
       PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings);
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 3c5f2b7491..ea397b742a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -135,7 +135,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
    public void testDoubleStart() throws Exception {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
-      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -162,7 +162,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       AddressSettings addressSettings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
+      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -190,7 +190,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       storeImpl.sync();
 
-      storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
+      storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -207,7 +207,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
 
-      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -271,7 +271,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
 
-      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
       PageSubscription subscription = storeImpl.getCursorProvider().createSubscription(1, null, true);
       FakeQueue fakeQueue = new FakeQueue(destination, 1).setDurable(true).setPageSubscription(subscription);
 
@@ -389,7 +389,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
 
-      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
       PageSubscription subscription = storeImpl.getCursorProvider().createSubscription(1, null, true);
       FakeQueue fakeQueue = new FakeQueue(destination, 1).setDurable(true).setPageSubscription(subscription);
 
@@ -495,7 +495,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       StorageManager storageManager = createStorageManagerMock();
 
-      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), storageManager, factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), storageManager, factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
       PageSubscription subscription = storeImpl.getCursorProvider().createSubscription(1, null, true);
       FakeQueue fakeQueue = new FakeQueue(destination, 1).setDurable(true).setPageSubscription(subscription);
 
@@ -540,7 +540,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
 
-      PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       store.start();
 
@@ -682,7 +682,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -815,7 +815,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
          fileTmp.close();
       }
 
-      PagingStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+      PagingStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
       storeImpl2.start();
 
       long numberOfPages = storeImpl2.getNumberOfPages();
@@ -886,7 +886,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -919,7 +919,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
          AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-         final PagingStore store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), false);
+         final PagingStore store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), false);
 
          store.start();
 
@@ -1016,7 +1016,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       AddressSettings settings = new AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), true);
+      final PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, new SimpleString("test"), settings, getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -1085,7 +1085,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings()
                                                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE),
-                                                  getExecutorFactory().getExecutor(), true);
+                                                  getExecutorFactory().getExecutor(),  getExecutorFactory().getExecutor(), true);
 
       store.start();
       AssertionLoggerHandler.startCapture();
@@ -1109,7 +1109,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings()
                                                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE),
-                                                  getExecutorFactory().getExecutor(), true);
+                                                  getExecutorFactory().getExecutor(),  getExecutorFactory().getExecutor(), true);
       store.start();
       AssertionLoggerHandler.startCapture();
       try {
@@ -1132,7 +1132,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings()
                                                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
-                                                  getExecutorFactory().getExecutor(), true);
+                                                  getExecutorFactory().getExecutor(),  getExecutorFactory().getExecutor(), true);
 
       store.start();
       try {
@@ -1203,7 +1203,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings()
                                                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
-                                                  getExecutorFactory().getExecutor(), true);
+                                                  getExecutorFactory().getExecutor(),  getExecutorFactory().getExecutor(), true);
 
       store.start();
       try {
@@ -1247,7 +1247,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings()
                                                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
-                                                  getExecutorFactory().getExecutor(), true);
+                                                  getExecutorFactory().getExecutor(),  getExecutorFactory().getExecutor(), true);
 
       store.start();
       try {
@@ -1433,7 +1433,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   StorageManager storageManager,
                                                   AddressSettings addressSettings,
                                                   ArtemisExecutor executor) {
-         return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+         return new PageCursorProviderImpl(store, storageManager);
       }
 
       @Override
@@ -1489,7 +1489,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                                                   mockManager, createStorageManagerMock(), factory, storeFactory,
                                                   PagingStoreImplTest.destinationTestName,
                                                   new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
-                                                  sameThreadExecutor, true);
+                                                  sameThreadExecutor, sameThreadExecutor, true);
 
       store.start();
       try {


[activemq-artemis] 01/02: ARTEMIS-3929 Improving OpenWire clientIDSet

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 568eb70fcda9737b38871a3bff1e111bad784091
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sun Aug 7 22:20:45 2022 -0400

    ARTEMIS-3929 Improving OpenWire clientIDSet
    
    I have seen this contention while I was testing ARTEMIS-3928
    
    This does not change any semantics and current tests should be enough to validate this change
---
 .../protocol/openwire/OpenWireProtocolManager.java | 68 ++++++++++------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8d26a8f197..1d645f7861 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -106,7 +106,7 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
 
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<>();
 
-   private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<>();
+   private final Map<String, AMQConnectionContext> clientIdSet = new ConcurrentHashMap<>();
 
    private String brokerName;
 
@@ -226,18 +226,16 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
    }
 
    public void removeConnection(ConnectionInfo info, Throwable error) throws InvalidClientIDException {
-      synchronized (clientIdSet) {
-         String clientId = info.getClientId();
-         if (clientId != null) {
-            AMQConnectionContext context = this.clientIdSet.remove(clientId);
-            if (context != null) {
-               //connection is still there and need to close
-               context.getConnection().disconnect(error != null);
-               this.connections.remove(context.getConnection());
-            }
-         } else {
-            throw new InvalidClientIDException("No clientID specified for connection disconnect request");
+      String clientId = info.getClientId();
+      if (clientId != null) {
+         AMQConnectionContext context = this.clientIdSet.remove(clientId);
+         if (context != null) {
+            //connection is still there and need to close
+            context.getConnection().disconnect(error != null);
+            this.connections.remove(context.getConnection());
          }
+      } else {
+         throw new InvalidClientIDException("No clientID specified for connection disconnect request");
       }
    }
 
@@ -404,35 +402,33 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
          throw new InvalidClientIDException("No clientID specified for connection request");
       }
 
-      synchronized (clientIdSet) {
-         AMQConnectionContext context;
-         context = clientIdSet.get(clientId);
-         if (context != null) {
-            if (info.isFailoverReconnect()) {
-               OpenWireConnection oldConnection = context.getConnection();
-               oldConnection.disconnect(true);
-               connections.remove(oldConnection);
-               connection.reconnect(context, info);
-            } else {
-               throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
-            }
+      AMQConnectionContext context;
+      context = clientIdSet.get(clientId);
+      if (context != null) {
+         if (info.isFailoverReconnect()) {
+            OpenWireConnection oldConnection = context.getConnection();
+            oldConnection.disconnect(true);
+            connections.remove(oldConnection);
+            connection.reconnect(context, info);
          } else {
-            //new connection
-            context = connection.initContext(info);
-            clientIdSet.put(clientId, context);
+            throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
          }
+      } else {
+         //new connection
+         context = connection.initContext(info);
+         clientIdSet.put(clientId, context);
+      }
 
-         connections.add(connection);
+      connections.add(connection);
 
-         ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
-         // do not distribute passwords in advisory messages. usernames okay
-         ConnectionInfo copy = info.copy();
-         copy.setPassword("");
-         fireAdvisory(context, topic, copy);
+      ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
+      // do not distribute passwords in advisory messages. usernames okay
+      ConnectionInfo copy = info.copy();
+      copy.setPassword("");
+      fireAdvisory(context, topic, copy);
 
-         // init the conn
-         context.getConnection().addSessions(context.getConnectionState().getSessionIds());
-      }
+      // init the conn
+      context.getConnection().addSessions(context.getConnectionState().getSessionIds());
    }
 
    public void fireAdvisory(AMQConnectionContext context, ActiveMQTopic topic, Command copy) throws Exception {