Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/19 09:35:25 UTC

[ignite] branch master updated: IGNITE-7792 Generalize logic for checkpointing machinery - Fixes #7513.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2261487  IGNITE-7792 Generalize logic for checkpointing machinery - Fixes #7513.
2261487 is described below

commit 2261487c8504fd5d00d1f624c43b95c45f7c05b4
Author: zstan <st...@gmail.com>
AuthorDate: Thu Mar 19 11:15:33 2020 +0300

    IGNITE-7792 Generalize logic for checkpointing machinery - Fixes #7513.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/persistence/CheckpointProgress.java      |  15 +
 .../GridCacheDatabaseSharedManager.java            | 416 ++++++++++-----------
 .../cache/persistence/metastorage/MetaStorage.java |   2 +-
 .../cache/persistence/pagemem/PageMemoryImpl.java  |  12 +-
 .../util/GridConcurrentMultiPairQueue.java         | 202 ++++++++++
 .../internal/util/GridReadOnlyArrayView.java       |  80 ----
 .../persistence/IgnitePdsCorruptedStoreTest.java   |   3 +-
 .../util/GridConcurrentMultiPairQueueTest.java     | 155 ++++++++
 .../ignite/internal/util/IgniteUtilsSelfTest.java  |   5 +-
 .../ignite/testsuites/IgniteUtilSelfTestSuite.java |   2 +
 10 files changed, 587 insertions(+), 305 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
index 52db794b..17a0b58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointProgress.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence;
 
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Represents information of a progress of a given checkpoint and
@@ -29,4 +30,18 @@ public interface CheckpointProgress {
 
     /** */
     public GridFutureAdapter futureFor(CheckpointState state);
+
+    /**
+     * Marks this checkpoint execution as failed.
+     *
+     * @param error Cause of the failure.
+     */
+    public void fail(Throwable error);
+
+    /**
+     * Changes the checkpoint state, provided that the new state follows the current one in the declared order.
+     *
+     * @param newState New checkpoint state.
+     */
+    public void transitTo(@NotNull CheckpointState newState);
 }
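
The two methods added above turn CheckpointProgress into a small forward-only state machine: transitTo() may only advance the state, and fail() records the cause and drives the checkpoint to its terminal state. A condensed sketch of how an implementation can honor that contract, assuming a state field of type AtomicReference<CheckpointState> (the real implementation is CheckpointProgressImpl inside GridCacheDatabaseSharedManager, further down in this diff; the per-state future bookkeeping is omitted here):

    // Condensed sketch only; see CheckpointProgressImpl below for the actual implementation.
    @Override public void transitTo(@NotNull CheckpointState newState) {
        CheckpointState cur = state.get();

        // Apply only forward transitions (by enum ordinal); stale or repeated calls are ignored.
        if (cur.ordinal() < newState.ordinal())
            state.compareAndSet(cur, newState);
    }

    @Override public void fail(Throwable error) {
        failCause = error;

        // FINISHED is the terminal state used by this patch.
        transitTo(FINISHED);
    }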
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index ea7af29..8bb2bcc 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -48,10 +48,10 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
@@ -68,6 +68,7 @@ import java.util.function.ToLongFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+
 import org.apache.ignite.DataRegionMetricsProvider;
 import org.apache.ignite.DataStorageMetrics;
 import org.apache.ignite.IgniteCheckedException;
@@ -155,8 +156,8 @@ import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridCountDownCallback;
+import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.apache.ignite.internal.util.GridReadOnlyArrayView;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.TimeBag;
@@ -539,6 +540,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * @param allowToReplace Future which, when completed, allows the page replacer to replace pages from this checkpoint.
+     *
+     * @return Holder with the FullPageIds obtained from each PageMemory, the overall number of dirty
+     * pages, and a flag indicating whether at least one user page became dirty since the last checkpoint.
+     */
+    private CheckpointPagesInfoHolder beginAllCheckpoints(IgniteInternalFuture<?> allowToReplace) {
+        Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> res =
+            new ArrayList<>(dataRegions().size());
+
+        int pagesNum = 0;
+
+        boolean hasUserDirtyPages = false;
+
+        for (DataRegion reg : dataRegions()) {
+            if (!reg.config().isPersistenceEnabled())
+                continue;
+
+            GridMultiCollectionWrapper<FullPageId> nextCpPages = ((PageMemoryEx)reg.pageMemory()).beginCheckpoint(allowToReplace);
+
+            pagesNum += nextCpPages.size();
+
+            res.add(new T2<>((PageMemoryEx)reg.pageMemory(), nextCpPages));
+        }
+
+        currCheckpointPagesCnt = pagesNum;
+
+        return new CheckpointPagesInfoHolder(res, pagesNum, hasUserDirtyPages);
+    }
+
+    /**
      * Create metastorage data region configuration with enabled persistence by default.
      *
      * @param storageCfg Data storage configuration.
@@ -2640,6 +2671,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @return PageMemoryEx instance.
      * @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
      */
+    // TODO IGNITE-12722: Get rid of GridCacheDatabaseSharedManager#getPageMemoryForCacheGroup functionality.
     private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
         if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
             return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();
@@ -3056,27 +3088,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         Collection<DataRegion> regions = dataRegions();
 
-        Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(regions.size());
-
-        int pagesNum = 0;
-
-        GridFinishedFuture finishedFuture = new GridFinishedFuture();
-
-        // Collect collection of dirty pages from all regions.
-        for (DataRegion memPlc : regions) {
-            if (memPlc.config().isPersistenceEnabled()){
-                GridMultiCollectionWrapper<FullPageId> nextCpPagesCol =
-                    ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(finishedFuture);
-
-                pagesNum += nextCpPagesCol.size();
-
-                res.add(nextCpPagesCol);
-            }
-        }
+        CheckpointPagesInfoHolder cpPagesHolder = beginAllCheckpoints(new GridFinishedFuture<>());
 
         // Sort and split all dirty pages set to several stripes.
-        GridMultiCollectionWrapper<FullPageId> pages = splitAndSortCpPagesIfNeeded(
-            new IgniteBiTuple<>(res, pagesNum), exec.stripesCount());
+        GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pages =
+            splitAndSortCpPagesIfNeeded(cpPagesHolder);
 
         // Identity stores set for future fsync.
         Collection<PageStore> updStores = new GridConcurrentHashSet<>();
@@ -3086,13 +3102,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         // Shared refernce for tracking exception during write pages.
         AtomicReference<IgniteCheckedException> writePagesError = new AtomicReference<>();
 
-        for (int i = 0; i < pages.collectionsSize(); i++) {
-            // Calculate stripe index.
-            int stripeIdx = i % exec.stripesCount();
-
-            // Inner collection index.
-            int innerIdx = i;
-
+        for (int stripeIdx = 0; stripeIdx < exec.stripesCount(); stripeIdx++) {
             exec.execute(stripeIdx, () -> {
                 PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
                     assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
@@ -3112,34 +3122,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 writePageBuf.order(ByteOrder.nativeOrder());
 
-                Collection<FullPageId> pages0 = pages.innerCollection(innerIdx);
+                GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res =
+                    new GridConcurrentMultiPairQueue.Result<>();
 
-                FullPageId fullPageId = null;
+                int pagesWritten = 0;
 
                 try {
-                    for (FullPageId fullId : pages0) {
+                    while (pages.next(res)) {
                         // Fail-fast break if some exception occurred.
                         if (writePagesError.get() != null)
                             break;
 
-                        // Save pageId to local variable for future using if exception occurred.
-                        fullPageId = fullId;
-
-                        PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullId.groupId());
+                        PageMemoryEx pageMem = res.getKey();
 
                         // Write page content to page store via pageStoreWriter.
                         // Tracker is null, because no need to track checkpoint metrics on recovery.
-                        pageMem.checkpointWritePage(fullId, writePageBuf, pageStoreWriter, null);
-                    }
+                        pageMem.checkpointWritePage(res.getValue(), writePageBuf, pageStoreWriter, null);
 
-                    // Add number of handled pages.
-                    cpPagesCnt.addAndGet(pages0.size());
+                        // Add number of handled pages.
+                        pagesWritten++;
+                    }
                 }
                 catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to write page to pageStore, pageId=" + fullPageId);
+                    U.error(log, "Failed to write page to pageStore: " + res);
 
                     writePagesError.compareAndSet(null, e);
                 }
+
+                cpPagesCnt.addAndGet(pagesWritten);
             });
         }
 
@@ -3769,26 +3779,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();
 
                         CountDownFuture doneWriteFut = new CountDownFuture(
-                            asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
+                            asyncRunner == null ? 1 : persistenceCfg.getCheckpointThreads());
 
                         tracker.onPagesWriteStart();
 
-                        final int totalPagesToWriteCnt = chp.cpPages.size();
+                        final int totalPagesToWriteCnt = chp.pagesSize;
 
                         if (asyncRunner != null) {
-                            for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
+                            for (int i = 0; i < persistenceCfg.getCheckpointThreads(); i++) {
                                 Runnable write = new WriteCheckpointPages(
                                     tracker,
-                                    chp.cpPages.innerCollection(i),
+                                    chp.cpPages,
                                     updStores,
                                     doneWriteFut,
                                     totalPagesToWriteCnt,
-                                    new Runnable() {
-                                        @Override public void run() {
-                                            updateHeartbeat();
-                                        }
-                                    },
-                                    asyncRunner
+                                    () -> updateHeartbeat()
                                 );
 
                                 try {
@@ -3812,12 +3817,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                                 updStores,
                                 doneWriteFut,
                                 totalPagesToWriteCnt,
-                                new Runnable() {
-                                    @Override public void run() {
-                                        updateHeartbeat();
-                                    }
-                                },
-                                null);
+                                () -> updateHeartbeat());
 
                             write.run();
                         }
@@ -4118,9 +4118,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             IgniteFuture snapFut = null;
 
-            IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
+            CheckpointPagesInfoHolder cpPagesHolder;
+
+            int dirtyPagesCount;
 
-            boolean hasPages, hasPartitionsToDestroy;
+            boolean hasUserPages, hasPartitionsToDestroy;
 
             DbCheckpointContextImpl ctx0 = new DbCheckpointContextImpl(curr, new PartitionAllocationMap());
 
@@ -4161,15 +4163,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 fillCacheGroupState(cpRec);
 
                 //There are allowable to replace pages only after checkpoint entry was stored to disk.
-                cpPagesTuple = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
+                cpPagesHolder = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));
+
+                dirtyPagesCount = cpPagesHolder.pagesNum();
 
-                hasPages = hasPageForWrite(cpPagesTuple.get1());
+                hasUserPages = !cpPagesHolder.onlySystemPages();
 
                 hasPartitionsToDestroy = !curr.destroyQueue.pendingReqs.isEmpty();
 
                 WALPointer cpPtr = null;
 
-                if (hasPages || curr.nextSnapshot || hasPartitionsToDestroy) {
+                if (dirtyPagesCount > 0 || curr.nextSnapshot || hasPartitionsToDestroy) {
                     // No page updates for this checkpoint are allowed from now on.
                     cpPtr = cctx.wal().log(cpRec);
 
@@ -4177,7 +4181,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         cpPtr = CheckpointStatus.NULL_PTR;
                 }
 
-                if (hasPages || hasPartitionsToDestroy) {
+                if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
                     cp = prepareCheckpointEntry(
                         tmpWriteBuf,
                         cpTs,
@@ -4195,7 +4199,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 tracker.onLockRelease();
             }
 
-            DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);
+            DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasUserPages);
 
             curr.transitTo(LOCK_RELEASED);
 
@@ -4212,7 +4216,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 }
             }
 
-            if (hasPages || hasPartitionsToDestroy) {
+            if (dirtyPagesCount > 0 || hasPartitionsToDestroy) {
                 assert cp != null;
                 assert cp.checkpointMark() != null;
 
@@ -4229,31 +4233,32 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 tracker.onSplitAndSortCpPagesStart();
 
-                GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(
-                    cpPagesTuple, persistenceCfg.getCheckpointThreads());
+                GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages =
+                    splitAndSortCpPagesIfNeeded(cpPagesHolder);
 
                 tracker.onSplitAndSortCpPagesEnd();
 
                 if (printCheckpointStats && log.isInfoEnabled()) {
                     long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);
 
-                    log.info(
-                        String.format(
-                            CHECKPOINT_STARTED_LOG_FORMAT,
-                            cpRec.checkpointId(),
-                            cp.checkpointMark(),
-                            tracker.beforeLockDuration(),
-                            tracker.lockWaitDuration(),
-                            tracker.listenersExecuteDuration(),
-                            tracker.lockHoldDuration(),
-                            tracker.walCpRecordFsyncDuration(),
-                            tracker.writeCheckpointEntryDuration(),
-                            tracker.splitAndSortCpPagesDuration(),
-                            possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
-                            cpPages.size(),
-                            curr.reason
-                        )
-                    );
+                    if (log.isInfoEnabled())
+                        log.info(
+                            String.format(
+                                CHECKPOINT_STARTED_LOG_FORMAT,
+                                cpRec.checkpointId(),
+                                cp.checkpointMark(),
+                                tracker.beforeLockDuration(),
+                                tracker.lockWaitDuration(),
+                                tracker.listenersExecuteDuration(),
+                                tracker.lockHoldDuration(),
+                                tracker.walCpRecordFsyncDuration(),
+                                tracker.writeCheckpointEntryDuration(),
+                                tracker.splitAndSortCpPagesDuration(),
+                                possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
+                                dirtyPagesCount,
+                                curr.reason
+                            )
+                        );
                 }
 
                 return new Checkpoint(cp, cpPages, curr);
@@ -4274,7 +4279,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             curr.reason));
                 }
 
-                return new Checkpoint(null, new GridMultiCollectionWrapper<>(new Collection[0]), curr);
+                return new Checkpoint(null, GridConcurrentMultiPairQueue.EMPTY, curr);
             }
         }
 
@@ -4456,53 +4461,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         }
 
         /**
-         * Check that at least one collection is not empty.
-         *
-         * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
-         */
-        private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
-            boolean hasPages = false;
-
-            for (Collection c : cpPagesCollWrapper)
-                if (!c.isEmpty()) {
-                    hasPages = true;
-
-                    break;
-                }
-
-            return hasPages;
-        }
-
-        /**
-         * @return tuple with collections of FullPageIds obtained from each PageMemory, overall number of dirty
-         * pages, and flag defines at least one user page became a dirty since last checkpoint.
-         * @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
-         */
-        private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints(
-            IgniteInternalFuture allowToReplace
-        ) {
-            Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(dataRegions().size());
-
-            int pagesNum = 0;
-
-            for (DataRegion memPlc : dataRegions()) {
-                if (!memPlc.config().isPersistenceEnabled())
-                    continue;
-
-                GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory())
-                    .beginCheckpoint(allowToReplace);
-
-                pagesNum += nextCpPagesCol.size();
-
-                res.add(nextCpPagesCol);
-            }
-
-            currCheckpointPagesCnt = pagesNum;
-
-            return new IgniteBiTuple<>(res, pagesNum);
-        }
-
-        /**
          * @param chp Checkpoint snapshot.
          */
         private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
@@ -4650,68 +4608,60 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
-     * Reorders list of checkpoint pages and splits them into needed number of sublists according to
+     * Reorders list of checkpoint pages and splits them into appropriate number of sublists according to
      * {@link DataStorageConfiguration#getCheckpointThreads()} and
      * {@link DataStorageConfiguration#getCheckpointWriteOrder()}.
      *
-     * @param cpPagesTuple Checkpoint pages tuple.
-     * @param threads Checkpoint runner threads.
+     * @param cpPages Checkpoint pages with overall count and user pages info.
      */
-    private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
-        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple,
-        int threads
+    private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> splitAndSortCpPagesIfNeeded(
+        CheckpointPagesInfoHolder cpPages
     ) throws IgniteCheckedException {
-        FullPageId[] pagesArr = new FullPageId[cpPagesTuple.get2()];
+        Set<T2<PageMemoryEx, FullPageId[]>> cpPagesPerRegion = new HashSet<>();
 
         int realPagesArrSize = 0;
 
-        for (GridMultiCollectionWrapper<FullPageId> colWrapper : cpPagesTuple.get1()) {
-            for (int i = 0; i < colWrapper.collectionsSize(); i++)
-                for (FullPageId page : colWrapper.innerCollection(i)) {
-                    if (realPagesArrSize == pagesArr.length)
-                        throw new AssertionError("Incorrect estimated dirty pages number: " + pagesArr.length);
-
-                    pagesArr[realPagesArrSize++] = page;
-                }
-        }
+        int totalPagesCnt = cpPages.pagesNum();
 
-        FullPageId fakeMaxFullPageId = new FullPageId(Long.MAX_VALUE, Integer.MAX_VALUE);
+        for (Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>> regPages : cpPages.cpPages()) {
+            FullPageId[] pages = new FullPageId[regPages.getValue().size()];
 
-        // Some pages may have been replaced, need to fill end of array with fake ones to prevent NPE during sort.
-        for (int i = realPagesArrSize; i < pagesArr.length; i++)
-            pagesArr[i] = fakeMaxFullPageId;
+            int pagePos = 0;
 
-        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
-            Comparator<FullPageId> cmp = new Comparator<FullPageId>() {
-                @Override public int compare(FullPageId o1, FullPageId o2) {
-                    int cmp = Long.compare(o1.groupId(), o2.groupId());
-                    if (cmp != 0)
-                        return cmp;
+            for (int i = 0; i < regPages.getValue().collectionsSize(); i++) {
+                for (FullPageId page : regPages.getValue().innerCollection(i)) {
+                    if (realPagesArrSize++ == totalPagesCnt)
+                        throw new AssertionError("Incorrect estimated dirty pages number: " + totalPagesCnt);
 
-                    return Long.compare(o1.effectivePageId(), o2.effectivePageId());
+                    pages[pagePos++] = page;
                 }
-            };
+            }
 
-            if (pagesArr.length >= parallelSortThreshold)
-                parallelSortInIsolatedPool(pagesArr, cmp);
+            // Some pages may have been already replaced.
+            if (pagePos != pages.length)
+                cpPagesPerRegion.add(new T2<>(regPages.getKey(), Arrays.copyOf(pages, pagePos)));
             else
-                Arrays.sort(pagesArr, cmp);
+                cpPagesPerRegion.add(new T2<>(regPages.getKey(), pages));
         }
 
-        int pagesSubLists = threads == 1 ? 1 : threads * 4;
-        // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads.
-
-        Collection[] pagesSubListArr = new Collection[pagesSubLists];
+        if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
+            Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId)
+                .thenComparingLong(FullPageId::effectivePageId);
 
-        for (int i = 0; i < pagesSubLists; i++) {
-            int from = (int)((long)realPagesArrSize * i / pagesSubLists);
+            ForkJoinPool pool = null;
 
-            int to = (int)((long)realPagesArrSize * (i + 1) / pagesSubLists);
+            for (T2<PageMemoryEx, FullPageId[]> pagesPerReg : cpPagesPerRegion) {
+                if (pagesPerReg.getValue().length >= parallelSortThreshold)
+                    pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp, pool);
+                else
+                    Arrays.sort(pagesPerReg.get2(), cmp);
+            }
 
-            pagesSubListArr[i] = new GridReadOnlyArrayView(pagesArr, from, to);
+            if (pool != null)
+                pool.shutdown();
         }
 
-        return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+        return new GridConcurrentMultiPairQueue<>(cpPagesPerRegion);
     }
 
     /**
@@ -4719,10 +4669,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      *
      * @param pagesArr Pages array.
      * @param cmp Cmp.
+     *
+     * @return ForkJoinPool instance used for sorting, so the caller can reuse it for further arrays and shut it down once all regions are sorted (see {@link ForkJoinTask#fork()}).
      */
-    private static void parallelSortInIsolatedPool(
+    private static ForkJoinPool parallelSortInIsolatedPool(
         FullPageId[] pagesArr,
-        Comparator<FullPageId> cmp
+        Comparator<FullPageId> cmp,
+        ForkJoinPool pool
     ) throws IgniteCheckedException {
         ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
             @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
@@ -4734,9 +4687,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
         };
 
-        ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false);
+        ForkJoinPool execPool = pool == null ?
+            new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
 
-        ForkJoinTask sortTask = forkJoinPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
+        Future<?> sortTask = execPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
 
         try {
             sortTask.get();
@@ -4748,7 +4702,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             throw new IgniteCheckedException("Failed to perform pages array parallel sort", e.getCause());
         }
 
-        forkJoinPool.shutdown();
+        return execPool;
     }
 
     /** Pages write task */
@@ -4757,7 +4711,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private final CheckpointMetricsTracker tracker;
 
         /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection */
-        private final Collection<FullPageId> writePageIds;
+        private final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds;
 
         /** */
         private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
@@ -4771,9 +4725,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         /** */
         private final Runnable beforePageWrite;
 
-        /** If any pages were skipped, new task with remaining pages will be submitted here. */
-        private final ExecutorService retryWriteExecutor;
-
         /**
          * Creates task for write pages
          *
@@ -4783,16 +4734,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          * @param doneFut
          * @param totalPagesToWrite total pages to be written under this checkpoint
          * @param beforePageWrite Action to be performed before every page write.
-         * @param retryWriteExecutor Retry write executor.
          */
         private WriteCheckpointPages(
             final CheckpointMetricsTracker tracker,
-            final Collection<FullPageId> writePageIds,
+            final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds,
             final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
             final CountDownFuture doneFut,
             final int totalPagesToWrite,
-            final Runnable beforePageWrite,
-            final ExecutorService retryWriteExecutor
+            final Runnable beforePageWrite
         ) {
             this.tracker = tracker;
             this.writePageIds = writePageIds;
@@ -4800,22 +4749,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             this.doneFut = doneFut;
             this.totalPagesToWrite = totalPagesToWrite;
             this.beforePageWrite = beforePageWrite;
-            this.retryWriteExecutor = retryWriteExecutor;
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
             snapshotMgr.beforeCheckpointPageWritten();
 
-            Collection<FullPageId> writePageIds = this.writePageIds;
+            GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds = this.writePageIds;
 
             try {
-                List<FullPageId> pagesToRetry = writePages(writePageIds);
+                GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> pagesToRetry = writePages(writePageIds);
 
                 if (pagesToRetry.isEmpty())
                     doneFut.onDone();
                 else {
-                    LT.warn(log, pagesToRetry.size() + " checkpoint pages were not written yet due to unsuccessful " +
+                    LT.warn(log, pagesToRetry.initialSize() + " checkpoint pages were not written yet due to unsuccessful " +
                         "page write lock acquisition and will be retried");
 
                     while (!pagesToRetry.isEmpty())
@@ -4833,8 +4781,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          * @param writePageIds Collections of pages to write.
          * @return pagesToRetry Pages which should be retried.
          */
-        private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
-            List<FullPageId> pagesToRetry = new ArrayList<>();
+        private GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePages(
+            GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> writePageIds
+        ) throws IgniteCheckedException {
+            Map<PageMemoryEx, List<FullPageId>> pagesToRetry = new HashMap<>();
 
             CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
 
@@ -4844,31 +4794,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             boolean throttlingEnabled = resolveThrottlingPolicy() != PageMemoryImpl.ThrottlingPolicy.DISABLED;
 
-            for (FullPageId fullId : writePageIds) {
+            GridConcurrentMultiPairQueue.Result<PageMemoryEx, FullPageId> res =
+                new GridConcurrentMultiPairQueue.Result<>();
+
+            while (writePageIds.next(res)) {
                 if (checkpointer.shutdownNow)
                     break;
 
                 beforePageWrite.run();
 
-                int grpId = fullId.groupId();
-
-                PageMemoryEx pageMem;
-
-                // TODO IGNITE-7792 add generic mapping.
-                if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
-                    pageMem = (PageMemoryEx)metaStorage.pageMemory();
-                else if (grpId == TxLog.TX_LOG_CACHE_ID)
-                    pageMem = (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
-                else {
-                    CacheGroupContext grp = context().cache().cacheGroup(grpId);
+                FullPageId fullId = res.getValue();
 
-                    DataRegion region = grp != null ? grp.dataRegion() : null;
-
-                    if (region == null || !region.config().isPersistenceEnabled())
-                        continue;
-
-                    pageMem = (PageMemoryEx)region.pageMemory();
-                }
+                PageMemoryEx pageMem = res.getKey();
 
                 snapshotMgr.beforePageWrite(fullId);
 
@@ -4892,7 +4829,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 }
             }
 
-            return pagesToRetry;
+            return pagesToRetry.isEmpty() ?
+                GridConcurrentMultiPairQueue.EMPTY :
+                new GridConcurrentMultiPairQueue<>(pagesToRetry);
         }
 
         /**
@@ -4901,12 +4840,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          * @param pagesToRetry List pages for retry.
          * @return Checkpoint page write context.
          */
-        private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
+        private PageStoreWriter createPageStoreWriter(Map<PageMemoryEx, List<FullPageId>> pagesToRetry) {
             return new PageStoreWriter() {
                 /** {@inheritDoc} */
                 @Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
                     if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
-                        pagesToRetry.add(fullPageId);
+                        PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullPageId.groupId());
+
+                        pagesToRetry.computeIfAbsent(pageMem, k -> new ArrayList<>()).add(fullPageId);
 
                         return;
                     }
@@ -4942,10 +4883,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         @Nullable private final CheckpointEntry cpEntry;
 
         /** Checkpoint pages. */
-        private final GridMultiCollectionWrapper<FullPageId> cpPages;
+        private final GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages;
 
         /** */
-        private final CheckpointProgressImpl progress;
+        private final CheckpointProgress progress;
 
         /** Number of deleted WAL files. */
         private int walFilesDeleted;
@@ -4963,14 +4904,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          */
         private Checkpoint(
             @Nullable CheckpointEntry cpEntry,
-            @NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
-            CheckpointProgressImpl progress
+            @NotNull GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
+            CheckpointProgress progress
         ) {
             this.cpEntry = cpEntry;
             this.cpPages = cpPages;
             this.progress = progress;
 
-            pagesSize = cpPages.size();
+            pagesSize = cpPages.initialSize();
         }
 
         /**
@@ -5118,7 +5059,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          *
          * @param error Causal error of fail.
          */
-        public void fail(Throwable error) {
+        @Override public void fail(Throwable error) {
             failCause = error;
 
             transitTo(FINISHED);
@@ -5129,7 +5070,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          *
          * @param newState New checkpoint state.
          */
-        public void transitTo(@NotNull CheckpointState newState) {
+        @Override public void transitTo(@NotNull CheckpointState newState) {
             CheckpointState state = this.state.get();
 
             if (state.ordinal() < newState.ordinal()) {
@@ -5895,4 +5836,41 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             super(msg);
         }
     }
+
+    /** Current checkpoint pages information. */
+    private static class CheckpointPagesInfoHolder {
+        /** If {@code true} there are user pages in checkpoint. */
+        private final boolean hasUserDirtyPages;
+
+        /** Total pages count in cp. */
+        private final int pagesNum;
+
+        /** Collection of pages per PageMemory distribution. */
+        private final Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> cpPages;
+
+        /** */
+        private CheckpointPagesInfoHolder(
+            Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> pages,
+            int num,
+            boolean hasUserPages) {
+            cpPages = pages;
+            pagesNum = num;
+            hasUserDirtyPages = hasUserPages;
+        }
+
+        /** {@code True} if the checkpoint contains no dirty user pages (system pages only). */
+        private boolean onlySystemPages() {
+            return !hasUserDirtyPages;
+        }
+
+        /** Total pages count in cp. */
+        private int pagesNum() {
+            return pagesNum;
+        }
+
+        /** Collection of pages per PageMemory distribution. */
+        private Collection<Map.Entry<PageMemoryEx, GridMultiCollectionWrapper<FullPageId>>> cpPages() {
+            return cpPages;
+        }
+    }
 }
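
Taken together, the GridCacheDatabaseSharedManager changes replace the old scheme of pre-splitting dirty pages into per-thread sublists with a single queue shared by all checkpoint writers and keyed by the owning page memory. A schematic recap of the new write path, pieced together from the hunks above (simplified: metrics, throttling, retries and error handling are left out, and the executor submission is abbreviated):

    // Schematic recap of the new checkpoint write path (simplified from the code above).
    CheckpointPagesInfoHolder holder = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));

    // Dirty pages stay grouped per PageMemoryEx, are sorted per region when
    // CheckpointWriteOrder.SEQUENTIAL is configured, and end up in one shared concurrent queue.
    GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages = splitAndSortCpPagesIfNeeded(holder);

    // getCheckpointThreads() writer tasks drain that single queue; a fast writer simply keeps
    // pulling (PageMemoryEx, FullPageId) pairs, so uneven page distribution no longer leaves threads idle.
    for (int i = 0; i < persistenceCfg.getCheckpointThreads(); i++) {
        asyncRunner.execute(new WriteCheckpointPages(
            tracker,
            cpPages,
            updStores,
            doneWriteFut,
            cpPages.initialSize(), // Same value as chp.pagesSize.
            () -> updateHeartbeat()));
    }

Each WriteCheckpointPages task then loops on writePageIds.next(res) and takes the page's owning PageMemoryEx directly from res.getKey(), instead of resolving it per page through the removed group-id mapping.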
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index c9e9ec4..caccbbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -655,7 +655,7 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
      * @throws IgniteCheckedException If failed.
      */
     private void saveStoreMetadata() throws IgniteCheckedException {
-        PageMemoryEx pageMem = (PageMemoryEx) pageMemory();
+        PageMemoryEx pageMem = (PageMemoryEx)pageMemory();
 
         long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId);
         long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index d2a8562..32c235d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -1133,8 +1133,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace);
 
-            seg.dirtyPages = new GridConcurrentHashSet<>();
-            seg.dirtyPagesCntr.set(0);
+            seg.resetDirtyPages();
         }
 
         safeToUpdate.set(true);
@@ -2101,6 +2100,15 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
 
         /**
+         * Clear dirty pages collection and reset counter.
+         */
+        private void resetDirtyPages() {
+            dirtyPages = new GridConcurrentHashSet<>();
+
+            dirtyPagesCntr.set(0);
+        }
+
+        /**
          * Prepares a page removal for page replacement, if needed.
          *
          * @param fullPageId Candidate page full ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java
new file mode 100644
index 0000000..7ea9e79
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueue.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Concurrent queue that wraps a collection of {@code Pair<K, V[]>}.
+ * The only guarantee {@link #next} provides is that the values of each key's array are drained
+ * sequentially, in array order, i.e. input like: <br>
+ * p1 = new Pair<1, [1, 3, 5, 7]> <br>
+ * p2 = new Pair<2, [2, 3]> <br>
+ * p3 = new Pair<3, [200, 100]> <br>
+ * and a subsequent sequence of {@link #next} calls may produce output like: <br>
+ * [3, 200], [3, 100], [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class GridConcurrentMultiPairQueue<K, V> {
+    /** */
+    public static final GridConcurrentMultiPairQueue EMPTY =
+        new GridConcurrentMultiPairQueue<>(Collections.emptyMap());
+
+    /** Inner holder. */
+    private final V[][] vals;
+
+    /** Storage for every array length. */
+    private final int[] lenSeq;
+
+    /** Current absolute position. */
+    private final AtomicInteger pos = new AtomicInteger();
+
+    /** Precalculated max position. */
+    private final int maxPos;
+
+    /** Keys array. */
+    private final K[] keysArr;
+
+    /** */
+    public GridConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+        int pairCnt = (int)items.entrySet().stream().map(Map.Entry::getValue).filter(k -> k.size() > 0).count();
+
+        vals = (V[][])new Object[pairCnt][];
+
+        keysArr = (K[])new Object[pairCnt];
+
+        lenSeq = new int[pairCnt];
+
+        int keyPos = 0;
+
+        int size = -1;
+
+        for (Map.Entry<K, ? extends Collection<V>> p : items.entrySet()) {
+            if (p.getValue().isEmpty())
+                continue;
+
+            keysArr[keyPos] = p.getKey();
+
+            lenSeq[keyPos] = size += p.getValue().size();
+
+            vals[keyPos++] = (V[])p.getValue().toArray();
+        }
+
+        maxPos = size + 1;
+    }
+
+    /** */
+    public GridConcurrentMultiPairQueue(Collection<T2<K, V[]>> items) {
+        int pairCnt = (int)items.stream().map(Map.Entry::getValue).filter(k -> k.length > 0).count();
+
+        vals = (V[][])new Object[pairCnt][];
+
+        keysArr = (K[])new Object[pairCnt];
+
+        lenSeq = new int[pairCnt];
+
+        int keyPos = 0;
+
+        int size = -1;
+
+        for (Map.Entry<K, V[]> p : items) {
+            if (p.getValue().length == 0)
+                continue;
+
+            keysArr[keyPos] = p.getKey();
+
+            lenSeq[keyPos] = size += p.getValue().length;
+
+            vals[keyPos++] = p.getValue();
+        }
+
+        maxPos = size + 1;
+    }
+
+    /**
+     * Retrieves and removes the head of this queue into the given holder.
+     *
+     * @param res Holder that receives the next key/value pair.
+     * @return {@code true} if the holder was filled with the next pair, or {@code false} if this queue is empty.
+     */
+    public boolean next(Result<K, V> res) {
+        int absPos = pos.getAndIncrement();
+
+        if (absPos >= maxPos) {
+            res.set(null, null, 0);
+
+            return false;
+        }
+
+        int segment = res.getSegment();
+
+        if (absPos > lenSeq[segment]) {
+            segment = Arrays.binarySearch(lenSeq, segment, lenSeq.length - 1, absPos);
+
+            segment = segment < 0 ? -segment - 1 : segment;
+        }
+
+        int relPos = segment == 0 ? absPos : (absPos - lenSeq[segment - 1] - 1);
+
+        K key = keysArr[segment];
+
+        res.set(key, vals[segment][relPos], segment);
+
+        return true;
+    }
+
+    /**
+     * @return {@code true} if empty.
+     */
+    public boolean isEmpty() {
+        return pos.get() >= maxPos;
+    }
+
+    /**
+     * @return Total number of values this queue was initialised with (constant).
+     */
+    public int initialSize() {
+        return maxPos;
+    }
+
+    /** State holder. */
+    public static class Result<K, V> {
+        /** Current segment. */
+        private int segment;
+
+        /** Key holder. */
+        private K key;
+
+        /** Value holder. */
+        private V val;
+
+        /** Current state setter. */
+        public void set(K k, V v, int seg) {
+            key = k;
+            val = v;
+            segment = seg;
+        }
+
+        /** Current segment. */
+        private int getSegment() {
+            return segment;
+        }
+
+        /** Current key. */
+        public K getKey() {
+            return key;
+        }
+
+        /** Current value. */
+        public V getValue() {
+            return val;
+        }
+
+        /** */
+        @Override public String toString() {
+            return S.toString(Result.class, this);
+        }
+    }
+}
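
Outside of the checkpointer, the new queue can be exercised on its own. Below is a minimal, self-contained usage sketch; the MultiPairQueueDemo class and its Integer keys and values are made up for illustration (they mirror what the unit test later in this patch does), while everything else is the API introduced above:

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
    import org.apache.ignite.internal.util.typedef.T2;

    public class MultiPairQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            // Two "regions" (keys 1 and 2), each with its own array of "pages" (values).
            Collection<T2<Integer, Integer[]>> pairs = Arrays.asList(
                new T2<>(1, new Integer[] {10, 11, 12}),
                new T2<>(2, new Integer[] {20, 21}));

            GridConcurrentMultiPairQueue<Integer, Integer> queue = new GridConcurrentMultiPairQueue<>(pairs);

            Runnable drainer = () -> {
                // One reusable holder per thread; next() fills it instead of allocating a pair per call.
                GridConcurrentMultiPairQueue.Result<Integer, Integer> res =
                    new GridConcurrentMultiPairQueue.Result<>();

                while (queue.next(res))
                    System.out.println(Thread.currentThread().getName() + ": " + res.getKey() + " -> " + res.getValue());
            };

            Thread t1 = new Thread(drainer, "drainer-1");
            Thread t2 = new Thread(drainer, "drainer-2");

            t1.start();
            t2.start();

            t1.join();
            t2.join();

            // initialSize() stays constant (5 here), while isEmpty() reflects consumption.
            System.out.println("initialSize=" + queue.initialSize() + ", empty=" + queue.isEmpty());
        }
    }

Every value is handed out exactly once across the two threads, and within a key the values are emitted in array order, which is what keeps the SEQUENTIAL checkpoint write order meaningful after sorting.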
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java
deleted file mode 100644
index 14ae0d5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridReadOnlyArrayView.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.ignite.internal.util;
-
-import java.util.AbstractCollection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import org.apache.ignite.internal.util.typedef.internal.A;
-
-/**
- * Provides read-only collection interface to objects subarray.
- */
-public class GridReadOnlyArrayView<T> extends AbstractCollection<T> {
-    /** Array. */
-    private final T[] arr;
-
-    /** Array index view starts from (inclusive). */
-    private final int from;
-
-    /** Array index view ends with (exclusive). */
-    private final int to;
-
-    /**
-     * @param arr Array.
-     * @param from Array index view starts from (inclusive).
-     * @param to Array index view ends with (exclusive).
-     */
-    public GridReadOnlyArrayView(T[] arr, int from, int to) {
-        A.ensure(from <= to, "Lower bound is greater than upper bound [from=" + from + ", to=" + to + ']');
-
-        this.arr = arr;
-        this.from = from;
-        this.to = to;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return to - from;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<T> iterator() {
-        return new Itr();
-    }
-
-    /**
-     * Iterator.
-     */
-    private class Itr implements Iterator<T> {
-        /** Cursor index. */
-        int cursor = from;
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return cursor < to;
-        }
-
-        /** {@inheritDoc} */
-        @Override public T next() {
-            if (cursor >= to)
-                throw new NoSuchElementException();
-
-            return arr[cursor++];
-        }
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 318cb61..691c35f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -245,7 +245,8 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest {
 
         MetaStorage metaStorage = ignite.context().cache().context().database().metaStorage();
 
-        corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID, PageIdAllocator.METASTORE_PARTITION);
+        corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID,
+            PageIdAllocator.METASTORE_PARTITION);
 
         stopGrid(0);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java
new file mode 100644
index 0000000..743ed0b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/GridConcurrentMultiPairQueueTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * GridConcurrentMultiPairQueue test.
+ **/
+public class GridConcurrentMultiPairQueueTest extends GridCommonAbstractTest {
+    /** */
+    GridConcurrentMultiPairQueue<Integer, Integer> queue;
+
+    /** */
+    GridConcurrentMultiPairQueue<Integer, Integer> queue2;
+
+    /** */
+    Map<Integer, Collection<Integer>> mapForCheck;
+
+    /** */
+    Map<Integer, Collection<Integer>> mapForCheck2;
+
+    /** */
+    Integer[] arr2 = {2, 4};
+
+    /** */
+    Integer[] arr1 = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19};
+
+    /** */
+    Integer[] arr4 = {};
+
+    /** */
+    Integer[] arr5 = {};
+
+    /** */
+    Integer[] arr3 = {100, 200, 300, 400, 500, 600, 600, 700};
+
+    /** */
+    Integer[] arr6 = {};
+
+    /** */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        Collection<T2<Integer, Integer[]>> keyWithArr = new HashSet<>();
+
+        mapForCheck = new ConcurrentHashMap<>();
+
+        mapForCheck2 = new ConcurrentHashMap<>();
+
+        keyWithArr.add(new T2<>(10, arr2));
+        keyWithArr.add(new T2<>(20, arr1));
+        keyWithArr.add(new T2<>(30, arr4));
+        keyWithArr.add(new T2<>(40, arr5));
+        keyWithArr.add(new T2<>(50, arr3));
+        keyWithArr.add(new T2<>(60, arr6));
+
+        mapForCheck.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        mapForCheck.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        mapForCheck.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+        mapForCheck2.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        mapForCheck2.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        mapForCheck2.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+        queue = new GridConcurrentMultiPairQueue<>(keyWithArr);
+
+        Map<Integer, Collection<Integer>> keyWithColl = new HashMap<>();
+
+        keyWithColl.put(10, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        keyWithColl.put(20, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        keyWithColl.put(30, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr4))));
+        keyWithColl.put(40, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr5))));
+        keyWithColl.put(50, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+        keyWithColl.put(60, Collections.synchronizedCollection(new ArrayList<>(Arrays.asList(arr6))));
+
+        queue2 = new GridConcurrentMultiPairQueue<>(keyWithColl);
+    }
+
+    /** */
+    @Test
+    public void testGridConcurrentMultiPairQueueCorrectness() throws Exception {
+        GridTestUtils.runMultiThreaded(() -> {
+            GridConcurrentMultiPairQueue.Result<Integer, Integer> res =
+                new GridConcurrentMultiPairQueue.Result<>();
+
+            while (queue.next(res)) {
+                assertTrue(mapForCheck.containsKey(res.getKey()));
+
+                assertTrue(mapForCheck.get(res.getKey()).remove(res.getValue()));
+
+                Collection<Integer> coll = mapForCheck.get(res.getKey());
+
+                if (coll != null && coll.isEmpty())
+                    mapForCheck.remove(res.getKey(), coll);
+            }
+        }, ThreadLocalRandom.current().nextInt(1, 20), "GridConcurrentMultiPairQueue arr test");
+
+        assertTrue(mapForCheck.isEmpty());
+
+        assertTrue(queue.isEmpty());
+
+        assertTrue(queue.initialSize() == arr1.length + arr2.length + arr3.length + arr4.length);
+
+        GridTestUtils.runMultiThreaded(() -> {
+            GridConcurrentMultiPairQueue.Result<Integer, Integer> res =
+                new GridConcurrentMultiPairQueue.Result<>();
+
+            while (queue2.next(res)) {
+                assertTrue(mapForCheck2.containsKey(res.getKey()));
+
+                assertTrue(mapForCheck2.get(res.getKey()).remove(res.getValue()));
+
+                Collection<Integer> coll = mapForCheck2.get(res.getKey());
+
+                if (coll != null && coll.isEmpty())
+                    mapForCheck2.remove(res.getKey(), coll);
+            }
+        }, ThreadLocalRandom.current().nextInt(1, 20), "GridConcurrentMultiPairQueue coll test");
+
+        assertTrue(mapForCheck2.isEmpty());
+
+        assertTrue(queue2.isEmpty());
+
+        assertTrue(queue2.initialSize() == arr1.length + arr2.length + arr3.length + arr4.length);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 7168e0b..772d234 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -373,7 +374,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
 
             ref = this;
 
-            arr = new SelfReferencedJob[] {this, this};
+            arr = new SelfReferencedJob[]{this, this};
 
             col = asList(this, this, this);
 
@@ -1197,7 +1198,7 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
                     if (Integer.valueOf(1).equals(i))
                         throw new IgniteCheckedException(expectedException);
 
-                    return  null;
+                    return null;
                 }
             );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index ba8f3fe..c6d3705 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilsSelfTest;
 import org.apache.ignite.internal.util.GridArraysSelfTest;
 import org.apache.ignite.internal.util.GridCountDownCallbackTest;
 import org.apache.ignite.internal.util.IgniteDevOnlyLogTest;
+import org.apache.ignite.internal.util.GridConcurrentMultiPairQueueTest;
 import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest;
 import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest;
@@ -100,6 +101,7 @@ import org.junit.runners.Suite;
     GridTopologyHeapSizeSelfTest.class,
     GridTransientTest.class,
     IgniteDevOnlyLogTest.class,
+    GridConcurrentMultiPairQueueTest.class,
 
     // Sensitive toString.
     IncludeSensitiveAtomicTest.class,