You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ti...@apache.org on 2023/03/28 07:07:37 UTC
[ignite] branch master updated: IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)
This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 307ebcc5d3a IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)
307ebcc5d3a is described below
commit 307ebcc5d3a7744dd9cf1a9cac103a776209c084
Author: Maksim Timonin <ti...@apache.org>
AuthorDate: Tue Mar 28 10:07:28 2023 +0300
IGNITE-19052 Introduce IncrementalSnapshotProcessor (#10600)
---
.../snapshot/IncrementalSnapshotProcessor.java | 288 +++++++++++++++++++++
.../snapshot/SnapshotRestoreProcess.java | 246 +++---------------
2 files changed, 317 insertions(+), 217 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
new file mode 100644
index 00000000000..2ca529e086a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotProcessor.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
+import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotStartRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_START_RECORD;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
+
+/** Processes an incremental snapshot: parses its WAL segments and handles the records found there. */
+abstract class IncrementalSnapshotProcessor {
+ /** Cache shared context; source of the snapshot manager, the logger and the context for WAL iterators. */
+ private final GridCacheSharedContext<?, ?> cctx;
+
+ /** Ignite logger, obtained from the shared context for the concrete processor class. */
+ private final IgniteLogger log;
+
+ /** Name of the base snapshot; compared with the name stored in {@link ClusterSnapshotRecord}. */
+ private final String snpName;
+
+ /** Snapshot path, passed through to the snapshot manager lookups. */
+ private final String snpPath;
+
+ /** Index of the target incremental snapshot; WAL segments of increments {@code 1..incIdx} are processed. */
+ private final int incIdx;
+
+ /** IDs of caches to process; data entries of any other cache are skipped. */
+ private final Set<Integer> cacheIds;
+
+ /** Creates a processor for increment {@code incIdx} of snapshot {@code snpName}, limited to the caches in {@code cacheIds}. */
+ IncrementalSnapshotProcessor(GridCacheSharedContext<?, ?> cctx, String snpName, String snpPath, int incIdx, Set<Integer> cacheIds) {
+ this.cctx = cctx;
+ this.snpName = snpName;
+ this.snpPath = snpPath;
+ this.incIdx = incIdx;
+ this.cacheIds = cacheIds;
+
+ log = cctx.logger(getClass());
+ }
+
+ /**
+ * Iterates WAL segments of increments {@code 1..incIdx} and passes every accepted data entry to the handler.
+ * An entry is accepted if its cache is in {@code cacheIds} and its transaction passes the current version filter.
+ * @param dataEntryHnd Handler for accepted data entries; if {@code null}, accepted entries are only counted.
+ */
+ void process(@Nullable Consumer<DataEntry> dataEntryHnd) throws IgniteCheckedException, IOException {
+ IncrementalSnapshotMetadata meta = cctx.snapshotMgr()
+ .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx);
+
+ File[] segments = walSegments(meta.folderName());
+
+ totalWalSegments(segments.length);
+
+ UUID incSnpId = meta.requestId();
+
+ File lastSeg = Arrays.stream(segments)
+ .map(File::toPath)
+ .max(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex))
+ .orElseThrow(() -> new IgniteCheckedException("Last WAL segment wasn't found [snpName=" + snpName + ']'))
+ .toFile();
+ // The finish record of the target increment is looked up in the segment with the highest index.
+ IncrementalSnapshotFinishRecord incSnpFinRec = readFinishRecord(lastSeg, incSnpId);
+
+ if (incSnpFinRec == null) {
+ throw new IgniteCheckedException("System WAL record for incremental snapshot wasn't found " +
+ "[id=" + incSnpId + ", walSegFile=" + lastSeg + ']');
+ }
+
+ LongAdder applied = new LongAdder();
+
+ initWalEntries(applied);
+ processedWalSegments(0);
+
+ Set<WALRecord.RecordType> recTypes = new HashSet<>(F.asList(
+ CLUSTER_SNAPSHOT,
+ INCREMENTAL_SNAPSHOT_START_RECORD,
+ INCREMENTAL_SNAPSHOT_FINISH_RECORD,
+ DATA_RECORD_V2));
+
+ // Create a single WAL iterator for 2 steps: finding ClusterSnapshotRecord and applying incremental snapshots.
+ // TODO: Fix it after resolving https://issues.apache.org/jira/browse/IGNITE-18718.
+ try (WALIterator it = walIter(log, recTypes, segments)) {
+ long startIdx = -1;
+
+ // Step 1. Skip WAL records until the ClusterSnapshotRecord of the base snapshot is reached; remember its segment index.
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
+
+ WALRecord rec = walRec.getValue();
+
+ if (rec.type() == CLUSTER_SNAPSHOT) {
+ if (((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(snpName)) {
+ startIdx = walRec.getKey().index();
+
+ break;
+ }
+ }
+ }
+
+ if (startIdx < 0) {
+ throw new IgniteCheckedException("System WAL record for full snapshot wasn't found " +
+ "[snpName=" + snpName + ", walSegFile=" + segments[0] + ']');
+ }
+
+ UUID prevIncSnpId = incIdx > 1
+ ? cctx.snapshotMgr().readIncrementalSnapshotMetadata(snpName, snpPath, incIdx - 1).requestId()
+ : null;
+ // With a previous increment every transaction passes at first; the first increment starts with the exclusion filter.
+ IgnitePredicate<GridCacheVersion> txVerFilter = prevIncSnpId != null
+ ? txVer -> true : txVer -> !incSnpFinRec.excluded().contains(txVer);
+
+ long lastProcessedIdx = 0;
+
+ // Step 2. Handle data entries; START/FINISH records of incremental snapshots switch the transaction filter.
+ while (it.hasNext()) {
+ IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
+
+ long curIdx = walRec.getKey().index();
+
+ if (curIdx != lastProcessedIdx) {
+ processedWalSegments((int)(curIdx - startIdx));
+
+ lastProcessedIdx = curIdx;
+ }
+
+ WALRecord rec = walRec.getValue();
+
+ if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
+ IncrementalSnapshotStartRecord startRec = (IncrementalSnapshotStartRecord)rec;
+ // From the start record of the target increment on, accept only transactions listed as included.
+ if (startRec.id().equals(incSnpFinRec.id()))
+ txVerFilter = v -> incSnpFinRec.included().contains(v);
+ }
+ else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
+ IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)rec;
+ // After the finish record of the previous increment, accept everything except excluded transactions.
+ if (finRec.id().equals(prevIncSnpId))
+ txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+ }
+ else if (rec.type() == DATA_RECORD_V2) {
+ DataRecord data = (DataRecord)rec;
+
+ for (DataEntry e: data.writeEntries()) {
+ // That is OK to restore only part of transaction related to a specified cache group,
+ // because a full snapshot restoring does the same.
+ if (!cacheIds.contains(e.cacheId()) || !txVerFilter.apply(e.nearXidVersion()))
+ continue;
+
+ if (dataEntryHnd != null)
+ dataEntryHnd.accept(e);
+
+ applied.increment();
+ }
+ }
+ }
+ // The whole WAL has been read: report all segments as processed.
+ processedWalSegments(segments.length);
+ }
+ }
+
+ /** @return WAL segment files of increments {@code 1..incIdx}, in increment order; {@code folderName} locates each WAL dir. */
+ private File[] walSegments(String folderName) throws IgniteCheckedException {
+ File[] segments = null;
+
+ for (int i = 1; i <= incIdx; i++) {
+ File incSnpDir = cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, i);
+
+ if (!incSnpDir.exists())
+ throw new IgniteCheckedException("Incremental snapshot doesn't exists [dir=" + incSnpDir + ']');
+
+ File incSnpWalDir = incrementalSnapshotWalsDir(incSnpDir, folderName);
+
+ if (!incSnpWalDir.exists())
+ throw new IgniteCheckedException("Incremental snapshot WAL directory doesn't exists [dir=" + incSnpWalDir + ']');
+
+ File[] incSegs = incSnpWalDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
+ // listFiles() yields null when the WAL directory cannot be listed; report the directory that actually failed.
+ if (incSegs == null)
+ throw new IgniteCheckedException("Failed to list WAL segments from snapshot directory [dir=" + incSnpWalDir + ']');
+
+ if (segments == null)
+ segments = incSegs;
+ else {
+ int segLen = segments.length;
+ // Append the segments of this increment after those of the previous increments.
+ segments = Arrays.copyOf(segments, segLen + incSegs.length);
+
+ System.arraycopy(incSegs, 0, segments, segLen, incSegs.length);
+ }
+ }
+
+ if (F.isEmpty(segments)) {
+ throw new IgniteCheckedException("No WAL segments found for incremental snapshot " +
+ "[snpName=" + snpName + ", snpPath=" + snpPath + ", incrementIndex=" + incIdx + ']');
+ }
+
+ return segments;
+ }
+
+ /** @return {@link IncrementalSnapshotFinishRecord} with the given ID read from the given segment, or {@code null} if not found. */
+ private @Nullable IncrementalSnapshotFinishRecord readFinishRecord(File segment, UUID incSnpId) throws IgniteCheckedException {
+ try (WALIterator it = walIter(log, Collections.singleton(INCREMENTAL_SNAPSHOT_FINISH_RECORD), segment)) {
+ while (it.hasNext()) {
+ IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)it.next().getValue();
+ // The iterator is filtered to finish records only, hence the unchecked cast above.
+ if (finRec.id().equals(incSnpId))
+ return finRec;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * @param log Logger for the WAL iterator factory.
+ * @param types WAL record types to read; records of any other type are filtered out.
+ * @param segments WAL segment files to iterate over.
+ * @return Iterator over records of the requested types; it is not closed here, the caller must close it.
+ */
+ private WALIterator walIter(IgniteLogger log, Set<WALRecord.RecordType> types, File... segments) throws IgniteCheckedException {
+ return new IgniteWalIteratorFactory(log)
+ .iterator(new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .filter((recType, recPtr) -> types.contains(recType))
+ .sharedContext(cctx)
+ .filesOrDirs(segments));
+ }
+
+ /**
+ * @param segCnt Number of WAL segment files collected for increments {@code 1..incIdx}; reported once when processing starts.
+ */
+ abstract void totalWalSegments(int segCnt);
+
+ /**
+ * @param segCnt Processed segments: 0 at start, then segments passed since the base snapshot record, finally the total count.
+ */
+ abstract void processedWalSegments(int segCnt);
+
+ /**
+ * @param entriesCnt Counter incremented for every accepted data entry; handed over once before the WAL is iterated.
+ */
+ abstract void initWalEntries(LongAdder entriesCnt);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 1067a9faf99..715663d354a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -26,10 +26,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -67,13 +65,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
-import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
-import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
-import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotStartRecord;
-import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -90,10 +82,6 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
import org.apache.ignite.internal.processors.cache.persistence.file.FileVersionCheckingFactory;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
-import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
-import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
@@ -106,9 +94,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -116,10 +102,6 @@ import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD;
-import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.INCREMENTAL_SNAPSHOT_START_RECORD;
import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
@@ -127,8 +109,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
-import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotWalsDir;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
@@ -1396,7 +1376,7 @@ public class SnapshotRestoreProcess {
walEnabled(false, cacheIds);
- restoreIncrementalSnapshot(opCtx0.snpName, opCtx0.snpPath, cacheIds, opCtx0.incIdx);
+ restoreIncrementalSnapshot(cacheIds);
walEnabled(true, cacheIds);
@@ -1421,147 +1401,45 @@ public class SnapshotRestoreProcess {
/**
* Restore incremental snapshot.
- *
- * @param snpName Base snapshot name.
- * @param snpPath Base snapshot path.
- * @param cacheIds Restoring cache IDs.
- * @param incIdx Index of incremental snapshot.
*/
- private void restoreIncrementalSnapshot(
- String snpName,
- String snpPath,
- Set<Integer> cacheIds,
- int incIdx
- ) throws IgniteCheckedException, IOException {
+ private void restoreIncrementalSnapshot(Set<Integer> cacheIds) throws IgniteCheckedException, IOException {
SnapshotRestoreContext opCtx0 = opCtx;
- IncrementalSnapshotMetadata meta = ctx.cache().context().snapshotMgr()
- .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx);
-
- File[] segments = walSegments(snpName, snpPath, incIdx, meta.folderName());
-
- opCtx0.totalWalSegments = segments.length;
-
- UUID incSnpId = meta.requestId();
-
- File lastSeg = Arrays.stream(segments)
- .map(File::toPath)
- .max(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex))
- .orElseThrow(() -> new IgniteCheckedException("Last WAL segment wasn't found [snpName=" + snpName + ']'))
- .toFile();
-
- IncrementalSnapshotFinishRecord incSnpFinRec = readFinishRecord(lastSeg, incSnpId);
-
- if (incSnpFinRec == null) {
- throw new IgniteCheckedException("System WAL record for incremental snapshot wasn't found " +
- "[id=" + incSnpId + ", walSegFile=" + lastSeg + ']');
- }
-
- CacheStripedExecutor exec = new CacheStripedExecutor(ctx.pools().getStripedExecutorService());
-
- long start = U.currentTimeMillis();
-
- LongAdder applied = new LongAdder();
-
- opCtx0.processedWalEntries = applied;
- opCtx0.processedWalSegments = 0;
-
- Set<WALRecord.RecordType> recTypes = new HashSet<>(F.asList(
- CLUSTER_SNAPSHOT,
- INCREMENTAL_SNAPSHOT_START_RECORD,
- INCREMENTAL_SNAPSHOT_FINISH_RECORD,
- DATA_RECORD_V2));
-
- // Create a single WAL iterator for 2 steps: finding ClusterSnapshotRecord and applying incremental snapshots.
- // TODO: Fix it after resolving https://issues.apache.org/jira/browse/IGNITE-18718.
- try (WALIterator it = walIter(log, recTypes, segments)) {
- long startIdx = -1;
-
- // Step 1. Skips applying WAL until base snapshot record has been reached.
- while (it.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
-
- WALRecord rec = walRec.getValue();
-
- if (rec.type() == CLUSTER_SNAPSHOT) {
- if (((ClusterSnapshotRecord)rec).clusterSnapshotName().equals(snpName)) {
- startIdx = walRec.getKey().index();
-
- break;
- }
- }
+ IncrementalSnapshotProcessor incSnpProc = new IncrementalSnapshotProcessor(
+ ctx.cache().context(), opCtx0.snpName, opCtx0.snpPath, opCtx0.incIdx, cacheIds
+ ) {
+ @Override void totalWalSegments(int segCnt) {
+ opCtx0.totalWalSegments = segCnt;
}
- if (startIdx < 0) {
- throw new IgniteCheckedException("System WAL record for full snapshot wasn't found " +
- "[snpName=" + snpName + ", walSegFile=" + segments[0] + ']');
+ @Override void processedWalSegments(int segCnt) {
+ opCtx0.processedWalSegments = segCnt;
}
- UUID prevIncSnpId = incIdx > 1
- ? ctx.cache().context().snapshotMgr().readIncrementalSnapshotMetadata(snpName, snpPath, incIdx - 1).requestId()
- : null;
-
- IgnitePredicate<GridCacheVersion> txVerFilter = prevIncSnpId != null
- ? txVer -> true : txVer -> !incSnpFinRec.excluded().contains(txVer);
-
- long lastProcessedIdx = 0;
-
- // Step 2. Apply incremental snapshots.
- while (it.hasNext()) {
- IgniteBiTuple<WALPointer, WALRecord> walRec = it.next();
-
- long curIdx = walRec.getKey().index();
-
- if (curIdx != lastProcessedIdx) {
- opCtx0.processedWalSegments = (int)(curIdx - startIdx);
+ @Override void initWalEntries(LongAdder entriesCnt) {
+ opCtx0.processedWalEntries = entriesCnt;
+ }
+ };
- lastProcessedIdx = curIdx;
- }
+ CacheStripedExecutor exec = new CacheStripedExecutor(ctx.pools().getStripedExecutorService());
- WALRecord rec = walRec.getValue();
+ long start = U.currentTimeMillis();
- if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
- IncrementalSnapshotStartRecord startRec = (IncrementalSnapshotStartRecord)rec;
+ incSnpProc.process(e -> {
+ GridCacheContext<?, ?> cacheCtx = ctx.cache().context().cacheContext(e.cacheId());
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.cache().context().database();
- if (startRec.id().equals(incSnpFinRec.id()))
- txVerFilter = v -> incSnpFinRec.included().contains(v);
+ exec.submit(() -> {
+ try {
+ applyDataEntry(dbMgr, cacheCtx, e);
}
- else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
- IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)rec;
+ catch (IgniteCheckedException err) {
+ U.error(log, "Failed to apply data entry [entry=" + e + ']');
- if (finRec.id().equals(prevIncSnpId))
- txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+ exec.onError(err);
}
- else if (rec.type() == DATA_RECORD_V2) {
- DataRecord data = (DataRecord)rec;
-
- for (DataEntry e: data.writeEntries()) {
- // That is OK to restore only part of transaction related to a specified cache group,
- // because a full snapshot restoring does the same.
- if (!cacheIds.contains(e.cacheId()) || !txVerFilter.apply(e.nearXidVersion()))
- continue;
-
- GridCacheContext<?, ?> cacheCtx = ctx.cache().context().cacheContext(e.cacheId());
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.cache().context().database();
-
- exec.submit(() -> {
- try {
- applyDataEntry(dbMgr, cacheCtx, e);
-
- applied.increment();
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to apply data entry, dataEntry=" + e + ", ptr=" + data.position());
-
- exec.onError(err);
- }
- }, cacheCtx.groupId(), e.partitionId());
- }
- }
- }
- }
-
- opCtx0.processedWalSegments += 1;
+ }, cacheCtx.groupId(), e.partitionId());
+ });
exec.awaitApplyComplete();
@@ -1584,62 +1462,10 @@ public class SnapshotRestoreProcess {
exec.awaitApplyComplete();
if (log.isInfoEnabled()) {
- log.info("Finished restore incremental snapshot [snpName=" + snpName + ", incrementIndex=" + incIdx +
- ", id=" + incSnpId + ", updatesApplied=" + applied.longValue() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
- }
- }
-
- /** @return {@link IncrementalSnapshotFinishRecord} for specified snapshot, or {@code null} if not found. */
- private @Nullable IncrementalSnapshotFinishRecord readFinishRecord(File segment, UUID incSnpId) throws IgniteCheckedException {
- try (WALIterator it = walIter(log, Collections.singleton(INCREMENTAL_SNAPSHOT_FINISH_RECORD), segment)) {
- while (it.hasNext()) {
- IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)it.next().getValue();
-
- if (finRec.id().equals(incSnpId))
- return finRec;
- }
- }
-
- return null;
- }
-
- /** @return WAL segments to restore for specified incremental index since the base snapshot. */
- private File[] walSegments(String snpName, String snpPath, int incIdx, String folderName) throws IgniteCheckedException {
- File[] segments = null;
-
- for (int i = 1; i <= incIdx; i++) {
- File incSnpDir = ctx.cache().context().snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, i);
-
- if (!incSnpDir.exists())
- throw new IgniteCheckedException("Incremental snapshot doesn't exists [dir=" + incSnpDir + ']');
-
- File incSnpWalDir = incrementalSnapshotWalsDir(incSnpDir, folderName);
-
- if (!incSnpWalDir.exists())
- throw new IgniteCheckedException("Incremental snapshot WAL directory doesn't exists [dir=" + incSnpWalDir + ']');
-
- File[] incSegs = incSnpWalDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
-
- if (incSegs == null)
- throw new IgniteCheckedException("Failed to list WAL segments from snapshot directory [dir=" + incSnpDir + ']');
-
- if (segments == null)
- segments = incSegs;
- else {
- int segLen = segments.length;
-
- segments = Arrays.copyOf(segments, segLen + incSegs.length);
-
- System.arraycopy(incSegs, 0, segments, segLen, incSegs.length);
- }
- }
-
- if (F.isEmpty(segments)) {
- throw new IgniteCheckedException("No WAL segments found for incremental snapshot " +
- "[snpName=" + snpName + ", snpPath=" + snpPath + ", incrementIndex=" + incIdx + ']');
+ log.info("Finished restore incremental snapshot [snpName=" + opCtx0.snpName + ", incrementIndex=" + opCtx0.incIdx +
+ ", id=" + opCtx0.reqId + ", updatesApplied=" + opCtx0.processedWalEntries.longValue() +
+ ", time=" + (U.currentTimeMillis() - start) + " ms]");
}
-
- return segments;
}
/**
@@ -1691,20 +1517,6 @@ public class SnapshotRestoreProcess {
cacheCtx.offheap().dataStore(locPart).updateCounter(dataEntry.partitionCounter() - 1, 1);
}
- /**
- * @param log Ignite logger.
- * @param types WAL record types to read.
- * @param segments WAL segments.
- * @return Iterator over WAL segments.
- */
- public WALIterator walIter(IgniteLogger log, Set<WALRecord.RecordType> types, File... segments) throws IgniteCheckedException {
- return new IgniteWalIteratorFactory(log)
- .iterator(new IgniteWalIteratorFactory.IteratorParametersBuilder()
- .filter((recType, recPtr) -> types.contains(recType))
- .sharedContext(ctx.cache().context())
- .filesOrDirs(segments));
- }
-
/**
* @param reqId Request ID.
* @param res Results.