You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2023/03/28 07:07:37 UTC

[ignite] branch master updated: IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)

This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 307ebcc5d3a IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)
307ebcc5d3a is described below

commit 307ebcc5d3a7744dd9cf1a9cac103a776209c084
Author: Maksim Timonin <ti...@apache.org>
AuthorDate: Tue Mar 28 10:07:28 2023 +0300

    IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)
---
 .../snapshot/IncrementalSnapshotProcessor.java     | 288 +++++++++++++++++++++
 .../snapshot/SnapshotRestoreProcess.java           | 246 +++---------------
 2 files changed, 317 insertions(+), 217 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
new file mode 100644
index 00000000000..2ca529e086a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
+import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotStartRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_START_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
+
+/** Processes incremental snapshot: parse WAL segments and handles records. */
+abstract class IncrementalSnapshotProcessor {
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Snapshot name. */
+    private final String snpName;
+
+    /** Snapshot path. */
+    private final String snpPath;
+
+    /** Incremental snapshot index. */
+    private final int incIdx;
+
+    /** Snapshot cache IDs. */
+    private final Set<Integer> cacheIds;
+
+    /** */
+    IncrementalSnapshotProcessor(GridCacheSharedContext<?, ?> cctx, String snpName, String snpPath, int incIdx, Set<Integer> cacheIds) {
+        this.cctx = cctx;
+        this.snpName = snpName;
+        this.snpPath = snpPath;
+        this.incIdx = incIdx;
+        this.cacheIds = cacheIds;
+
+        log = cctx.logger(getClass());
+    }
+
+    /**
+     * Process incremental snapshot data.
+     *
+     * @param dataEntryHnd Handle data entries.
+     */
+    void process(@Nullable Consumer<DataEntry> dataEntryHnd) throws IgniteCheckedException, IOException {
+        IncrementalSnapshotMetadata meta = cctx.snapshotMgr()
+            .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx);
+
+        File[] segments = walSegments(meta.folderName());
+
+        totalWalSegments(segments.length);
+
+        UUID incSnpId = meta.requestId();
+
+        File lastSeg = Arrays.stream(segments)
+            .map(File::toPath)
+            .max(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex))
+            .orElseThrow(() -> new IgniteCheckedException("Last WAL segment wasn't found [snpName=" + snpName + ']'))
+            .toFile();
+
+        IncrementalSnapshotFinishRecord incSnpFinRec = readFinishRecord(lastSeg, incSnpId);
+
+        if (incSnpFinRec == null) {
+            throw new IgniteCheckedException("System WAL record for incremental snapshot wasn't found " +
+                "[id=" + incSnpId + ", walSegFile=" + lastSeg + ']');
+        }
+
+        LongAdder applied = new LongAdder();
+
+        initWalEntries(applied);
+        processedWalSegments(0);
+
+        Set<WALRecord.RecordType> recTypes = new HashSet<>(F.asList(
+            CLUSTER_SNAPSHOT,
+            INCREMENTAL_SNAPSHOT_START_RECORD,
+            INCREMENTAL_SNAPSHOT_FINISH_RECORD,
+            DATA_RECORD_V2));
+
+        // Create a single WAL iterator for 2 steps: finding ClusterSnapshotRecord and applying incremental snapshots.
+        // TODO: Fix it after resolving https://issues.apache.org/jira/browse/IGNITE-18718.
+        try (WALIterator it = walIter(log, recTypes, segments)) {
+            long startIdx = -1;
+
+            // Step 1. Skips applying WAL until base snapshot record has been reached.
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
+
+                WALRecord rec = walRec.getValue();
+
+                if (rec.type() == CLUSTER_SNAPSHOT) {
+                    if (((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(snpName)) {
+                        startIdx = walRec.getKey().index();
+
+                        break;
+                    }
+                }
+            }
+
+            if (startIdx < 0) {
+                throw new IgniteCheckedException("System WAL record for full snapshot wasn't found " +
+                    "[snpName=" + snpName + ", walSegFile=" + segments[0] + ']');
+            }
+
+            UUID prevIncSnpId = incIdx > 1
+                ? cctx.snapshotMgr().readIncrementalSnapshotMetadata(snpName, snpPath, incIdx - 1).requestId()
+                : null;
+
+            IgnitePredicate<GridCacheVersion> txVerFilter = prevIncSnpId != null
+                ? txVer -> true : txVer -> !incSnpFinRec.excluded().contains(txVer);
+
+            long lastProcessedIdx = 0;
+
+            // Step 2. Apply incremental snapshots.
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
+
+                long curIdx = walRec.getKey().index();
+
+                if (curIdx != lastProcessedIdx) {
+                    processedWalSegments((int)(curIdx - startIdx));
+
+                    lastProcessedIdx = curIdx;
+                }
+
+                WALRecord rec = walRec.getValue();
+
+                if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
+                    IncrementalSnapshotStartRecord startRec = (IncrementalSnapshotStartRecord)rec;
+
+                    if (startRec.id().equals(incSnpFinRec.id()))
+                        txVerFilter = v -> incSnpFinRec.included().contains(v);
+                }
+                else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
+                    IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)rec;
+
+                    if (finRec.id().equals(prevIncSnpId))
+                        txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+                }
+                else if (rec.type() == DATA_RECORD_V2) {
+                    DataRecord data = (DataRecord)rec;
+
+                    for (DataEntry e: data.writeEntries()) {
+                        // That is OK to restore only part of transaction related to a specified cache group,
+                        // because a full snapshot restoring does the same.
+                        if (!cacheIds.contains(e.cacheId()) || !txVerFilter.apply(e.nearXidVersion()))
+                            continue;
+
+                        if (dataEntryHnd != null)
+                            dataEntryHnd.accept(e);
+
+                        applied.increment();
+                    }
+                }
+            }
+
+            processedWalSegments(segments.length);
+        }
+    }
+
+    /** @return WAL segments to restore for specified incremental index since the base snapshot. */
+    private File[] walSegments(String folderName) throws IgniteCheckedException {
+        File[] segments = null;
+
+        for (int i = 1; i <= incIdx; i++) {
+            File incSnpDir = cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, i);
+
+            if (!incSnpDir.exists())
+                throw new IgniteCheckedException("Incremental snapshot doesn't exists [dir=" + incSnpDir + ']');
+
+            File incSnpWalDir = incrementalSnapshotWalsDir(incSnpDir, folderName);
+
+            if (!incSnpWalDir.exists())
+                throw new IgniteCheckedException("Incremental snapshot WAL directory doesn't exists [dir=" + incSnpWalDir + ']');
+
+            File[] incSegs = incSnpWalDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+
+            if (incSegs == null)
+                throw new IgniteCheckedException("Failed to list WAL segments from snapshot directory [dir=" + incSnpDir + ']');
+
+            if (segments == null)
+                segments = incSegs;
+            else {
+                int segLen = segments.length;
+
+                segments = Arrays.copyOf(segments, segLen + incSegs.length);
+
+                System.arraycopy(incSegs, 0, segments, segLen, incSegs.length);
+            }
+        }
+
+        if (F.isEmpty(segments)) {
+            throw new IgniteCheckedException("No WAL segments found for incremental snapshot " +
+                "[snpName=" + snpName + ", snpPath=" + snpPath + ", incrementIndex=" + incIdx + ']');
+        }
+
+        return segments;
+    }
+
+    /** @return {@link IncrementalSnapshotFinishRecord} for specified snapshot, or {@code null} if not found. */
+    private @Nullable IncrementalSnapshotFinishRecord readFinishRecord(File segment, UUID incSnpId) throws IgniteCheckedException {
+        try (WALIterator it = walIter(log, Collections.singleton(INCREMENTAL_SNAPSHOT_FINISH_RECORD), segment)) {
+            while (it.hasNext()) {
+                IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)it.next().getValue();
+
+                if (finRec.id().equals(incSnpId))
+                    return finRec;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param log Ignite logger.
+     * @param types WAL record types to read.
+     * @param segments WAL segments.
+     * @return Iterator over WAL segments.
+     */
+    private WALIterator walIter(IgniteLogger log, Set<WALRecord.RecordType> types, File... segments) throws IgniteCheckedException {
+        return new IgniteWalIteratorFactory(log)
+            .iterator(new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .filter((recType, recPtr) -> types.contains(recType))
+                .sharedContext(cctx)
+                .filesOrDirs(segments));
+    }
+
+    /**
+     * @param segCnt Total WAL segments in the incremental snapshot.
+     */
+    abstract void totalWalSegments(int segCnt);
+
+    /**
+     * @param segCnt Processed WAL segments for the incremental snapshot.
+     */
+    abstract void processedWalSegments(int segCnt);
+
+    /**
+     * @param entriesCnt Processed data entries for the incremental snapshot.
+     */
+    abstract void initWalEntries(LongAdder entriesCnt);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 1067a9faf99..715663d354a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -26,10 +26,8 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -67,13 +65,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
-import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
-import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotStartRecord;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -90,10 +82,6 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
@@ -106,9 +94,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -116,10 +102,6 @@ import static java.util.Optional.ofNullable;
 import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
 import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_START_RECORD;
 import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
@@ -127,8 +109,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
-import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
@@ -1396,7 +1376,7 @@ public class SnapshotRestoreProcess {
 
                 walEnabled(false, cacheIds);
 
-                restoreIncrementalSnapshot(opCtx0.snpName, opCtx0.snpPath, cacheIds, opCtx0.incIdx);
+                restoreIncrementalSnapshot(cacheIds);
 
                 walEnabled(true, cacheIds);
 
@@ -1421,147 +1401,45 @@ public class SnapshotRestoreProcess {
 
     /**
      * Restore incremental snapshot.
-     *
-     * @param snpName Base snapshot name.
-     * @param snpPath Base snapshot path.
-     * @param cacheIds Restoring cache IDs.
-     * @param incIdx Index of incremental snapshot.
      */
-    private void restoreIncrementalSnapshot(
-        String snpName,
-        String snpPath,
-        Set<Integer> cacheIds,
-        int incIdx
-    ) throws IgniteCheckedException, IOException {
+    private void restoreIncrementalSnapshot(Set<Integer> cacheIds) throws IgniteCheckedException, IOException {
         SnapshotRestoreContext opCtx0 = opCtx;
 
-        IncrementalSnapshotMetadata meta = ctx.cache().context().snapshotMgr()
-            .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx);
-
-        File[] segments = walSegments(snpName, snpPath, incIdx, meta.folderName());
-
-        opCtx0.totalWalSegments = segments.length;
-
-        UUID incSnpId = meta.requestId();
-
-        File lastSeg = Arrays.stream(segments)
-            .map(File::toPath)
-            .max(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex))
-            .orElseThrow(() -> new IgniteCheckedException("Last WAL segment wasn't found [snpName=" + snpName + ']'))
-            .toFile();
-
-        IncrementalSnapshotFinishRecord incSnpFinRec = readFinishRecord(lastSeg, incSnpId);
-
-        if (incSnpFinRec == null) {
-            throw new IgniteCheckedException("System WAL record for incremental snapshot wasn't found " +
-                "[id=" + incSnpId + ", walSegFile=" + lastSeg + ']');
-        }
-
-        CacheStripedExecutor exec = new CacheStripedExecutor(ctx.pools().getStripedExecutorService());
-
-        long start = U.currentTimeMillis();
-
-        LongAdder applied = new LongAdder();
-
-        opCtx0.processedWalEntries = applied;
-        opCtx0.processedWalSegments = 0;
-
-        Set<WALRecord.RecordType> recTypes = new HashSet<>(F.asList(
-            CLUSTER_SNAPSHOT,
-            INCREMENTAL_SNAPSHOT_START_RECORD,
-            INCREMENTAL_SNAPSHOT_FINISH_RECORD,
-            DATA_RECORD_V2));
-
-        // Create a single WAL iterator for 2 steps: finding ClusterSnapshotRecord and applying incremental snapshots.
-        // TODO: Fix it after resolving https://issues.apache.org/jira/browse/IGNITE-18718.
-        try (WALIterator it = walIter(log, recTypes, segments)) {
-            long startIdx = -1;
-
-            // Step 1. Skips applying WAL until base snapshot record has been reached.
-            while (it.hasNext()) {
-                IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
-
-                WALRecord rec = walRec.getValue();
-
-                if (rec.type() == CLUSTER_SNAPSHOT) {
-                    if (((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(snpName)) {
-                        startIdx = walRec.getKey().index();
-
-                        break;
-                    }
-                }
+        IncrementalSnapshotProcessor incSnpProc = new IncrementalSnapshotProcessor(
+            ctx.cache().context(), opCtx0.snpName, opCtx0.snpPath, opCtx0.incIdx, cacheIds
+        ) {
+            @Override void totalWalSegments(int segCnt) {
+                opCtx0.totalWalSegments = segCnt;
             }
 
-            if (startIdx < 0) {
-                throw new IgniteCheckedException("System WAL record for full snapshot wasn't found " +
-                    "[snpName=" + snpName + ", walSegFile=" + segments[0] + ']');
+            @Override void processedWalSegments(int segCnt) {
+                opCtx0.processedWalSegments = segCnt;
             }
 
-            UUID prevIncSnpId = incIdx > 1
-                ? ctx.cache().context().snapshotMgr().readIncrementalSnapshotMetadata(snpName, snpPath, incIdx - 1).requestId()
-                : null;
-
-            IgnitePredicate<GridCacheVersion> txVerFilter = prevIncSnpId != null
-                ? txVer -> true : txVer -> !incSnpFinRec.excluded().contains(txVer);
-
-            long lastProcessedIdx = 0;
-
-            // Step 2. Apply incremental snapshots.
-            while (it.hasNext()) {
-                IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
-
-                long curIdx = walRec.getKey().index();
-
-                if (curIdx != lastProcessedIdx) {
-                    opCtx0.processedWalSegments = (int)(curIdx - startIdx);
+            @Override void initWalEntries(LongAdder entriesCnt) {
+                opCtx0.processedWalEntries = entriesCnt;
+            }
+        };
 
-                    lastProcessedIdx = curIdx;
-                }
+        CacheStripedExecutor exec = new CacheStripedExecutor(ctx.pools().getStripedExecutorService());
 
-                WALRecord rec = walRec.getValue();
+        long start = U.currentTimeMillis();
 
-                if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
-                    IncrementalSnapshotStartRecord startRec = (IncrementalSnapshotStartRecord)rec;
+        incSnpProc.process(e -> {
+            GridCacheContext<?, ?> cacheCtx = ctx.cache().context().cacheContext(e.cacheId());
+            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.cache().context().database();
 
-                    if (startRec.id().equals(incSnpFinRec.id()))
-                        txVerFilter = v -> incSnpFinRec.included().contains(v);
+            exec.submit(() -> {
+                try {
+                    applyDataEntry(dbMgr, cacheCtx, e);
                 }
-                else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
-                    IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)rec;
+                catch (IgniteCheckedException err) {
+                    U.error(log, "Failed to apply data entry [entry=" + e + ']');
 
-                    if (finRec.id().equals(prevIncSnpId))
-                        txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+                    exec.onError(err);
                 }
-                else if (rec.type() == DATA_RECORD_V2) {
-                    DataRecord data = (DataRecord)rec;
-
-                    for (DataEntry e: data.writeEntries()) {
-                        // That is OK to restore only part of transaction related to a specified cache group,
-                        // because a full snapshot restoring does the same.
-                        if (!cacheIds.contains(e.cacheId()) || !txVerFilter.apply(e.nearXidVersion()))
-                            continue;
-
-                        GridCacheContext<?, ?> cacheCtx = ctx.cache().context().cacheContext(e.cacheId());
-                        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.cache().context().database();
-
-                        exec.submit(() -> {
-                            try {
-                                applyDataEntry(dbMgr, cacheCtx, e);
-
-                                applied.increment();
-                            }
-                            catch (IgniteCheckedException err) {
-                                U.error(log, "Failed to apply data entry, dataEntry=" + e + ", ptr=" + data.position());
-
-                                exec.onError(err);
-                            }
-                        }, cacheCtx.groupId(), e.partitionId());
-                    }
-                }
-            }
-        }
-
-        opCtx0.processedWalSegments += 1;
+            }, cacheCtx.groupId(), e.partitionId());
+        });
 
         exec.awaitApplyComplete();
 
@@ -1584,62 +1462,10 @@ public class SnapshotRestoreProcess {
         exec.awaitApplyComplete();
 
         if (log.isInfoEnabled()) {
-            log.info("Finished restore incremental snapshot [snpName=" + snpName + ", incrementIndex=" + incIdx +
-                ", id=" + incSnpId + ", updatesApplied=" + applied.longValue() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
-        }
-    }
-
-    /** @return {@link IncrementalSnapshotFinishRecord} for specified snapshot, or {@code null} if not found. */
-    private @Nullable IncrementalSnapshotFinishRecord readFinishRecord(File segment, UUID incSnpId) throws IgniteCheckedException {
-        try (WALIterator it = walIter(log, Collections.singleton(INCREMENTAL_SNAPSHOT_FINISH_RECORD), segment)) {
-            while (it.hasNext()) {
-                IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)it.next().getValue();
-
-                if (finRec.id().equals(incSnpId))
-                    return finRec;
-            }
-        }
-
-        return null;
-    }
-
-    /** @return WAL segments to restore for specified incremental index since the base snapshot. */
-    private File[] walSegments(String snpName, String snpPath, int incIdx, String folderName) throws IgniteCheckedException {
-        File[] segments = null;
-
-        for (int i = 1; i <= incIdx; i++) {
-            File incSnpDir = ctx.cache().context().snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, i);
-
-            if (!incSnpDir.exists())
-                throw new IgniteCheckedException("Incremental snapshot doesn't exists [dir=" + incSnpDir + ']');
-
-            File incSnpWalDir = incrementalSnapshotWalsDir(incSnpDir, folderName);
-
-            if (!incSnpWalDir.exists())
-                throw new IgniteCheckedException("Incremental snapshot WAL directory doesn't exists [dir=" + incSnpWalDir + ']');
-
-            File[] incSegs = incSnpWalDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
-            if (incSegs == null)
-                throw new IgniteCheckedException("Failed to list WAL segments from snapshot directory [dir=" + incSnpDir + ']');
-
-            if (segments == null)
-                segments = incSegs;
-            else {
-                int segLen = segments.length;
-
-                segments = Arrays.copyOf(segments, segLen + incSegs.length);
-
-                System.arraycopy(incSegs, 0, segments, segLen, incSegs.length);
-            }
-        }
-
-        if (F.isEmpty(segments)) {
-            throw new IgniteCheckedException("No WAL segments found for incremental snapshot " +
-                "[snpName=" + snpName + ", snpPath=" + snpPath + ", incrementIndex=" + incIdx + ']');
+            log.info("Finished restore incremental snapshot [snpName=" + opCtx0.snpName + ", incrementIndex=" + opCtx0.incIdx +
+                ", id=" + opCtx0.reqId + ", updatesApplied=" + opCtx0.processedWalEntries.longValue() +
+                ", time=" + (U.currentTimeMillis() - start) + " ms]");
         }
-
-        return segments;
     }
 
     /**
@@ -1691,20 +1517,6 @@ public class SnapshotRestoreProcess {
             cacheCtx.offheap().dataStore(locPart).updateCounter(dataEntry.partitionCounter() - 1, 1);
     }
 
-    /**
-     * @param log Ignite logger.
-     * @param types WAL record types to read.
-     * @param segments WAL segments.
-     * @return Iterator over WAL segments.
-     */
-    public WALIterator walIter(IgniteLogger log, Set<WALRecord.RecordType> types, File... segments) throws IgniteCheckedException {
-        return new IgniteWalIteratorFactory(log)
-            .iterator(new IgniteWalIteratorFactory.IteratorParametersBuilder()
-                .filter((recType, recPtr) -> types.contains(recType))
-                .sharedContext(ctx.cache().context())
-                .filesOrDirs(segments));
-    }
-
     /**
      * @param reqId Request ID.
      * @param res Results.