You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/08/06 08:16:01 UTC

[GitHub] [ignite] tkalkirill commented on a change in pull request #9243: IGNITE-13558-3

tkalkirill commented on a change in pull request #9243:
URL: https://github.com/apache/ignite/pull/9243#discussion_r684011627



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();
+
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new T2<>(ctx, new AtomicInteger()));
+
+                for (int i = 0; i < ctx.affinity().partitions(); i++) {
+                    partIds.get(cntr % poolSize).add(new GroupPartitionId(ctx.groupId(), i));
+
+                    cntr++;
+                }
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.submit(() -> {

Review comment:
       Since the **Future** returned by `sysPool.submit(...)` is not used, you should replace the call with `sysPool.execute(() -> {`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();
+
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new T2<>(ctx, new AtomicInteger()));
+
+                for (int i = 0; i < ctx.affinity().partitions(); i++) {
+                    partIds.get(cntr % poolSize).add(new GroupPartitionId(ctx.groupId(), i));

Review comment:
       You can remove line 5551 (the separate `cntr++;` statement) and replace this line with **partIds.get(cntr++ % poolSize).add(new GroupPartitionId(ctx.groupId(), i));**

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();
+
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new T2<>(ctx, new AtomicInteger()));
+
+                for (int i = 0; i < ctx.affinity().partitions(); i++) {
+                    partIds.get(cntr % poolSize).add(new GroupPartitionId(ctx.groupId(), i));
+
+                    cntr++;
+                }
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.submit(() -> {
+                    List<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    Map<GroupPartitionId, Long> processed = new HashMap<>();
+
                     try {
-                        Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates);
+                        for (GroupPartitionId grpPartId : batch) {
+                            T2<CacheGroupContext, AtomicInteger> grpTuple = grps.get(grpPartId.getGroupId());
+
+                            CacheGroupContext grpCtx = grpTuple.get1();
+
+                            try {
+                                long time = grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(), partStates);
+
+                                processed.put(grpPartId, time);
 
-                        totalProcessed.addAndGet(processed.size());
+                                totalProcessed.incrementAndGet();
+                            }
+                            catch (IgniteCheckedException | RuntimeException | Error e) {
+                                U.error(log, "Failed to restore partition state for " +
+                                    "groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e);
 
+                                IgniteCheckedException ex = e instanceof IgniteCheckedException
+                                    ? ((IgniteCheckedException)e)
+                                    : new IgniteCheckedException(e);
+
+                                if (!restoreStateError.compareAndSet(null, ex))
+                                    restoreStateError.get().addSuppressed(ex);
+                            }
+                            finally {
+                                completionLatch.countDown();
+
+                                AtomicInteger completedCntr = grpTuple.get2();
+
+                                if (completedCntr.incrementAndGet() == grpCtx.affinity().partitions())
+                                    grpCtx.offheap().confirmPartitionStatesRestored();
+                            }
+                        }
+                    }
+                    finally {

Review comment:
       This is not correct: since the buckets can be large, the output on line 5642 will not show up-to-date information.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();

Review comment:
       I think the `AtomicLong totalProcessed = new AtomicLong();` is no longer needed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();
+
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new T2<>(ctx, new AtomicInteger()));
+
+                for (int i = 0; i < ctx.affinity().partitions(); i++) {
+                    partIds.get(cntr % poolSize).add(new GroupPartitionId(ctx.groupId(), i));
+
+                    cntr++;
+                }
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.submit(() -> {
+                    List<GroupPartitionId> batch = partIds.get(batchIdx);

Review comment:
       To reduce heap pressure, please use `partIds.remove(batchIdx);` here.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -89,6 +89,19 @@
      */
     public void stop();
 
+    /**
+     * Pre-create single partition that resides in page memory or WAL and restores their state.
+     *
+     * @param p Partition id.
+     * @param partRecoveryStates Partition recovery states.
+     * @return Processing time in millis.
+     * @throws IgniteCheckedException If failed.
+     */
+    long restoreStateOfPartition(
+        int p,
+        Map<GroupPartitionId, Integer> partRecoveryStates

Review comment:
       We only need the **recoverState** here, not the whole map.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();
+
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new T2<>(ctx, new AtomicInteger()));
+
+                for (int i = 0; i < ctx.affinity().partitions(); i++) {
+                    partIds.get(cntr % poolSize).add(new GroupPartitionId(ctx.groupId(), i));
+
+                    cntr++;
+                }
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.submit(() -> {
+                    List<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    Map<GroupPartitionId, Long> processed = new HashMap<>();
+
                     try {
-                        Map<Integer, Long> processed = grp.offheap().restorePartitionStates(partStates);
+                        for (GroupPartitionId grpPartId : batch) {

Review comment:
       To reduce heap pressure, please rework this to use a queue, or iterate while removing the processed elements.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());

Review comment:
       It is better to use an `ArrayList` here instead of a `LinkedList`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
+
+            int cntr = 0;
+
+            // Group id -> (group ctx, completed partitions counter)
+            Map<Integer, T2<CacheGroupContext, AtomicInteger>> grps = new HashMap<>();

Review comment:
       There is no need to store the **CacheGroupContext**; you can always get it from `ctx.cache().cacheGroup(grpId)`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);

Review comment:
       The inner (second) list type can be just a `Collection`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -615,133 +615,156 @@ else if (needSnapshot)
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> restorePartitionStates(
+    @Override public long restoreStateOfPartition(
+        int p,
         Map<GroupPartitionId, Integer> partRecoveryStates
     ) throws IgniteCheckedException {
         if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
             || partitionStatesRestored)
-            return Collections.emptyMap();
-
-        Map<Integer, Long> processed = new HashMap<>();
+            return 0;
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        for (int p = 0; p < grp.affinity().partitions(); p++) {
-            Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
+        Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
 
-            long startTime = U.currentTimeMillis();
+        long startTime = U.currentTimeMillis();
 
-            if (log.isDebugEnabled())
-                log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+        long res = 0;
 
-            if (ctx.pageStore().exists(grp.groupId(), p)) {
-                ctx.pageStore().ensure(grp.groupId(), p);
+        if (log.isDebugEnabled())
+            log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
 
-                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping partition on recovery (pages less than 1) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                    }
-
-                    continue;
-                }
+        if (ctx.pageStore().exists(grp.groupId(), p)) {
+            ctx.pageStore().ensure(grp.groupId(), p);
 
+            if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Creating partition on recovery (exists in page store) " +
+                    log.debug("Skipping partition on recovery (pages less than 1) " +
                         "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                 }
 
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+                return 0;
+            }
 
-                // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
-                part.dataStore().init();
+            if (log.isDebugEnabled()) {
+                log.debug("Creating partition on recovery (exists in page store) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+            }
 
-                ctx.database().checkpointReadLock();
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                try {
-                    long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
-                    long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+            // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
+            part.dataStore().init();
 
-                    try {
-                        long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+            ctx.database().checkpointReadLock();
 
-                        boolean changed = false;
+            try {
+                long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+                long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
 
-                        try {
-                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+                try {
+                    long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
 
-                            if (recoverState != null) {
-                                changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
+                    boolean changed = false;
 
-                                updateState(part, recoverState);
+                    try {
+                        PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from WAL) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() +
-                                        ", size=" + part.fullSize() + ']');
-                                }
-                            }
-                            else {
-                                int stateId = io.getPartitionState(pageAddr);
+                        if (recoverState != null) {
+                            changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
 
-                                updateState(part, stateId);
+                            updateState(part, recoverState);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from page memory) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
-                                        ", size=" + part.fullSize() + ']');
-                                }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from WAL) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() +
+                                    ", size=" + part.fullSize() + ']');
                             }
                         }
-                        finally {
-                            pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+                        else {
+                            int stateId = io.getPartitionState(pageAddr);
+
+                            updateState(part, stateId);
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from page memory) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
+                                    ", size=" + part.fullSize() + ']');
+                            }
                         }
                     }
                     finally {
-                        pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+                        pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
                     }
                 }
                 finally {
-                    ctx.database().checkpointReadUnlock();
+                    pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
                 }
-
-                processed.put(p, U.currentTimeMillis() - startTime);
             }
-            else if (recoverState != null) { // Pre-create partition if having valid state.
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
 
-                updateState(part, recoverState);
+            res = U.currentTimeMillis() - startTime;
+        }
+        else if (recoverState != null) { // Pre-create partition if having valid state.
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                processed.put(p, U.currentTimeMillis() - startTime);
+            updateState(part, recoverState);
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Restored partition state (from WAL) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                        ", updCntr=" + part.initialUpdateCounter() +
-                        ", size=" + part.fullSize() + ']');
-                }
-            }
-            else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                }
-            }
+            res = U.currentTimeMillis() - startTime;
 
             if (log.isDebugEnabled()) {
-                log.debug("Finished restoring partition state " +
-                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
-                    ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+                log.debug("Restored partition state (from WAL) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                    ", updCntr=" + part.initialUpdateCounter() +
+                    ", size=" + part.fullSize() + ']');
+            }
+        }
+        else {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping partition on recovery (no page store OR wal state) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
             }
         }
 
-        partitionStatesRestored = true;
+        if (log.isDebugEnabled()) {
+            log.debug("Finished restoring partition state " +
+                "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+                ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Long> restorePartitionStates(
+        Map<GroupPartitionId, Integer> partRecoveryStates
+    ) throws IgniteCheckedException {
+        if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
+            || partitionStatesRestored)
+            return Collections.emptyMap();
+
+        Map<Integer, Long> processed = new HashMap<>();

Review comment:
       I don’t see the result being used; I think it is not needed at all.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -100,6 +113,12 @@
         Map<GroupPartitionId, Integer> partRecoveryStates
     ) throws IgniteCheckedException;
 
+    /**
+     * Confirm that partition states are restored. This method should be called after restoring partitions using

Review comment:
       The Javadoc should note that this method must be called only after the state of all partitions in the group has been restored.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -615,133 +615,156 @@ else if (needSnapshot)
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> restorePartitionStates(
+    @Override public long restoreStateOfPartition(
+        int p,
         Map<GroupPartitionId, Integer> partRecoveryStates
     ) throws IgniteCheckedException {
         if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
             || partitionStatesRestored)
-            return Collections.emptyMap();
-
-        Map<Integer, Long> processed = new HashMap<>();
+            return 0;
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        for (int p = 0; p < grp.affinity().partitions(); p++) {
-            Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
+        Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
 
-            long startTime = U.currentTimeMillis();
+        long startTime = U.currentTimeMillis();
 
-            if (log.isDebugEnabled())
-                log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+        long res = 0;
 
-            if (ctx.pageStore().exists(grp.groupId(), p)) {
-                ctx.pageStore().ensure(grp.groupId(), p);
+        if (log.isDebugEnabled())
+            log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
 
-                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping partition on recovery (pages less than 1) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                    }
-
-                    continue;
-                }
+        if (ctx.pageStore().exists(grp.groupId(), p)) {
+            ctx.pageStore().ensure(grp.groupId(), p);
 
+            if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Creating partition on recovery (exists in page store) " +
+                    log.debug("Skipping partition on recovery (pages less than 1) " +
                         "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                 }
 
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+                return 0;
+            }
 
-                // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
-                part.dataStore().init();
+            if (log.isDebugEnabled()) {
+                log.debug("Creating partition on recovery (exists in page store) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
+            }
 
-                ctx.database().checkpointReadLock();
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                try {
-                    long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
-                    long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
+            // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
+            part.dataStore().init();
 
-                    try {
-                        long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
+            ctx.database().checkpointReadLock();
 
-                        boolean changed = false;
+            try {
+                long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
+                long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
 
-                        try {
-                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
+                try {
+                    long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
 
-                            if (recoverState != null) {
-                                changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
+                    boolean changed = false;
 
-                                updateState(part, recoverState);
+                    try {
+                        PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from WAL) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() +
-                                        ", size=" + part.fullSize() + ']');
-                                }
-                            }
-                            else {
-                                int stateId = io.getPartitionState(pageAddr);
+                        if (recoverState != null) {
+                            changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());
 
-                                updateState(part, stateId);
+                            updateState(part, recoverState);
 
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Restored partition state (from page memory) " +
-                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
-                                        ", size=" + part.fullSize() + ']');
-                                }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from WAL) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() +
+                                    ", size=" + part.fullSize() + ']');
                             }
                         }
-                        finally {
-                            pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
+                        else {
+                            int stateId = io.getPartitionState(pageAddr);
+
+                            updateState(part, stateId);
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Restored partition state (from page memory) " +
+                                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                                    ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
+                                    ", size=" + part.fullSize() + ']');
+                            }
                         }
                     }
                     finally {
-                        pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
+                        pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
                     }
                 }
                 finally {
-                    ctx.database().checkpointReadUnlock();
+                    pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
                 }
-
-                processed.put(p, U.currentTimeMillis() - startTime);
             }
-            else if (recoverState != null) { // Pre-create partition if having valid state.
-                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
 
-                updateState(part, recoverState);
+            res = U.currentTimeMillis() - startTime;
+        }
+        else if (recoverState != null) { // Pre-create partition if having valid state.
+            GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
 
-                processed.put(p, U.currentTimeMillis() - startTime);
+            updateState(part, recoverState);
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Restored partition state (from WAL) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
-                        ", updCntr=" + part.initialUpdateCounter() +
-                        ", size=" + part.fullSize() + ']');
-                }
-            }
-            else {
-                if (log.isDebugEnabled()) {
-                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
-                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
-                }
-            }
+            res = U.currentTimeMillis() - startTime;
 
             if (log.isDebugEnabled()) {
-                log.debug("Finished restoring partition state " +
-                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
-                    ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+                log.debug("Restored partition state (from WAL) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
+                    ", updCntr=" + part.initialUpdateCounter() +
+                    ", size=" + part.fullSize() + ']');
+            }
+        }
+        else {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping partition on recovery (no page store OR wal state) " +
+                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
             }
         }
 
-        partitionStatesRestored = true;
+        if (log.isDebugEnabled()) {
+            log.debug("Finished restoring partition state " +
+                "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
+                ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Long> restorePartitionStates(
+        Map<GroupPartitionId, Integer> partRecoveryStates

Review comment:
       Is this argument needed if an empty map is passed to it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org