You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/11/18 11:53:46 UTC

[ignite] branch master updated: IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 35a6e5b  IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.
35a6e5b is described below

commit 35a6e5b0c228e86b637bc2dcfdbfcd166836671f
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Thu Nov 18 14:49:08 2021 +0300

    IGNITE-13558 Partitions restore process parallelization on node startup - Fixes #9327.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../processors/cache/GridCacheProcessor.java       | 162 +++++++++++++------
 .../cache/IgniteCacheOffheapManager.java           |  23 ++-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  17 +-
 .../GridCacheDatabaseSharedManager.java            |   2 +-
 .../cache/persistence/GridCacheOffheapManager.java | 173 +++++++++++----------
 .../cache/RestorePartitionStateTest.java           |   8 +-
 6 files changed, 239 insertions(+), 146 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 5ff1d3e..346ca69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -41,7 +41,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -5552,66 +5552,75 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = grpCtx.offheap().restoreStateOfPartition(partId, partStates.get(grpPartId));
 
-                            for (Map.Entry<Integer, Long> e : processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new GroupPartitionId(grp.groupId(), e.getKey())));
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new T3<>(time, U.currentTimeMillis(), grpPartId);
 
-                                trimToSize(top, 5);
-                            }
+                                RestorePartitionStateThreadContext threadCtx = threadCtxs.computeIfAbsent(
+                                    Thread.currentThread(),
+                                    t -> new RestorePartitionStateThreadContext()
+                                );
+
+                                Comparator<T3<Long, Long, GroupPartitionId>> cmp = processedPartitionComparator();
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                                threadCtx.topPartRef.updateAndGet(prev -> {
+                                    if (prev == null ||
+                                        cmp.compare(prev.last(), curPart) < 0) {
+                                        SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(cmp);
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : top0) {
-                                    top.add(t2);
+                                        top.add(curPart);
 
-                                    trimToSize(top, 5);
-                                }
+                                        if (prev != null)
+                                            top.addAll(prev);
 
-                                return top;
-                            });
+                                        trimToSize(top, topPartRefLimit);
+
+                                        return top;
+                                    }
+                                    else
+                                        return prev;
+                                });
+
+                                threadCtx.incrementProcessedCnt();
+                            }
                         }
-                    }
-                    catch (IgniteCheckedException | RuntimeException | Error e) {
-                        U.error(log, "Failed to restore partition state for " +
-                            "groupName=" + grp.name() + " groupId=" + grp.groupId(), e);
+                        catch (IgniteCheckedException | RuntimeException | Error e) {
+                            U.error(log, "Failed to restore partition state for " +
+                                "groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e);
 
-                        restoreStateError.compareAndSet(
-                            null,
-                            e instanceof IgniteCheckedException
+                            IgniteCheckedException ex = e instanceof IgniteCheckedException
                                 ? ((IgniteCheckedException)e)
-                                : new IgniteCheckedException(e)
-                        );
-                    }
-                    finally {
-                        completionLatch.countDown();
-                    }
-                });
+                                : new IgniteCheckedException(e);
+
+                            if (!restoreStateError.compareAndSet(null, ex))
+                                restoreStateError.get().addSuppressed(ex);
+                        }
+                        finally {
+                            completionLatch.countDown();
+                        }
+                    });
+                }
             }
 
             boolean printTop = false;
@@ -5625,15 +5634,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                         if (log.isInfoEnabled()) {
-                            @Nullable SortedSet<T3<Long, Long, GroupPartitionId>> top = topPartRef.get();
+                            SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                                collectTopProcessedParts(threadCtxs.values(), topPartRefLimit);
+
+                            long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
 
                             log.info("Restore partitions state progress [grpCnt=" +
                                 (forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() +
-                                ", partitionCnt=" + totalProcessed.get() + '/' + totalPart + (top == null ? "" :
+                                ", partitionCnt=" + totalProcessed + '/' + totalPart + (top.isEmpty() ? "" :
                                 ", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']');
                         }
 
                         timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5;
+
                         printTop = true;
                     }
                 }
@@ -5642,16 +5655,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 throw new IgniteInterruptedException(e);
             }
 
+            for (CacheGroupContext grpCtx : forGroups)
+                grpCtx.offheap().confirmPartitionStatesRestored();
+
             // Checking error after all task applied.
             if (restoreStateError.get() != null)
                 throw restoreStateError.get();
 
             if (log.isInfoEnabled()) {
-                SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ? topPartRef.get() : null;
+                SortedSet<T3<Long, Long, GroupPartitionId>> t =
+                    printTop ? collectTopProcessedParts(threadCtxs.values(), topPartRefLimit) : null;
+
+                long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
 
                 log.info("Finished restoring partition state for local groups [" +
                     "groupsProcessed=" + forGroups.size() +
-                    ", partitionsProcessed=" + totalProcessed.get() +
+                    ", partitionsProcessed=" + totalProcessed +
                     ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) +
                     (t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) +
                     "]");
@@ -5659,6 +5678,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         /**
+         * Collects top partitions by processing time from per-thread contexts of the restore partition state process.
+         *
+         * @param threadCtxs Per-thread contexts (one per pool thread that restored at least one partition).
+         * @param topPartRefLimit Limit of top partitions collection size.
+         */
+        private SortedSet<T3<Long, Long, GroupPartitionId>> collectTopProcessedParts(
+            Collection<RestorePartitionStateThreadContext> threadCtxs,
+            int topPartRefLimit
+        ) {
+            SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator());
+
+            for (RestorePartitionStateThreadContext threadCtx : threadCtxs) {
+                SortedSet<T3<Long, Long, GroupPartitionId>> threadTop = threadCtx.topPartRef.get();
+
+                if (threadTop != null)
+                    top.addAll(threadTop);
+            }
+
+            trimToSize(top, topPartRefLimit);
+
+            return top;
+        }
+
+        /**
          * Start warming up sequentially for each persist data region.
          *
          * @throws IgniteCheckedException If failed.
@@ -5972,13 +6015,36 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Comparator of processed partitions.
      * T3 -> 1 - duration, 2 - timestamp, 3 - partition of group.
-     * Sort order: duration -> timestamp -> partition of group.
+     * Sort order: duration -> timestamp (reversed order) -> partition of group.
      *
      * @return Comparator.
      */
     static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() {
         Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1);
 
-        return comp.thenComparing(T3::get2).thenComparing(T3::get3);
+        return comp.thenComparing(T3::get2, Comparator.reverseOrder()).thenComparing(T3::get3);
+    }
+
+    /**
+     * Per-thread context of restore partition state progress (kept in a map keyed by thread, not a ThreadLocal).
+     */
+    private static class RestorePartitionStateThreadContext {
+        /** Field updater. */
+        static final AtomicLongFieldUpdater<RestorePartitionStateThreadContext> PROCESSED_CNT_UPD =
+            AtomicLongFieldUpdater.newUpdater(RestorePartitionStateThreadContext.class, "processedCnt");
+
+        /** Top partitions by processing time. */
+        final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef =
+            new AtomicReference<>();
+
+        /** Processed partitions count. It is always updated from the same thread. */
+        volatile long processedCnt = 0;
+
+        /**
+         * Increment {@code processedCnt} field.
+         */
+        void incrementProcessedCnt() {
+            PROCESSED_CNT_UPD.incrementAndGet(this);
+        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bc08c15..d206164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
@@ -90,15 +89,27 @@ public interface IgniteCacheOffheapManager {
     public void stop();
 
     /**
+     * Pre-creates a single partition that resides in page memory or WAL and restores its state.
+     *
+     * @param p Partition id.
+     * @param recoveryState Partition recovery state.
+     * @return Processing time in millis.
+     * @throws IgniteCheckedException If failed.
+     */
+    long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException;
+
+    /**
      * Pre-create partitions that resides in page memory or WAL and restores their state.
      *
-     * @param partRecoveryStates Partition recovery states.
-     * @return Processed partitions: partition id -> processing time in millis.
      * @throws IgniteCheckedException If failed.
      */
-    Map<Integer, Long> restorePartitionStates(
-        Map<GroupPartitionId, Integer> partRecoveryStates
-    ) throws IgniteCheckedException;
+    void restorePartitionStates() throws IgniteCheckedException;
+
+    /**
+     * Confirm that partition states are restored. This method should be called after restoring state of all partitions
+     * in group using {@link #restoreStateOfPartition(int, Integer)}.
+     */
+    void confirmPartitionStatesRestored();
 
     /**
      * @param entry Cache entry.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 8507b57..46ccede 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
@@ -264,10 +263,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> restorePartitionStates(
-        Map<GroupPartitionId, Integer> partRecoveryStates
-    ) throws IgniteCheckedException {
-        return Collections.emptyMap(); // No-op.
+    @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restorePartitionStates() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void confirmPartitionStatesRestored() {
+        // No-op.
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index a63fdf5..ed92638 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1434,7 +1434,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     cctx.database().checkpointReadLock();
 
                     try {
-                        cacheGroup.offheap().restorePartitionStates(Collections.emptyMap());
+                        cacheGroup.offheap().restorePartitionStates();
 
                         if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
                             cacheGroup.topology().afterStateRestored(fut.initialVersion());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 2532ae5..1319ae1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -623,131 +623,140 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> restorePartitionStates(
-        Map<GroupPartitionId, Integer> partRecoveryStates
-    ) throws IgniteCheckedException {
+    @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException {
         if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
             || partitionStatesRestored)
-            return Collections.emptyMap();
-
-        Map<Integer, Long> processed = new HashMap<>();
+            return 0;
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        for (int p = 0; p < grp.affinity().partitions(); p++) {
-            Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
-
-            long startTime = U.currentTimeMillis();
+        long startTime = U.currentTimeMillis();
 
-            if (log.isDebugEnabled())
-                log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-
-            if (ctx.pageStore().exists(grp.groupId(), p)) {
-                ctx.pageStore().ensure(grp.groupId(), p);
+        long res = 0;
 
-                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping partition on recovery (pages less than 1) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                    }
+        if (log.isDebugEnabled())
+            log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
 
-                    continue;
-                }
+        if (ctx.pageStore().exists(grp.groupId(), p)) {
+            ctx.pageStore().ensure(grp.groupId(), p);
 
+            if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Creating partition on recovery (exists in page store) " +
+                    log.debug("Skipping partition on recovery (pages less than or equals 1) " +
                         "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                 }
 
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+                return 0;
+            }
 
-                // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
-                part.dataStore().init();
+            if (log.isDebugEnabled()) {
+                log.debug("Creating partition on recovery (exists in page store) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+            }
 
-                ctx.database().checkpointReadLock();
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                try {
-                    long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
-                    long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+            // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
+            part.dataStore().init();
 
-                    try {
-                        long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+            ctx.database().checkpointReadLock();
 
-                        boolean changed = false;
+            try {
+                long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+                long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
 
-                        try {
-                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+                try {
+                    long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
 
-                            if (recoverState != null) {
-                                changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
+                    boolean changed = false;
 
-                                updateState(part, recoverState);
+                    try {
+                        PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from WAL) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() +
-                                        ", size=" + part.fullSize() + ']');
-                                }
-                            }
-                            else {
-                                int stateId = io.getPartitionState(pageAddr);
+                        if (recoveryState != null) {
+                            changed = io.setPartitionState(pageAddr, (byte)recoveryState.intValue());
 
-                                updateState(part, stateId);
+                            updateState(part, recoveryState);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from page memory) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
-                                        ", size=" + part.fullSize() + ']');
-                                }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from WAL) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() +
+                                    ", size=" + part.fullSize() + ']');
                             }
                         }
-                        finally {
-                            pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+                        else {
+                            int stateId = io.getPartitionState(pageAddr);
+
+                            updateState(part, stateId);
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from page memory) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
+                                    ", size=" + part.fullSize() + ']');
+                            }
                         }
                     }
                     finally {
-                        pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+                        pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
                     }
                 }
                 finally {
-                    ctx.database().checkpointReadUnlock();
+                    pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
                 }
-
-                processed.put(p, U.currentTimeMillis() - startTime);
             }
-            else if (recoverState != null) { // Pre-create partition if having valid state.
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
 
-                updateState(part, recoverState);
+            res = U.currentTimeMillis() - startTime;
+        }
+        else if (recoveryState != null) { // Pre-create partition if having valid state.
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                processed.put(p, U.currentTimeMillis() - startTime);
+            updateState(part, recoveryState);
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Restored partition state (from WAL) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                        ", updCntr=" + part.initialUpdateCounter() +
-                        ", size=" + part.fullSize() + ']');
-                }
-            }
-            else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                }
-            }
+            res = U.currentTimeMillis() - startTime;
 
             if (log.isDebugEnabled()) {
-                log.debug("Finished restoring partition state " +
-                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
-                    ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+                log.debug("Restored partition state (from WAL) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                    ", updCntr=" + part.initialUpdateCounter() +
+                    ", size=" + part.fullSize() + ']');
+            }
+        }
+        else {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping partition on recovery (no page store OR wal state) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
             }
         }
 
-        partitionStatesRestored = true;
+        if (log.isDebugEnabled()) {
+            log.debug("Finished restoring partition state " +
+                "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+                ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+        }
+
+        return res;
+    }
 
-        return processed;
+    /** {@inheritDoc} */
+    @Override public void restorePartitionStates() throws IgniteCheckedException {
+        if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
+            || partitionStatesRestored)
+            return;
+
+        for (int p = 0; p < grp.affinity().partitions(); p++)
+            restoreStateOfPartition(p, null);
+
+        confirmPartitionStatesRestored();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void confirmPartitionStatesRestored() {
+        partitionStatesRestored = true;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
index 319d885..eecd28d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/RestorePartitionStateTest.java
@@ -145,12 +145,12 @@ public class RestorePartitionStateTest extends GridCommonAbstractTest {
     @Test
     public void testProcessedPartitionComparator() {
         List<T3<Long, Long, GroupPartitionId>> exp = F.asList(
-            new T3<>(0L, 0L, new GroupPartitionId(0, 0)),
+            new T3<>(0L, 2L, new GroupPartitionId(0, 0)),
             new T3<>(0L, 1L, new GroupPartitionId(0, 0)),
             new T3<>(1L, 1L, new GroupPartitionId(0, 0)),
-            new T3<>(1L, 2L, new GroupPartitionId(0, 0)),
-            new T3<>(1L, 2L, new GroupPartitionId(1, 0)),
-            new T3<>(1L, 2L, new GroupPartitionId(1, 1))
+            new T3<>(1L, 0L, new GroupPartitionId(0, 0)),
+            new T3<>(1L, 0L, new GroupPartitionId(1, 0)),
+            new T3<>(1L, 0L, new GroupPartitionId(1, 1))
         );
 
         TreeSet<T3<Long, Long, GroupPartitionId>> act = new TreeSet<>(processedPartitionComparator());