You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/08/19 15:44:37 UTC

[activemq-artemis] branch master updated: ARTEMIS-2399 Improve performance when there are a lot of subscribers

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d4205  ARTEMIS-2399 Improve performance when there are a lot of subscribers
     new db7eb87  This closes #2750
76d4205 is described below

commit 76d420590fa73aefb41713a5589dcec22588c594
Author: yang wei <wy...@gmail.com>
AuthorDate: Mon Jun 24 20:13:57 2019 +0800

    ARTEMIS-2399 Improve performance when there are a lot of subscribers
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   8 ++
 artemis-distribution/src/main/assembly/dep.xml     |   1 +
 artemis-features/src/main/resources/features.xml   |   1 +
 artemis-server/pom.xml                             |   4 +
 .../artemis/core/config/Configuration.java         |  11 ++
 .../core/config/impl/ConfigurationImpl.java        |  13 ++
 .../deployers/impl/FileConfigurationParser.java    |   2 +
 .../artemis/core/paging/cursor/PageCache.java      |   4 +-
 .../artemis/core/paging/cursor/PagePosition.java   |   2 +
 .../core/paging/cursor/impl/LivePageCacheImpl.java |   5 +-
 .../core/paging/cursor/impl/PageCacheImpl.java     |   7 +-
 .../paging/cursor/impl/PageCursorProviderImpl.java |  30 +++-
 .../core/paging/cursor/impl/PagePositionImpl.java  |  20 ++-
 .../core/paging/cursor/impl/PageReader.java        | 141 ++++++++++++++++++
 .../paging/cursor/impl/PageSubscriptionImpl.java   |  92 +++++++++---
 .../activemq/artemis/core/paging/impl/Page.java    | 159 +++++++++++++++++++--
 .../paging/impl/PagingStoreFactoryDatabase.java    |  16 ++-
 .../core/paging/impl/PagingStoreFactoryNIO.java    |  16 ++-
 .../core/server/impl/ActiveMQServerImpl.java       |   4 +-
 .../artemis/core/server/impl/QueueImpl.java        |  31 ++--
 .../resources/schema/artemis-configuration.xsd     |   8 ++
 .../core/config/impl/ConfigurationImplTest.java    |   1 +
 .../core/config/impl/FileConfigurationTest.java    |   1 +
 .../cursor/impl/PageCursorProviderImplTest.java    |  37 +++++
 .../core/paging/cursor/impl/PageReaderTest.java    | 145 +++++++++++++++++++
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 .../ConfigurationTest-xinclude-config.xml          |   1 +
 .../src/test/resources/artemis-configuration.xsd   |   8 ++
 docs/user-manual/en/configuration-index.md         |   1 +
 pom.xml                                            |   7 +
 .../artemis/tests/smoke/jmx/JmxConnectionTest.java |   2 +-
 31 files changed, 721 insertions(+), 58 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 543dff0..30f4246 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -251,6 +251,9 @@ public final class ActiveMQDefaultConfiguration {
    // The max number of concurrent reads allowed on paging
    private static int DEFAULT_MAX_CONCURRENT_PAGE_IO = 5;
 
+   // If true, the whole page is read; otherwise the reader just seeks to the requested message and reads only that one
+   private static boolean DEFAULT_READ_WHOLE_PAGE = false;
+
    // the directory to store the journal files in
    private static String DEFAULT_JOURNAL_DIR = "data/journal";
 
@@ -843,6 +846,11 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MAX_CONCURRENT_PAGE_IO;
    }
 
+
+   public static boolean isDefaultReadWholePage() {
+      return DEFAULT_READ_WHOLE_PAGE;
+   }
+
    /**
     * the directory to store the journal files in
     */
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index a732656..c877861 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -82,6 +82,7 @@
             <include>org.jboss.logmanager:jboss-logmanager</include>
             <include>org.jboss.logging:jboss-logging</include>
             <include>org.jboss.slf4j:slf4j-jboss-logmanager</include>
+            <include>org.jctools:jctools-core</include>
             <include>io.netty:netty-all</include>
             <include>io.netty:netty-tcnative-boringssl-static</include>
             <include>org.apache.qpid:proton-j</include>
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index 53e8dd7..eb79e75 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -64,6 +64,7 @@
 		<bundle dependency="true">mvn:org.apache.commons/commons-configuration2/${commons.config.version}</bundle>
 		<bundle dependency="true">mvn:org.apache.commons/commons-text/1.6</bundle>
 		<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
+		<bundle dependency="true">mvn:org.jctools/jctools-core/${jctools.version}</bundle>
 		<!-- Micrometer can't be included until it supports OSGi. It is currently an "optional" Maven dependency. -->
 		<!--bundle dependency="true">mvn:io.micrometer/micrometer-core/${version.micrometer}</bundle-->
 
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index c8bb63b..23c44f0 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -90,6 +90,10 @@
          <scope>test</scope>
       </dependency>
       <dependency>
+         <groupId>org.jctools</groupId>
+         <artifactId>jctools-core</artifactId>
+      </dependency>
+      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-buffer</artifactId>
       </dependency>
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index c3fb985..9b1a75c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -588,6 +588,17 @@ public interface Configuration {
    Configuration setPageMaxConcurrentIO(int maxIO);
 
    /**
+    * Returns whether the whole page is read when getting a message after the page cache has been evicted. <br>
+    * Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_READ_WHOLE_PAGE}.
+    */
+   boolean isReadWholePage();
+
+   /**
+    * Sets whether the whole page is read when getting a message after the page cache has been evicted.
+    */
+   Configuration setReadWholePage(boolean read);
+
+   /**
     * Returns the file system directory used to store journal log. <br>
     * Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_DIR}.
     */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index db83e5a..573ebe3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -176,6 +176,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private int maxConcurrentPageIO = ActiveMQDefaultConfiguration.getDefaultMaxConcurrentPageIo();
 
+   private boolean readWholePage = ActiveMQDefaultConfiguration.isDefaultReadWholePage();
+
    protected String largeMessagesDirectory = ActiveMQDefaultConfiguration.getDefaultLargeMessagesDir();
 
    protected String bindingsDirectory = ActiveMQDefaultConfiguration.getDefaultBindingsDirectory();
@@ -812,6 +814,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public boolean isReadWholePage() {
+      return readWholePage;
+   }
+
+   @Override
+   public ConfigurationImpl setReadWholePage(boolean read) {
+      readWholePage = read;
+      return this;
+   }
+
+   @Override
    public File getJournalLocation() {
       return subFolder(getJournalDirectory());
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8171210..e733465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -568,6 +568,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       config.setPageMaxConcurrentIO(getInteger(e, "page-max-concurrent-io", config.getPageMaxConcurrentIO(), Validators.MINUS_ONE_OR_GT_ZERO));
 
+      config.setReadWholePage(getBoolean(e, "read-whole-page", config.isReadWholePage()));
+
       config.setPagingDirectory(getString(e, "paging-directory", config.getPagingDirectory(), Validators.NOT_NULL_OR_EMPTY));
 
       config.setCreateJournalDir(getBoolean(e, "create-journal-dir", config.isCreateJournalDir()));
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
index 20c7888..646b568 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
@@ -36,10 +36,10 @@ public interface PageCache extends SoftValueLongObjectHashMap.ValueCache {
    boolean isLive();
 
    /**
-    * @param messageNumber The order of the message on the page
+    * @param pagePosition the position of the message on the page
     * @return
     */
-   PagedMessage getMessage(int messageNumber);
+   PagedMessage getMessage(PagePosition pagePosition);
 
    void close();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
index a9e0537..e794f90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
@@ -28,6 +28,8 @@ public interface PagePosition extends Comparable<PagePosition> {
 
    int getMessageNr();
 
+   int getFileOffset();
+
    long getPersistentSize();
 
    void setPersistentSize(long persistentSize);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 9d3fa72..6c98c12 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
 
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
 import org.jboss.logging.Logger;
@@ -61,8 +62,8 @@ public final class LivePageCacheImpl implements LivePageCache {
    }
 
    @Override
-   public PagedMessage getMessage(int messageNumber) {
-      return messages.get(messageNumber);
+   public PagedMessage getMessage(PagePosition pagePosition) {
+      return messages.get(pagePosition.getMessageNr());
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
index 3874e4f..a350ceb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
 
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 
 /**
  * The caching associated to a single page.
@@ -43,9 +44,9 @@ class PageCacheImpl implements PageCache {
    // Public --------------------------------------------------------
 
    @Override
-   public PagedMessage getMessage(final int messageNumber) {
-      if (messageNumber < messages.length) {
-         return messages[messageNumber];
+   public PagedMessage getMessage(PagePosition pagePosition) {
+      if (pagePosition.getMessageNr() < messages.length) {
+         return messages[pagePosition.getMessageNr()];
       } else {
          return null;
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 631de40..8f3987c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -74,6 +74,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    private final SoftValueLongObjectHashMap<PageCache> softCache;
 
+   private LongObjectHashMap<Integer> numberOfMessages = null;
+
    private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages;
 
    private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
@@ -90,15 +92,25 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-
    public PageCursorProviderImpl(final PagingStore pagingStore,
                                  final StorageManager storageManager,
                                  final ArtemisExecutor executor,
                                  final int maxCacheSize) {
+      this(pagingStore, storageManager, executor, maxCacheSize, false);
+   }
+
+   public PageCursorProviderImpl(final PagingStore pagingStore,
+                                 final StorageManager storageManager,
+                                 final ArtemisExecutor executor,
+                                 final int maxCacheSize,
+                                 final boolean readWholePage) {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
       this.executor = executor;
       this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
+      if (!readWholePage) {
+         this.numberOfMessages = new LongObjectHashMap<>();
+      }
       this.inProgressReadPages = new LongObjectHashMap<>();
    }
 
@@ -133,7 +145,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
       }
 
-      return cache.getMessage(pos.getMessageNr());
+      return cache.getMessage(pos);
    }
 
    @Override
@@ -169,6 +181,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             }
             inProgressReadPage = inProgressReadPages.get(pageId);
             if (inProgressReadPage == null) {
+               if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) {
+                  return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId));
+               }
                final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
                cache = createPageCache(pageId);
                page = pagingStore.createPage((int) pageId);
@@ -203,6 +218,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                               CompletableFuture<PageCache> inProgressReadPage) throws Exception {
       logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
       boolean acquiredPageReadPermission = false;
+      int num = -1;
       try {
          final long startedRequest = System.nanoTime();
          while (!acquiredPageReadPermission) {
@@ -221,7 +237,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to read %d bytes", pageId,
                          pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
          }
-         cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
+         num = pgdMessages.size();
+         cache.setMessages(pgdMessages.toArray(new PagedMessage[num]));
       } catch (Throwable t) {
          inProgressReadPage.completeExceptionally(t);
          synchronized (softCache) {
@@ -243,6 +260,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
       synchronized (softCache) {
          inProgressReadPages.remove(pageId);
          softCache.put(pageId, cache);
+         if (numberOfMessages != null && num != -1) {
+            numberOfMessages.put(pageId, Integer.valueOf(num));
+         }
       }
       return cache;
    }
@@ -540,7 +560,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
             depagedPage.delete(pgdMessages);
             synchronized (softCache) {
-               softCache.remove((long) depagedPage.getPageId());
+               long pageId = (long) depagedPage.getPageId();
+               softCache.remove(pageId);
+               // numberOfMessages is only allocated when readWholePage is false (see constructor),
+               // so it must be null-checked here to avoid a NullPointerException.
+               if (numberOfMessages != null) {
+                  numberOfMessages.remove(pageId);
+               }
             }
             onDeletePage(depagedPage);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
index 40890cf..50907db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
@@ -31,6 +31,8 @@ public class PagePositionImpl implements PagePosition {
     */
    private int messageNr;
 
+   private int fileOffset = -1;
+
    /**
     * ID used for storage
     */
@@ -45,11 +47,17 @@ public class PagePositionImpl implements PagePosition {
    /**
     * @param pageNr
     * @param messageNr
+    * @param fileOffset
     */
-   public PagePositionImpl(long pageNr, int messageNr) {
+   public PagePositionImpl(long pageNr, int messageNr, int fileOffset) {
       this();
       this.pageNr = pageNr;
       this.messageNr = messageNr;
+      this.fileOffset = fileOffset;
+   }
+
+   public PagePositionImpl(long pageNr, int messageNr) {
+      this(pageNr, messageNr, -1);
    }
 
    public PagePositionImpl() {
@@ -88,6 +96,11 @@ public class PagePositionImpl implements PagePosition {
       return messageNr;
    }
 
+   @Override
+   public int getFileOffset() {
+      return fileOffset;
+   }
+
    /**
     * @return the persistentSize
     */
@@ -120,7 +133,7 @@ public class PagePositionImpl implements PagePosition {
 
    @Override
    public PagePosition nextPage() {
-      return new PagePositionImpl(this.pageNr + 1, 0);
+      return new PagePositionImpl(this.pageNr + 1, 0, 0);
    }
 
    @Override
@@ -150,7 +163,8 @@ public class PagePositionImpl implements PagePosition {
 
    @Override
    public String toString() {
-      return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]";
+      return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID +
+         ", fileOffset=" + fileOffset + "]";
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
new file mode 100644
index 0000000..f518f75
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.jboss.logging.Logger;
+
+public class PageReader implements PageCache { // PageCache that serves messages by reading them from the page file when asked
+   private static final Logger logger = Logger.getLogger(PageReader.class);
+
+   private final Page page; // the page whose file this reader opens, reads and closes
+   private final int numberOfMessages; // count given at construction; message numbers >= this are treated as out of range
+   private PagedMessage[] pagedMessages = null; // set only via setMessages(); when non-null, getMessages() returns it instead of reading the file
+
+   public PageReader(Page page, int numberOfMessages) {
+      this.page = page;
+      this.numberOfMessages = numberOfMessages;
+   }
+
+   @Override
+   public long getPageId() {
+      return page.getPageId();
+   }
+
+   @Override
+   public int getNumberOfMessages() {
+      return numberOfMessages;
+   }
+
+   @Override
+   public void setMessages(PagedMessage[] messages) { // NOTE(review): unsynchronized, while getMessages() reads the same field under the monitor — confirm this cannot race
+      this.pagedMessages = messages;
+   }
+
+   @Override
+   public synchronized PagedMessage[] getMessages() {
+      if (pagedMessages != null) {
+         return pagedMessages;
+      } else {
+         try {
+            openPage(); // return value ignored: this path does not track whether the file was already open
+            return page.read().toArray(new PagedMessage[numberOfMessages]);
+         } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+         } finally {
+            close(); // NOTE(review): unlike getMessage(...), this closes the file even if it was already open before this call
+         }
+      }
+   }
+
+   @Override
+   public boolean isLive() {
+      return false;
+   }
+
+   /**
+    * @param pagePosition   position of the message to read (its message number and, when not -1, its file offset)
+    * @param throwException if {@code true}, a {@link NonExistentPage} is thrown when the message number is not below {@link #getNumberOfMessages()}; otherwise {@code null} is returned in that case
+    * @param keepOpen       if {@code true}, this call never closes the page file; if {@code false}, the file is closed again only when this call was the one that opened it
+    * @return the paged message read from the page file, or {@code null} when the message number is out of range and {@code throwException} is {@code false}
+    */
+   public synchronized PagedMessage getMessage(PagePosition pagePosition, boolean throwException, boolean keepOpen) {
+      if (pagePosition.getMessageNr() >= getNumberOfMessages()) {
+         if (throwException) {
+            throw new NonExistentPage("Invalid messageNumber passed = " + pagePosition + " on " + this);
+         }
+         return null;
+      }
+
+      boolean previouslyClosed = true; // stays true if openPage() itself throws, so the finally block still closes when keepOpen is false
+      try {
+         previouslyClosed = openPage();
+         PagedMessage msg;
+         if (pagePosition.getFileOffset() != -1) {
+            msg = page.readMessage(pagePosition.getFileOffset(), pagePosition.getMessageNr(), pagePosition.getMessageNr()); // presumably (startOffset, startMessageNr, targetMessageNr) — confirm against Page.readMessage
+         } else {
+            if (logger.isTraceEnabled()) {
+               logger.trace("get message from pos " + pagePosition, new Exception("trace get message without file offset"));
+            }
+            msg = page.readMessage(0, 0, pagePosition.getMessageNr()); // no offset recorded (-1): start from offset 0 and message 0
+         }
+         return msg;
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
+      } finally {
+         if (!keepOpen && previouslyClosed) { // only close a file this call opened, and only if the caller did not ask to keep it open
+            close();
+         }
+      }
+   }
+
+   @Override
+   public synchronized PagedMessage getMessage(PagePosition pagePosition) {
+      return getMessage(pagePosition, false, false); // throwException=false: out-of-range yields null; keepOpen=false
+   }
+
+   /**
+    * @return {@code true} if the page file was closed and has been opened by this call, {@code false} if it was already open
+    * @throws Exception whatever the underlying file's {@code isOpen()} or {@code open()} throws
+    */
+   boolean openPage() throws Exception {
+      if (!page.getFile().isOpen()) {
+         page.getFile().open();
+         return true;
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized void close() {
+      try {
+         page.close(false, false);
+      } catch (Exception e) {
+         logger.warn("Closing page " + page.getPageId() + " occurs exception:", e); // best effort: a close failure is logged, not propagated
+      }
+   }
+
+   @Override
+   public String toString() {
+      return "PageReader::page=" + getPageId() + " numberOfMessages = " + numberOfMessages;
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 3aec7ea..7fee465 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 import org.jboss.logging.Logger;
 
 public final class PageSubscriptionImpl implements PageSubscription {
@@ -99,6 +100,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
    private final AtomicLong deliveredSize = new AtomicLong(0);
 
+   // Each CursorIterator records its current PageReader in this map, keyed by page number
+   private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
+
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
                         final StorageManager store,
@@ -366,7 +370,15 @@ public final class PageSubscriptionImpl implements PageSubscription {
    }
 
    private PagedReference getReference(PagePosition pos) {
-      return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
+      PagedMessage pagedMessage;
+      PageReader pageReader = pageReaders.get(pos.getPageNr());
+      if (pageReader != null) {
+         pagedMessage = pageReader.getMessage(pos, true, false);
+      } else {
+         pagedMessage = cursorProvider.getMessage(pos);
+      }
+
+      return cursorProvider.newReference(pos, pagedMessage, this);
    }
 
    @Override
@@ -379,13 +391,19 @@ public final class PageSubscriptionImpl implements PageSubscription {
       return new CursorIterator(browsing);
    }
 
-   private PagedReference internalGetNext(final PagePosition pos) {
-      PagePosition retPos = pos.nextMessage();
+   private PagedReference internalGetNext(final PagePositionAndFileOffset pos) {
+      PagePosition retPos = pos.nextPagePostion();
 
       PageCache cache = null;
 
       while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
-         cache = cursorProvider.getPageCache(retPos.getPageNr());
+         PageReader pageReader = pageReaders.get(retPos.getPageNr());
+         if (pageReader == null) {
+            cache = cursorProvider.getPageCache(retPos.getPageNr());
+         } else {
+            cache = pageReader;
+         }
+
          /**
           * In following cases, we should move to the next page
           * case 1: cache == null means file might be deleted unexpectedly.
@@ -404,7 +422,17 @@ public final class PageSubscriptionImpl implements PageSubscription {
       }
 
       if (cache != null) {
-         PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+         PagedMessage serverMessage;
+         if (cache instanceof PageReader) {
+            serverMessage = ((PageReader) cache).getMessage(retPos, false, true);
+            PageCache previousPageCache = pageReaders.putIfAbsent(retPos.getPageNr(), (PageReader) cache);
+            if (previousPageCache != null && previousPageCache != cache) {
+               // Maybe other cursor iterators have added page reader, we have to close this one to avoid file leak
+               cache.close();
+            }
+         } else {
+            serverMessage = cache.getMessage(retPos);
+         }
 
          if (serverMessage != null) {
             return cursorProvider.newReference(retPos, serverMessage, this);
@@ -415,6 +443,10 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
    private PagePosition moveNextPage(final PagePosition pos) {
       PagePosition retPos = pos;
+      PageReader pageReader = pageReaders.remove(pos.getPageNr());
+      if (pageReader != null) {
+         pageReader.close();
+      }
       while (true) {
          retPos = retPos.nextPage();
          synchronized (consumedPages) {
@@ -441,8 +473,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
    /**
     *
     */
-   private synchronized PagePosition getStartPosition() {
-      return new PagePositionImpl(pageStore.getFirstPage(), -1);
+   private synchronized PagePositionAndFileOffset getStartPosition() {
+      return new PagePositionAndFileOffset(-1, new PagePositionImpl(pageStore.getFirstPage(), -1));
    }
 
    @Override
@@ -585,7 +617,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
    @Override
    public PagedMessage queryMessage(PagePosition pos) {
-      return cursorProvider.getMessage(pos);
+      PageReader pageReader = pageReaders.get(pos.getPageNr());
+      if (pageReader != null) {
+         return pageReader.getMessage(pos, true, false);
+      } else {
+         return cursorProvider.getMessage(pos);
+      }
    }
 
    /**
@@ -892,7 +929,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
          if (persistentSize < 0) {
             //cache.getMessage is potentially expensive depending
             //on the current cache size and which message is queried
-            size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+            size = getPersistentSize(cache.getMessage(position));
          } else {
             size = persistentSize;
          }
@@ -1209,9 +1246,9 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
    private class CursorIterator implements PageIterator {
 
-      private PagePosition position = null;
+      private PagePositionAndFileOffset position = null;
 
-      private PagePosition lastOperation = null;
+      private PagePositionAndFileOffset lastOperation = null;
 
       private volatile boolean isredelivery = false;
 
@@ -1234,7 +1271,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
          this.browsing = browsing;
       }
 
-
       private CursorIterator() {
          this.browsing = false;
       }
@@ -1284,8 +1320,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
             PagedReference message;
 
-            PagePosition lastPosition = position;
-            PagePosition tmpPosition = position;
+            PagePositionAndFileOffset lastPosition = position;
+            PagePositionAndFileOffset tmpPosition = position;
 
             do {
                synchronized (redeliveries) {
@@ -1310,7 +1346,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
                   break;
                }
 
-               tmpPosition = message.getPosition();
+               int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
+               tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
 
                boolean valid = true;
                boolean ignored = false;
@@ -1361,7 +1398,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
                   }
                }
 
-               position = message.getPosition();
+               position = tmpPosition;
 
                if (valid) {
                   match = match(message.getMessage());
@@ -1417,6 +1454,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
 
       @Override
       public void close() {
+         // When a CursorIterator (especially a browsing one) is closed, we need to close the page reader it opened
+         if (position != null) {
+            PageReader pageReader = pageReaders.remove(position.pagePosition.getPageNr());
+            if (pageReader != null) {
+               pageReader.close();
+            }
+         }
       }
    }
 
@@ -1458,4 +1502,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
          return 0;
       }
    }
+
+   protected static class PagePositionAndFileOffset {
+
+      private final int nextFileOffset;
+      private final PagePosition pagePosition;
+
+      PagePositionAndFileOffset(int nextFileOffset, PagePosition pagePosition) {
+         this.nextFileOffset = nextFileOffset;
+         this.pagePosition = pagePosition;
+      }
+
+      PagePosition nextPagePostion() {
+         int messageNr = pagePosition.getMessageNr();
+         return new PagePositionImpl(pagePosition.getPageNr(), messageNr + 1, messageNr + 1 == 0 ? 0 : nextFileOffset);
+      }
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 47385d9..ca6ad9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -80,6 +80,12 @@ public final class Page implements Comparable<Page> {
     */
    private Set<PageSubscriptionCounter> pendingCounters;
 
+   private int lastReadMessageNumber;
+   private ByteBuffer readFileBuffer;
+   private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+   private ChannelBufferWrapper readFileBufferWrapper;
+   private int readProcessedBytes;
+
    public Page(final SimpleString storeName,
                final StorageManager storageManager,
                final SequentialFileFactory factory,
@@ -90,6 +96,7 @@ public final class Page implements Comparable<Page> {
       fileFactory = factory;
       this.storageManager = storageManager;
       this.storeName = storeName;
+      resetReadMessageStatus();
    }
 
    public int getPageId() {
@@ -104,6 +111,129 @@ public final class Page implements Comparable<Page> {
       return pageCache;
    }
 
+   private synchronized void resetReadMessageStatus() {
+      lastReadMessageNumber = -3;
+      readProcessedBytes = 0;
+   }
+
+   public synchronized PagedMessage readMessage(int startOffset,
+                                                int startMessageNumber,
+                                                int targetMessageNumber) throws Exception {
+      assert startMessageNumber <= targetMessageNumber;
+
+      if (!file.isOpen()) {
+         throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
+      }
+      final int fileSize = (int) file.size();
+      try {
+         if (readFileBuffer == null) {
+            readProcessedBytes = startOffset;
+            file.position(readProcessedBytes);
+            readFileBuffer = fileFactory.allocateDirectBuffer(Math.min(fileSize - readProcessedBytes, MIN_CHUNK_SIZE));
+            //the wrapper is reused to avoid unnecessary allocations
+            readFileBufferWrapper = wrapWhole(readFileBuffer);
+            readFileBuffer.limit(0);
+         } else if (lastReadMessageNumber + 1 != targetMessageNumber) {
+            readProcessedBytes = startOffset;
+            file.position(readProcessedBytes);
+            readFileBuffer.limit(0);
+         } else {
+            startMessageNumber = targetMessageNumber;
+         }
+
+         int remainingBytes = fileSize - readProcessedBytes;
+         int currentMessageNumber = startMessageNumber;
+         // First we search forward for the file position of the message with the target number
+         while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE && currentMessageNumber < targetMessageNumber) {
+            headerBuffer.clear();
+            file.read(headerBuffer);
+            headerBuffer.position(0);
+
+            if (headerBuffer.remaining() >= HEADER_SIZE && headerBuffer.get() == START_BYTE) {
+               final int encodedSize = headerBuffer.getInt();
+               final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
+               if (nextPosition <= fileSize) {
+                  final int endPosition = nextPosition - 1;
+                  file.position(endPosition);
+                  headerBuffer.rewind();
+                  headerBuffer.limit(1);
+                  file.read(headerBuffer);
+                  headerBuffer.position(0);
+
+                  if (headerBuffer.remaining() >= 1 && headerBuffer.get() == END_BYTE) {
+                     readProcessedBytes = nextPosition;
+                     currentMessageNumber++;
+                  } else {
+                     markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+                     break;
+                  }
+               } else {
+                  markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+                  break;
+               }
+            } else {
+               markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+               break;
+            }
+            remainingBytes = fileSize - readProcessedBytes;
+         }
+
+         // Then we read the target message
+         if (currentMessageNumber == targetMessageNumber && remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
+            final ByteBuffer oldFileBuffer = readFileBuffer;
+            readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, true);
+            //change wrapper if fileBuffer has changed
+            if (readFileBuffer != oldFileBuffer) {
+               readFileBufferWrapper = wrapWhole(readFileBuffer);
+            }
+            final byte startByte = readFileBuffer.get();
+            if (startByte == Page.START_BYTE) {
+               final int encodedSize = readFileBuffer.getInt();
+               final int nextPosition = readProcessedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
+               if (nextPosition <= fileSize) {
+                  final ByteBuffer currentFileBuffer = readFileBuffer;
+                  readFileBuffer = readIntoFileBufferIfNecessary(readFileBuffer, encodedSize + 1, true);
+                  //change wrapper if fileBuffer has changed
+                  if (readFileBuffer != currentFileBuffer) {
+                     readFileBufferWrapper = wrapWhole(readFileBuffer);
+                  }
+                  final int endPosition = readFileBuffer.position() + encodedSize;
+                  //this check must be performed before decoding
+                  if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
+                     final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
+                     readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
+                     msg.decode(readFileBufferWrapper);
+                     readFileBuffer.position(endPosition + 1);
+                     assert readFileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
+                     msg.initMessage(storageManager);
+                     if (logger.isTraceEnabled()) {
+                        logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
+                     }
+                     readProcessedBytes = nextPosition;
+                     lastReadMessageNumber = targetMessageNumber;
+                     return msg;
+                  } else {
+                     markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+                  }
+               } else {
+                  markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+               }
+            } else {
+               markFileAsSuspect(file.getFileName(), readProcessedBytes, currentMessageNumber);
+            }
+         }
+      } catch (Exception e) {
+         resetReadMessageStatus();
+         throw e;
+      }
+      resetReadMessageStatus();
+      throw new RuntimeException("target message no." + targetMessageNumber + " not found from start offset " + startOffset + " and start message number " + startMessageNumber);
+   }
+
+   public synchronized List<PagedMessage> read() throws Exception {
+      return read(storageManager);
+   }
+
    public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
       if (logger.isDebugEnabled()) {
          logger.debug("reading page " + this.pageId + " on address = " + storeName);
@@ -122,10 +252,17 @@ public final class Page implements Comparable<Page> {
       return messages;
    }
 
-   private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
-      final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
-      newFileBuffer.put(fileBuffer);
-      fileFactory.releaseBuffer(fileBuffer);
+   private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
+      ByteBuffer newFileBuffer;
+      if (direct) {
+         newFileBuffer = fileFactory.allocateDirectBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
+         newFileBuffer.put(fileBuffer);
+         fileFactory.releaseDirectBuffer(fileBuffer);
+      } else {
+         newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
+         newFileBuffer.put(fileBuffer);
+         fileFactory.releaseBuffer(fileBuffer);
+      }
       fileBuffer = newFileBuffer;
       //move the limit to allow reading as much as possible from the file
       fileBuffer.limit(fileBuffer.capacity());
@@ -138,7 +275,7 @@ public final class Page implements Comparable<Page> {
     * It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes}
     * of valid data from {@link #file}.
     */
-   private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
+   private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes, boolean direct) throws Exception {
       final int remaining = fileBuffer.remaining();
       //fileBuffer::remaining is the current size of valid data
       final int bytesToBeRead = requiredBytes - remaining;
@@ -162,7 +299,7 @@ public final class Page implements Comparable<Page> {
             file.read(fileBuffer);
             fileBuffer.position(0);
          } else {
-            fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes);
+            fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes, direct);
          }
       }
       return fileBuffer;
@@ -189,6 +326,7 @@ public final class Page implements Comparable<Page> {
    //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
    private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
    private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
+   private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
    private static final int MIN_CHUNK_SIZE = Env.osPageSize();
 
    private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
@@ -208,7 +346,7 @@ public final class Page implements Comparable<Page> {
             fileBuffer.limit(0);
             do {
                final ByteBuffer oldFileBuffer = fileBuffer;
-               fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
+               fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE, false);
                //change wrapper if fileBuffer has changed
                if (fileBuffer != oldFileBuffer) {
                   fileBufferWrapper = wrapWhole(fileBuffer);
@@ -219,7 +357,7 @@ public final class Page implements Comparable<Page> {
                   final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
                   if (nextPosition <= fileSize) {
                      final ByteBuffer currentFileBuffer = fileBuffer;
-                     fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
+                     fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1, false);
                      //change wrapper if fileBuffer has changed
                      if (fileBuffer != currentFileBuffer) {
                         fileBufferWrapper = wrapWhole(fileBuffer);
@@ -317,6 +455,11 @@ public final class Page implements Comparable<Page> {
     * While reading the cache we don't need (and shouldn't inform the backup
     */
    public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
+      if (readFileBuffer != null) {
+         fileFactory.releaseDirectBuffer(readFileBuffer);
+         readFileBuffer = null;
+      }
+
       if (sendEvent && storageManager != null) {
          storageManager.pageClosed(storeName, pageId);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
index 3bedc92..2c6dbdd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java
@@ -82,6 +82,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
 
    private JDBCSequentialFile directoryList;
 
+   private final boolean readWholePage;
+
    @Override
    public ScheduledExecutorService getScheduledExecutor() {
       return scheduledExecutor;
@@ -105,6 +107,17 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                      final ExecutorFactory executorFactory,
                                      final boolean syncNonTransactional,
                                      final IOCriticalErrorListener critialErrorListener) throws Exception {
+      this(dbConf, storageManager, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
+   }
+
+   public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
+                                     final StorageManager storageManager,
+                                     final long syncTimeout,
+                                     final ScheduledExecutorService scheduledExecutor,
+                                     final ExecutorFactory executorFactory,
+                                     final boolean syncNonTransactional,
+                                     final IOCriticalErrorListener critialErrorListener,
+                                     final boolean readWholePage) throws Exception {
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
       this.syncNonTransactional = syncNonTransactional;
@@ -113,6 +126,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
       this.dbConf = dbConf;
       this.criticalErrorListener = critialErrorListener;
       this.factoryToTableName = new HashMap<>();
+      this.readWholePage = readWholePage;
       start();
    }
 
@@ -160,7 +174,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
                                                ArtemisExecutor executor) {
-      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 364f221..0a1119e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -76,6 +76,8 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
 
    private final IOCriticalErrorListener critialErrorListener;
 
+   private final boolean readWholePage;
+
    public File getDirectory() {
       return directory;
    }
@@ -111,6 +113,17 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
                                 final ExecutorFactory executorFactory,
                                 final boolean syncNonTransactional,
                                 final IOCriticalErrorListener critialErrorListener) {
+      this(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener, false);
+   }
+
+   public PagingStoreFactoryNIO(final StorageManager storageManager,
+                                final File directory,
+                                final long syncTimeout,
+                                final ScheduledExecutorService scheduledExecutor,
+                                final ExecutorFactory executorFactory,
+                                final boolean syncNonTransactional,
+                                final IOCriticalErrorListener critialErrorListener,
+                                final boolean readWholePage) {
       this.storageManager = storageManager;
       this.directory = directory;
       this.executorFactory = executorFactory;
@@ -118,6 +131,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
       this.scheduledExecutor = scheduledExecutor;
       this.syncTimeout = syncTimeout;
       this.critialErrorListener = critialErrorListener;
+      this.readWholePage = readWholePage;
    }
 
    // Public --------------------------------------------------------
@@ -146,7 +160,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
                                                StorageManager storageManager,
                                                AddressSettings addressSettings,
                                                ArtemisExecutor executor) {
-      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize(), readWholePage);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 70a4fbf..ba2bfdc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2571,9 +2571,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    protected PagingStoreFactory getPagingStoreFactory() throws Exception {
       if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
          DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
-         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO);
+         return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
       }
-      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
+      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
    }
 
    /**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 998ef8c..d56e4db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -108,6 +107,7 @@ import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
+import org.jctools.queues.MpscUnboundedArrayQueue;
 
 /**
  * Implementation of a Queue
@@ -176,7 +176,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // Messages will first enter intermediateMessageReferences
    // Before they are added to messageReferences
    // This is to avoid locking the queue on the producer
-   private final ConcurrentLinkedQueue<MessageReference> intermediateMessageReferences = new ConcurrentLinkedQueue<>();
+   private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
 
    // This is where messages are stored
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
@@ -209,7 +209,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private long pauseStatusRecord = -1;
 
-   private static final int MAX_SCHEDULED_RUNNERS = 2;
+   private static final int MAX_SCHEDULED_RUNNERS = 1;
+   private static final int MAX_DEPAGE_NUM = MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS;
 
    // We don't ever need more than two DeliverRunner on the executor's list
    // that is getting the worse scenario possible when one runner is almost finishing before the second started
@@ -324,13 +325,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          out.println("consumer: " + holder.consumer.debug());
       }
 
-      for (MessageReference reference : intermediateMessageReferences) {
-         out.print("Intermediate reference:" + reference);
-      }
-
-      if (intermediateMessageReferences.isEmpty()) {
-         out.println("No intermediate references");
-      }
+      out.println("Intermediate reference size is " + intermediateMessageReferences.size());
 
       boolean foundRef = false;
 
@@ -1042,14 +1037,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void deliverAsync() {
       if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
          scheduledRunners.incrementAndGet();
+         checkDepage();
          try {
             getExecutor().execute(deliverRunner);
          } catch (RejectedExecutionException ignored) {
             // no-op
             scheduledRunners.decrementAndGet();
          }
-
-         checkDepage();
       }
 
    }
@@ -2513,6 +2507,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          logger.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
       }
 
+      scheduledRunners.decrementAndGet();
+
       doInternalPoll();
 
       // Either the iterator is empty or the consumer is busy
@@ -2699,7 +2695,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     * @return
     */
    private boolean needsDepage() {
-      return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
+      return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize() &&
+         /**
+          * In most cases, one depage round is followed by at most MAX_SCHEDULED_RUNNERS deliver rounds,
+          * thus we just need to read MAX_DELIVERIES_IN_LOOP * MAX_SCHEDULED_RUNNERS messages. If we read too many, the message references
+          * may be discarded by the garbage collector in response to memory demand and we would need to read them again at
+          * a great cost when delivering.
+          */
+         intermediateMessageReferences.size() + messageReferences.size() < MAX_DEPAGE_NUM;
    }
 
    private SimpleString extractGroupID(MessageReference ref) {
@@ -3634,8 +3637,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.errorDelivering(e);
-         } finally {
-            scheduledRunners.decrementAndGet();
          }
       }
    }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 78d5ba4..2ade964 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -625,6 +625,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="read-whole-page" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Whether the whole page is read when getting a message after the page cache has been evicted.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 22e577e..ab4a7ea 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -88,6 +88,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
       Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMemoryMeasureInterval(), conf.getMemoryMeasureInterval());
       Assert.assertEquals(conf.getJournalLocation(), conf.getNodeManagerLockLocation());
       Assert.assertNull(conf.getJournalDeviceBlockSize());
+      Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultReadWholePage(), conf.isReadWholePage());
    }
 
    @Test
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 91f92aa..98711f7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -125,6 +125,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
 
       Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
+      Assert.assertEquals(true, conf.isReadWholePage());
       Assert.assertEquals("somedir2", conf.getJournalDirectory());
       Assert.assertEquals(false, conf.isCreateJournalDir());
       Assert.assertEquals(JournalType.NIO, conf.getJournalType());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
index 4ed38e8..5a75241 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
@@ -74,4 +74,41 @@ public class PageCursorProviderImplTest {
       }
    }
 
+   @Test(timeout = 30_000)
+   public void returnPageCacheImplIfEvicted() throws Exception {
+      returnCacheIfEvicted(true); // readWholePage = true: the re-requested page 1 must be a PageCacheImpl
+   }
+
+   @Test(timeout = 30_000)
+   public void returnPageReaderIfEvicted() throws Exception {
+      returnCacheIfEvicted(false); // readWholePage = false: the re-requested page 1 must be a PageReader
+   }
+
+   private void returnCacheIfEvicted(boolean readWholePage) throws Exception {
+      final PagingStore pagingStore = mock(PagingStore.class);
+      final StorageManager storageManager = mock(StorageManager.class);
+      when(storageManager.beforePageRead(anyLong(), any(TimeUnit.class))).thenReturn(true);
+      final int pages = 2;
+      final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
+      final PageCursorProviderImpl pageCursorProvider = new PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 1, readWholePage);
+      when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
+      when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
+      final Page firstPage = mock(Page.class);
+      when(firstPage.getPageId()).thenReturn(1);
+      when(pagingStore.createPage(1)).thenReturn(firstPage);
+      final Page secondPage = mock(Page.class);
+      when(secondPage.getPageId()).thenReturn(2);
+      when(pagingStore.createPage(2)).thenReturn(secondPage);
+      // Load page 1, then page 2, then ask for page 1 again; the literal 1 passed to the constructor is presumably the cache limit that gets page 1 evicted - confirm.
+      Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
+      Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
+      if (readWholePage) {
+         Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageCacheImpl);
+      } else {
+         Assert.assertTrue(pageCursorProvider.getPageCache(1) instanceof PageReader);
+      }
+      Assert.assertEquals(1, pageCursorProvider.getCacheSize()); // JUnit order: expected first, then actual
+      Assert.assertTrue(pageCursorProvider.getPageCache(2) instanceof PageCacheImpl);
+      pageCursorProvider.stop();
+   }
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
new file mode 100644
index 0000000..138bf5a
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.PagePositionAndFileOffset;
+import static org.apache.activemq.artemis.utils.RandomUtil.randomBoolean;
+
+public class PageReaderTest extends ActiveMQTestBase {
+
+   @Test
+   public void testPageReadMessage() throws Exception {
+      recreateDirectory(getTestDir());
+
+      int num = 50;
+      int[] offsets = createPage(num);
+      PageReader pageReader = getPageReader();
+
+      PagedMessage[] pagedMessages = pageReader.getMessages();
+      assertEquals(pagedMessages.length, num);
+
+      PagedMessage pagedMessage = null;
+      for (int i = 0; i < num; i++) {
+         if (randomBoolean()) {
+            PagePosition pagePosition = new PagePositionImpl(10, i);
+            pagedMessage = pageReader.getMessage(pagePosition);
+         } else {
+            int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
+            PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
+            PagePosition pagePosition = startPosition.nextPagePostion();
+            assertEquals(offsets[i], pagePosition.getFileOffset());
+            pagedMessage = pageReader.getMessage(pagePosition);
+         }
+         assertNotNull(pagedMessage);
+         assertEquals(pagedMessage.getMessage().getMessageID(), i);
+         assertEquals(pagedMessages[i].getMessage().getMessageID(), i);
+      }
+
+      pageReader.close();
+   }
+
+   @Test
+   public void testPageReadMessageBeyondPage() throws Exception {
+      recreateDirectory(getTestDir());
+
+      int num = 10;
+      createPage(num);
+      PageReader pageReader = getPageReader();
+
+      assertNull(pageReader.getMessage(new PagePositionImpl(10, num)));
+      try {
+         pageReader.getMessage(new PagePositionImpl(10, num), true, true);
+         assertFalse("Expect exception since message number is beyond page ", true);
+      } catch (NonExistentPage e) {
+      }
+
+      pageReader.close();
+   }
+
+   @Test
+   public void testPageReadMessageKeepOpen() throws Exception {
+      recreateDirectory(getTestDir());
+
+      int num = 10;
+      createPage(num);
+      PageReader pageReader = getPageReader();
+
+      pageReader.getMessage(new PagePositionImpl(10, 1), true, true);
+      assertFalse("Page file should keep open", pageReader.openPage());
+      pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
+      assertFalse("Page file should preserve previous state", pageReader.openPage());
+
+      pageReader.close();
+      pageReader.getMessage(new PagePositionImpl(10, 1), true, false);
+      assertTrue("Page file should preserve previous state", pageReader.openPage());
+
+      pageReader.close();
+   }
+
+   private int[] createPage(int num) throws Exception {
+      SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
+      SequentialFile file = factory.createSequentialFile("00010.page");
+      Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
+      page.open();
+      SimpleString simpleDestination = new SimpleString("Test");
+      int[] offsets = new int[num];
+      for (int i = 0; i < num; i++) {
+         ICoreMessage msg = new CoreMessage().setMessageID(i).initBuffer(1024);
+
+         for (int j = 0; j < 100; j++) {
+            msg.getBodyBuffer().writeByte((byte) 'b');
+         }
+
+         msg.setAddress(simpleDestination);
+         offsets[i] = (int)page.getFile().position();
+         page.write(new PagedMessageImpl(msg, new long[0]));
+
+         Assert.assertEquals(i + 1, page.getNumberOfMessages());
+      }
+      page.close(false, false);
+      return offsets;
+   }
+
+   private PageReader getPageReader() throws Exception {
+      SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDirfile(), 1);
+      SequentialFile file = factory.createSequentialFile("00010.page");
+      file.open();
+      Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
+      page.open();
+      page.read(new NullStorageManager());
+      PageReader pageReader = new PageReader(page, page.getNumberOfMessages());
+      return pageReader;
+   }
+
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 9815511..c97bcbf 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -342,6 +342,7 @@
       <bindings-directory>somedir</bindings-directory>
       <create-bindings-dir>false</create-bindings-dir>
       <page-max-concurrent-io>17</page-max-concurrent-io>
+      <read-whole-page>true</read-whole-page>
       <journal-directory>somedir2</journal-directory>
       <create-journal-dir>false</create-journal-dir>
       <journal-type>NIO</journal-type>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 4a1ece0..23ab14f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -256,6 +256,7 @@
       <bindings-directory>somedir</bindings-directory>
       <create-bindings-dir>false</create-bindings-dir>
       <page-max-concurrent-io>17</page-max-concurrent-io>
+      <read-whole-page>true</read-whole-page>
       <journal-directory>somedir2</journal-directory>
       <create-journal-dir>false</create-journal-dir>
       <journal-type>NIO</journal-type>
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 0ea3d1d..0a0f0fe 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -617,6 +617,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="read-whole-page" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Whether the whole page is read when getting a message after the page cache has been evicted.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="journal-directory" type="xsd:string" default="data/journal" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index c18c082..3e116ec 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -159,6 +159,7 @@ log-delegate-factory-class-name | **deprecated** the name of the factory class t
 name | node name; used in topology notifications if set. | n/a
 [password-codec](masking-passwords.md) | the name of the class (and optional configuration properties) used to decode masked passwords. Only valid when `mask-password` is `true`. | n/a
 [page-max-concurrent-io](paging.md) | The max number of concurrent reads allowed on paging. | 5
+[read-whole-page](paging.md) | If true, the whole page is read when getting a message; otherwise only a seek and read of that message is performed. | `false`
 [paging-directory](paging.md#configuration)| the directory to store paged messages in. | `data/paging`
 [persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false`
 [persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true`
diff --git a/pom.xml b/pom.xml
index 67ed992..f76821e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
       <jgroups.version>3.6.13.Final</jgroups.version>
       <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
       <mockito.version>2.25.0</mockito.version>
+      <jctools.version>2.1.1</jctools.version>
       <netty.version>4.1.34.Final</netty.version>
       <netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
       <proton.version>0.33.2</proton.version>
@@ -494,6 +495,12 @@
          </dependency>
          <!--needed to compile transport jar-->
          <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${jctools.version}</version>
+            <!-- License: Apache 2.0 -->
+         </dependency>
+         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
             <version>${netty.version}</version>
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
index c9ec140..db6ada0 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmx/JmxConnectionTest.java
@@ -27,8 +27,8 @@ import java.rmi.server.RemoteObject;
 import java.rmi.server.RemoteRef;
 
 import io.netty.util.internal.PlatformDependent;
-import io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
 import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.jctools.util.UnsafeAccess;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;