You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/10/26 14:03:06 UTC
[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
new 4f1a3033a28 IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)
4f1a3033a28 is described below
commit 4f1a3033a284f9348221f134b32f79a50f1375ea
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Oct 26 17:02:58 2022 +0300
IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)
---
.../pagemem/wal/IgniteWriteAheadLogManager.java | 23 ++++
.../snapshot/IgniteSnapshotManager.java | 139 ++++++++++++++++-----
.../snapshot/IncrementalSnapshotFutureTask.java | 67 +++++++++-
.../snapshot/IncrementalSnapshotMetadata.java | 5 +
.../snapshot/SnapshotHandlerRestoreTask.java | 3 +-
.../snapshot/SnapshotPartitionsVerifyTask.java | 3 +-
.../persistence/wal/FileWriteAheadLogManager.java | 84 ++++++++-----
.../wal/aware/SegmentArchivedStorage.java | 4 +-
.../cache/persistence/wal/aware/SegmentAware.java | 14 ++-
.../wal/aware/SegmentCompressStorage.java | 21 ++++
.../wal/aware/SegmentReservationStorage.java | 4 +-
.../cache/persistence/pagemem/NoOpWALManager.java | 16 +++
.../snapshot/IncrementalSnapshotTest.java | 134 ++++++++++++++++++++
13 files changed, 448 insertions(+), 69 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 98d99d613cc..832beef8bf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.pagemem.wal;
+import java.io.File;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
@@ -231,4 +233,25 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
* Start automatically releasing segments when reaching {@link DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
+
+ /**
+ * Archive directory if any.
+ *
+ * @return Archive directory.
+ */
+ @Nullable File archiveDir();
+
+ /**
+ * @param idx Segment index.
+ * @return Compressed archive segment.
+ */
+ @Nullable File compactedSegment(long idx);
+
+ /**
+ * Blocks the current thread until the segment with the given {@code idx} is compressed.
+ * If the segment is already compressed, returns immediately.
+ *
+ * @param idx Segment index.
+ */
+ void awaitCompacted(long idx) throws IgniteInterruptedCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index cd894261b73..7be795ccf85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.BufferedInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -106,6 +105,9 @@ import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
@@ -132,6 +134,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
import org.apache.ignite.internal.processors.cache.tree.DataRow;
@@ -223,6 +226,7 @@ import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+import static org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_SNAPSHOT;
import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW;
import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW_DESC;
@@ -814,12 +818,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
if (req.incremental()) {
- SnapshotMetadata meta = readSnapshotMetadata(new File(
- snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
- snapshotMetaFileName(cctx.localNode().consistentId().toString())
- ));
+ SnapshotMetadata meta;
try {
+ meta = readSnapshotMetadata(new File(
+ snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+ snapshotMetaFileName(cctx.localNode().consistentId().toString())
+ ));
+
checkIncrementalCanBeCreated(req.snapshotName(), req.snapshotPath(), meta);
}
catch (IgniteCheckedException | IOException e) {
@@ -842,6 +848,46 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
SnapshotMetadata meta
) {
File incSnpDir = incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex());
+ WALPointer lowPtr, highPtr;
+
+ if (req.incrementIndex() == 1)
+ lowPtr = meta.snapshotRecordPointer();
+ else {
+ int prevIdx = req.incrementIndex() - 1;
+
+ IncrementalSnapshotMetadata prevIncSnpMeta;
+
+ try {
+ prevIncSnpMeta = readFromFile(new File(
+ incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), prevIdx),
+ incrementalSnapshotMetaFileName(prevIdx)
+ ));
+ }
+ catch (IgniteCheckedException | IOException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ lowPtr = prevIncSnpMeta.cutPointer();
+ }
+
+ cctx.database().checkpointReadLock();
+
+ try {
+ highPtr = cctx.wal().log(new ClusterSnapshotRecord(req.snapshotName()));
+
+ // Dummy way to forcefully switch to the next segment.
+ // TODO: Must be replaced with the actual ConsistentCut logging.
+ cctx.wal().log(new ClusterSnapshotRecord(req.snapshotName()), RolloverType.NEXT_SEGMENT);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
+
+ // For now, forcefully roll over to the next WAL segment to make waiting for segments possible.
+ assert cctx.wal().currentSegment() >= highPtr.index() : "Rollover must be invoked.";
IgniteInternalFuture<SnapshotOperationResponse> task0 = registerTask(req.snapshotName(), new IncrementalSnapshotFutureTask(
cctx,
@@ -851,7 +897,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
req.snapshotPath(),
req.incrementIndex(),
tmpWorkDir,
- ioFactory
+ ioFactory,
+ lowPtr,
+ highPtr
)).chain(fut -> {
if (fut.error() != null)
throw F.wrap(fut.error());
@@ -864,7 +912,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
req.incrementIndex(),
cctx.localNode().consistentId().toString(),
pdsSettings.folderName(),
- null /* WAL Pointer for CUT record goes here. */
+ highPtr
);
writeSnapshotMetafile(
@@ -1600,7 +1648,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param consId Node consistent id to read metadata for.
* @return Snapshot metadata instance.
*/
- public SnapshotMetadata readSnapshotMetadata(File snpDir, String consId) {
+ public SnapshotMetadata readSnapshotMetadata(File snpDir, String consId) throws IgniteCheckedException, IOException {
return readSnapshotMetadata(new File(snpDir, snapshotMetaFileName(consId)));
}
@@ -1608,26 +1656,31 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param smf File denoting to snapshot metafile.
* @return Snapshot metadata instance.
*/
- private SnapshotMetadata readSnapshotMetadata(File smf) {
- if (!smf.exists())
- throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+ private SnapshotMetadata readSnapshotMetadata(File smf) throws IgniteCheckedException, IOException {
+ SnapshotMetadata meta = readFromFile(smf);
String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
- try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
- SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cctx.gridConfig()));
+ if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
+ throw new IgniteException(
+ "Error reading snapshot metadata [smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId())
+ );
+ }
- if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
- throw new IgniteException(
- "Error reading snapshot metadata [smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId())
- );
- }
+ return meta;
+ }
- return meta;
- }
- catch (IgniteCheckedException | IOException e) {
- throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
- smf.getAbsolutePath() + "]", e);
+ /**
+ * @param smf File to read.
+ * @return Read metadata.
+ * @param <T> Type of metadata.
+ */
+ private <T> T readFromFile(File smf) throws IgniteCheckedException, IOException {
+ if (!smf.exists())
+ throw new IgniteCheckedException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+ try (InputStream in = new BufferedInputStream(Files.newInputStream(smf.toPath()))) {
+ return marsh.unmarshal(in, U.resolveClassLoader(cctx.gridConfig()));
}
}
@@ -1665,15 +1718,20 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
Map<String, SnapshotMetadata> metasMap = new HashMap<>();
SnapshotMetadata prev = null;
- for (File smf : smfs) {
- SnapshotMetadata curr = readSnapshotMetadata(smf);
+ try {
+ for (File smf : smfs) {
+ SnapshotMetadata curr = readSnapshotMetadata(smf);
- if (prev != null && !prev.sameSnapshot(curr))
- throw new IgniteException("Snapshot metadata files are from different snapshots [prev=" + prev + ", curr=" + curr);
+ if (prev != null && !prev.sameSnapshot(curr))
+ throw new IgniteException("Snapshot metadata files are from different snapshots [prev=" + prev + ", curr=" + curr);
- metasMap.put(curr.consistentId(), curr);
+ metasMap.put(curr.consistentId(), curr);
- prev = curr;
+ prev = curr;
+ }
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
}
SnapshotMetadata currNodeSmf = metasMap.remove(cctx.localNode().consistentId().toString());
@@ -1767,6 +1825,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
if (incremental) {
+ if (!cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()) {
+ throw new IgniteException("Create incremental snapshot request has been rejected. " +
+ "WAL compaction must be enabled.");
+ }
+
if (!snpExists) {
throw new IgniteException("Create incremental snapshot request has been rejected. " +
"Base snapshot with given name doesn't exist on local node.");
@@ -2443,6 +2506,24 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
) throws IgniteCheckedException, IOException {
File snpDir = snapshotLocalDir(name, snpPath);
+ IgniteWriteAheadLogManager wal = cctx.wal();
+
+ if (wal == null) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "WAL must be eanbled."
+ );
+ }
+
+ File archiveDir = wal.archiveDir();
+
+ if (archiveDir == null) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "WAL archive must be eanbled."
+ );
+ }
+
+ ensureHardLinkAvailable(archiveDir.toPath(), snpDir.toPath());
+
Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
.stream()
.map(node -> node.consistentId().toString())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index aa9e2208c05..2bbaad1d4c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -18,14 +18,18 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
@@ -42,16 +46,28 @@ class IncrementalSnapshotFutureTask
/** Metadata of the full snapshot. */
private final Set<Integer> affectedCacheGrps;
+ /**
+ * Pointer to the previous snapshot record.
+ * For the first incremental snapshot it points to the {@link ClusterSnapshotRecord}.
+ * For the second and subsequent increments it points to the previous consistent cut record.
+ */
+ private final WALPointer lowPtr;
+
+ /** Current consistent cut WAL pointer. */
+ private final WALPointer highPtr;
+
/** */
public IncrementalSnapshotFutureTask(
GridCacheSharedContext<?, ?> cctx,
UUID srcNodeId,
UUID reqNodeId,
SnapshotMetadata meta,
- String snpPath,
+ @Nullable String snpPath,
int incIdx,
File tmpWorkDir,
- FileIOFactory ioFactory
+ FileIOFactory ioFactory,
+ WALPointer lowPtr,
+ WALPointer highPtr
) {
super(
cctx,
@@ -82,6 +98,8 @@ class IncrementalSnapshotFutureTask
this.incIdx = incIdx;
this.snpPath = snpPath;
this.affectedCacheGrps = new HashSet<>(meta.cacheGroupIds());
+ this.lowPtr = lowPtr;
+ this.highPtr = highPtr;
cctx.cache().configManager().addConfigurationChangeListener(this);
}
@@ -102,7 +120,50 @@ class IncrementalSnapshotFutureTask
return false;
}
- onDone(new IncrementalSnapshotFutureTaskResult());
+ cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ // The first increment must include the low segment, because the full snapshot knows nothing about WAL.
+ // All others begin from the next segment, because the lowPtr segment is already saved inside the previous increment.
+ long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+ long highIdx = highPtr.index();
+
+ assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
+ : "WAL Compaction must be enabled";
+ assert lowIdx <= highIdx;
+
+ if (log.isInfoEnabled())
+ log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
+
+ cctx.wal().awaitCompacted(highPtr.index());
+
+ if (log.isInfoEnabled()) {
+ log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
+ "highIdx=" + highIdx + ']');
+ }
+
+ for (; lowIdx <= highIdx; lowIdx++) {
+ File seg = cctx.wal().compactedSegment(lowIdx);
+
+ if (!seg.exists()) {
+ onDone(new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'));
+
+ return;
+ }
+
+ Path segLink = incSnpDir.toPath().resolve(seg.getName());
+
+ if (log.isDebugEnabled())
+ log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
+
+ Files.createLink(segLink, seg.toPath());
+ }
+
+ onDone(new IncrementalSnapshotFutureTaskResult());
+ }
+ catch (Throwable e) {
+ onDone(e);
+ }
+ });
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
index c262b070d82..3f4abe5d02d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
@@ -75,6 +75,11 @@ public class IncrementalSnapshotMetadata implements Serializable {
this.cutPtr = cutPtr;
}
+ /** @return Pointer to consistent cut record. */
+ public WALPointer cutPointer() {
+ return cutPtr;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IncrementalSnapshotMetadata.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 947085ab87f..79d019b6985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -144,7 +145,7 @@ public class SnapshotHandlerRestoreTask extends AbstractSnapshotVerificationTask
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
new SnapshotHandlerContext(meta, grps, ignite.localNode(), snpDir));
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 05f5856b903..29219920fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -119,7 +120,7 @@ public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTa
return new SnapshotPartitionsVerifyHandler(cctx)
.invoke(new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), snpDir));
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 51a1b88b105..d92895ceaae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -634,6 +634,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
}
+ /** {@inheritDoc} */
+ @Override public @Nullable File archiveDir() {
+ return walArchiveDir;
+ }
+
/**
* Collects WAL segments from the archive only if they are all present.
* Will wait for the last segment to be archived if it is not.
@@ -654,10 +659,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
List<File> res = new ArrayList<>();
for (long i = low.index(); i < high.index(); i++) {
- String segmentName = fileName(i);
-
- File file = new File(walArchiveDir, segmentName);
- File fileZip = new File(walArchiveDir, segmentName + ZIP_SUFFIX);
+ File file = archiveSegment(i, null);
+ File fileZip = compactedSegment(i);
if (file.exists())
res.add(file);
@@ -1101,10 +1104,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* @return {@code True} exists.
*/
private boolean hasIndex(long absIdx) {
- String segmentName = fileName(absIdx);
-
- boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
- new File(walArchiveDir, segmentName + ZIP_SUFFIX).exists();
+ boolean inArchive = archiveSegment(absIdx, null).exists() ||
+ compactedSegment(absIdx).exists();
if (inArchive)
return true;
@@ -2063,11 +2064,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
File origFile = new File(walWorkDir, fileName(segIdx));
- String name = fileName(absIdx);
-
- File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX);
-
- File dstFile = new File(walArchiveDir, name);
+ File dstTmpFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, TMP_SUFFIX);
+ File dstFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, null);
if (log.isInfoEnabled()) {
log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
@@ -2314,13 +2312,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
continue;
- String segmentFileName = fileName(segIdx);
-
- File tmpZip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX + TMP_SUFFIX);
-
- File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
-
- File raw = new File(walArchiveDir, segmentFileName);
+ File tmpZip = archiveSegment(segIdx, ZIP_SUFFIX + TMP_SUFFIX);
+ File zip = compactedSegment(segIdx);
+ File raw = archiveSegment(segIdx, null);
long currSize = 0;
long reservedSize = raw.length();
@@ -2475,12 +2469,45 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (segmentReservedOrLocked(desc.idx))
return;
- if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
- segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file));
+ if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) {
+ long sz = deleteArchiveFiles(desc.file);
+
+ segmentAware.addSize(desc.idx, -sz);
+ }
}
}
}
+ /** {@inheritDoc} */
+ @Override public File compactedSegment(long idx) {
+ return archiveSegment(idx, ZIP_SUFFIX);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitCompacted(long idx) throws IgniteInterruptedCheckedException {
+ segmentAware.awaitSegmentCompressed(idx);
+ }
+
+ /** */
+ private File archiveSegment(long idx, @Nullable String ext) {
+ return archiveSegment(walArchiveDir, idx, ext);
+ }
+
+ /**
+ * @param walArchiveDir WAL archive directory.
+ * @param idx Segment index.
+ * @param ext Optional extension, {@code null} if none.
+ * @return Path to archive segment.
+ */
+ public static File archiveSegment(File walArchiveDir, long idx, String ext) {
+ String fileName = fileName(idx);
+
+ if (ext != null)
+ fileName += ext;
+
+ return new File(walArchiveDir, fileName);
+ }
+
/**
* Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
*/
@@ -2525,11 +2552,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (segmentToDecompress == -1)
continue;
- String segmentFileName = fileName(segmentToDecompress);
-
- File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
- File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX);
- File unzip = new File(walArchiveDir, segmentFileName);
+ File zip = compactedSegment(segmentToDecompress);
+ File unzipTmp = archiveSegment(segmentToDecompress, TMP_SUFFIX);
+ File unzip = archiveSegment(segmentToDecompress, null);
long currSize = 0;
long reservedSize = U.uncompressedSize(zip);
@@ -2606,7 +2631,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
if (decompressionFutures.containsKey(idx))
return decompressionFutures.get(idx);
- File f = new File(walArchiveDir, fileName(idx));
+ File f = archiveSegment(idx, null);
if (f.exists())
return new GridFinishedFuture<>();
@@ -2882,8 +2907,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
AbstractFileDescriptor currDesc = desc;
if (!desc.file().exists()) {
- FileDescriptor zipFile = new FileDescriptor(
- new File(walArchiveDir, fileName(desc.idx()) + ZIP_SUFFIX));
+ FileDescriptor zipFile = new FileDescriptor(archiveSegment(walArchiveDir, desc.idx(), ZIP_SUFFIX));
if (!zipFile.file.exists()) {
throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 12a1e5f2fdc..3ca1ab82a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -65,9 +65,9 @@ class SegmentArchivedStorage extends SegmentObservable {
}
/**
- * Method will wait activation of particular WAL segment index.
+ * Method will wait archivation of particular WAL segment index.
*
- * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
+ * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
* @throws IgniteInterruptedCheckedException if interrupted.
*/
synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 728bd7411e5..e4131098e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -130,15 +130,25 @@ public class SegmentAware {
}
/**
- * Method will wait activation of particular WAL segment index.
+ * Method will wait archivation of particular WAL segment index.
*
- * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
+ * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
* @throws IgniteInterruptedCheckedException if interrupted.
*/
public void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
segmentArchivedStorage.awaitSegmentArchived(awaitIdx);
}
+ /**
+ * Method will wait for compression of the particular WAL segment index.
+ *
+ * @param awaitIdx Absolute segment index that {@link #lastCompressedIdx()} must reach.
+ * @throws IgniteInterruptedCheckedException if interrupted.
+ */
+ public void awaitSegmentCompressed(long awaitIdx) throws IgniteInterruptedCheckedException {
+ segmentCompressStorage.awaitSegmentCompressed(awaitIdx);
+ }
+
/**
* Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. Waits if
* there's no segment to archive right now.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 2272c0ec123..85a9257f0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -79,10 +79,31 @@ class SegmentCompressStorage {
else
this.lastCompressedIdx = lastMaxCompressedIdx;
+ notifyAll();
+
if (compressedIdx > lastEnqueuedToCompressIdx)
lastEnqueuedToCompressIdx = compressedIdx;
}
+ /**
+ * Method will wait for compression of the particular WAL segment index.
+ *
+ * @param awaitIdx Absolute segment index that {@link #lastCompressedIdx()} must reach.
+ * @throws IgniteInterruptedCheckedException if interrupted.
+ */
+ public synchronized void awaitSegmentCompressed(long awaitIdx) throws IgniteInterruptedCheckedException {
+ while (lastCompressedIdx() < awaitIdx && !interrupted) {
+ try {
+ wait(2000);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ }
+
+ checkInterrupted();
+ }
+
/**
* @return Last compressed segment.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index ee81bff4b34..adc4fbfe670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -122,7 +122,9 @@ class SegmentReservationStorage extends SegmentObservable {
Long oldMin = oldMinE == null ? null : oldMinE.getKey();
Long newMin = newMinE == null ? null : newMinE.getKey();
- return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin;
+ return Objects.equals(oldMin, newMin)
+ ? null
+ : newMin == null ? -1 : newMin;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index eccef97f56c..a2863fc70df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+import java.io.File;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -201,4 +202,19 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
@Override public void startAutoReleaseSegments() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public @Nullable File archiveDir() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable File compactedSegment(long idx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitCompacted(long idx) {
+ // No-op.
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
index 340955a3ae6..121a57be913 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -19,14 +19,19 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.io.File;
import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -34,6 +39,7 @@ import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeFalse;
/**
@@ -49,6 +55,23 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
/** */
public static final String GROUPED_CACHE = "my-grouped-cache2";
+ /** @see DataStorageConfiguration#isWalCompactionEnabled() */
+ public boolean walCompactionEnabled = true;
+
+ /** */
+ private AtomicInteger cntr = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getDataStorageConfiguration()
+ .setWalCompactionEnabled(walCompactionEnabled)
+ .setWalSegmentSize((int)U.MB);
+
+ return cfg;
+ }
+
/** */
@Test
public void testCreation() throws Exception {
@@ -88,6 +111,8 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), false).get(TIMEOUT);
for (int incIdx = 1; incIdx < 3; incIdx++) {
+ addData(cli);
+
if (snpPath == null)
snpCreate.createIncrementalSnapshot(snpName).get(TIMEOUT);
else
@@ -106,6 +131,19 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
}
}
+ /** */
+ private void addData(IgniteEx node) {
+ IgniteCache<Integer, byte[]> cache = node.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < (int)(U.MB * 3 * GRID_CNT / U.KB); i++) {
+ byte[] bytes = new byte[(int)U.KB];
+
+ ThreadLocalRandom.current().nextBytes(bytes);
+
+ cache.put(cntr.incrementAndGet(), bytes);
+ }
+ }
+
/** */
@Test
public void testFailForUnknownBaseSnapshot() throws Exception {
@@ -127,6 +165,72 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
);
}
+ /** */
+ @Test
+ public void testFailIfPreviousIncrementNotAvailable() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx srv = startGridsWithCache(
+ GRID_CNT,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ IgniteEx cli = startClientGrid(
+ GRID_CNT,
+ (UnaryOperator<IgniteConfiguration>)
+ cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+ );
+
+ cli.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+ cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ File toRmv = snp(ignite(GRID_CNT - 1)).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 2);
+
+ assertTrue(toRmv.exists());
+ assertTrue(toRmv.isDirectory());
+
+ U.delete(toRmv);
+
+ assertThrowsWithCause(
+ () -> cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class
+ );
+ }
+
+ /** */
+ @Test
+ public void testFailIfSegmentNotFound() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx srv = startGridsWithCache(
+ 1,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ srv.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ addData(srv);
+
+ FileWriteAheadLogManager wal = (FileWriteAheadLogManager)srv.context().cache().context().wal();
+
+ assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 0, getTestTimeout()));
+
+ long segIdx = wal.lastCompactedSegment();
+
+ U.delete(wal.compactedSegment(segIdx));
+
+ assertThrowsWithCause(
+ () -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class
+ );
+ }
+
/** */
@Test
public void testIncrementalSnapshotFailsOnTopologyChange() throws Exception {
@@ -234,6 +338,36 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
assertFalse(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).exists());
}
+ /** */
+ @Test
+ public void testFailIfWalCompactionDisabled() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ walCompactionEnabled = false;
+
+ try {
+ IgniteEx srv = startGridsWithCache(
+ 1,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ srv.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ assertThrows(
+ null,
+ () -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class,
+ "Create incremental snapshot request has been rejected. WAL compaction must be enabled."
+ );
+ }
+ finally {
+ walCompactionEnabled = true;
+ }
+
+ }
+
/** */
private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) throws Exception {
IgniteEx srv = startGridsWithCache(