You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/10/14 10:15:09 UTC
[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Create incremental snapshot infrastructure (#10263)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
new 914dc24b242 IGNITE-17613 Create incremental snapshot infrastructure (#10263)
914dc24b242 is described below
commit 914dc24b2423d93c2cc9dc5d235b2449e51a15e2
Author: Nikolay <ni...@apache.org>
AuthorDate: Fri Oct 14 13:14:57 2022 +0300
IGNITE-17613 Create incremental snapshot infrastructure (#10263)
Co-authored-by: Maksim Timonin <ti...@gmail.com>
---
.../commandline/indexreader/IgniteIndexReader.java | 13 +-
.../snapshot/SnapshotCreateCommand.java | 12 +-
.../snapshot/SnapshotCreateCommandOption.java | 6 +-
.../commandline/snapshot/SnapshotSubcommand.java | 5 +-
.../apache/ignite/util/GridCommandHandlerTest.java | 2 +-
.../java/org/apache/ignite/IgniteSnapshot.java | 10 +-
.../org/apache/ignite/internal/cdc/CdcMain.java | 4 +-
.../processors/cache/GridLocalConfigManager.java | 37 +-
.../persistence/file/FilePageStoreManager.java | 8 +
.../snapshot/AbstractSnapshotFutureTask.java | 7 +
.../snapshot/IgniteSnapshotManager.java | 446 ++++++++++++++++++---
.../snapshot/IncrementalSnapshotFutureTask.java | 125 ++++++
.../IncrementalSnapshotFutureTaskResult.java | 22 +
.../snapshot/IncrementalSnapshotMetadata.java | 82 ++++
.../persistence/snapshot/SnapshotFutureTask.java | 7 -
.../persistence/snapshot/SnapshotMXBeanImpl.java | 12 +-
.../persistence/snapshot/SnapshotMetadata.java | 3 +-
.../snapshot/SnapshotOperationRequest.java | 24 +-
.../snapshot/SnapshotRestoreProcess.java | 10 +-
.../cache/persistence/wal/FileDescriptor.java | 18 +-
.../persistence/wal/FileWriteAheadLogManager.java | 8 +-
.../apache/ignite/internal/util/IgniteUtils.java | 47 +++
.../apache/ignite/internal/util/lang/GridFunc.java | 4 +-
.../visor/snapshot/VisorSnapshotCreateTask.java | 7 +-
.../visor/snapshot/VisorSnapshotCreateTaskArg.java | 14 +-
.../snapshot/VisorSnapshotRestoreTaskArg.java | 2 +-
.../org/apache/ignite/mxbean/SnapshotMXBean.java | 15 +
.../snapshot/AbstractSnapshotSelfTest.java | 10 +
.../snapshot/IgniteClusterSnapshotHandlerTest.java | 2 +-
.../IgniteClusterSnapshotRestoreSelfTest.java | 4 +-
.../snapshot/IgniteClusterSnapshotSelfTest.java | 51 ++-
.../snapshot/IgniteSnapshotMXBeanTest.java | 9 +
.../snapshot/IncrementalSnapshotTest.java | 264 ++++++++++++
.../internal/util/lang/GridFuncSelfTest.java | 54 +++
.../ignite/testsuites/IgniteSnapshotTestSuite.java | 4 +-
.../ignite/testsuites/IgniteUtilSelfTestSuite.java | 2 +
...ridCommandHandlerClusterByClassTest_help.output | 13 +-
...andHandlerClusterByClassWithSSLTest_help.output | 13 +-
38 files changed, 1233 insertions(+), 143 deletions(-)
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
index 1842266680a..f0dcf4510fc 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
@@ -18,10 +18,7 @@
package org.apache.ignite.internal.commandline.indexreader;
import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -62,6 +59,7 @@ import org.apache.ignite.internal.commandline.indexreader.ScanContext.PagesStati
import org.apache.ignite.internal.commandline.systemview.SystemViewCommand;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -118,7 +116,6 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageIndex;
import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
@@ -285,13 +282,13 @@ public class IgniteIndexReader implements AutoCloseable {
for (int i = 0; i < partCnt; i++)
partStores[i] = filePageStore(i, FLAG_DATA, storeFactory);
- Arrays.stream(root.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME))).forEach(f -> {
- try (ObjectInputStream stream = new ObjectInputStream(Files.newInputStream(f.toPath()))) {
- StoredCacheData data = (StoredCacheData)stream.readObject();
+ Arrays.stream(FilePageStoreManager.cacheDataFiles(root)).forEach(f -> {
+ try {
+ StoredCacheData data = GridLocalConfigManager.readCacheData(f, null, null);
storedCacheData.put(CU.cacheId(data.config().getName()), data);
}
- catch (ClassNotFoundException | IOException e) {
+ catch (IgniteCheckedException e) {
log.log(WARNING, "Can't read stored cache data. Inline for this cache will not be analyzed [f=" + f.getName() + ']', e);
}
});
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
index 7582bd3fcfb..f346beebe75 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.visor.snapshot.VisorSnapshotCreateTaskArg;
import static org.apache.ignite.internal.commandline.CommandList.SNAPSHOT;
import static org.apache.ignite.internal.commandline.CommandLogger.optional;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.DESTINATION;
+import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.INCREMENTAL;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.SYNC;
/**
@@ -45,6 +46,7 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
String snpName = argIter.nextArg("Expected snapshot name.");
String snpPath = null;
boolean sync = false;
+ boolean incremental = false;
while (argIter.hasNextSubArg()) {
String arg = argIter.nextArg(null);
@@ -72,10 +74,15 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
sync = true;
}
+ else if (option == INCREMENTAL) {
+ if (incremental)
+ throw new IllegalArgumentException(INCREMENTAL.argName() + " arg specified twice.");
+ incremental = true;
+ }
}
- cmdArg = new VisorSnapshotCreateTaskArg(snpName, snpPath, sync);
+ cmdArg = new VisorSnapshotCreateTaskArg(snpName, snpPath, sync, incremental);
}
/** {@inheritDoc} */
@@ -84,8 +91,9 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
params.put(DESTINATION.argName() + " " + DESTINATION.arg(), DESTINATION.description());
params.put(SYNC.argName(), SYNC.description());
+ params.put(INCREMENTAL.argName(), INCREMENTAL.description());
usage(log, "Create cluster snapshot:", SNAPSHOT, params, name(), SNAPSHOT_NAME_ARG,
- optional(DESTINATION.argName(), DESTINATION.arg()), optional(SYNC.argName()));
+ optional(DESTINATION.argName(), DESTINATION.arg()), optional(SYNC.argName()), optional(INCREMENTAL.argName()));
}
}
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
index a6d216d1781..23761a0e5a2 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
@@ -30,7 +30,11 @@ public enum SnapshotCreateCommandOption implements CommandArg {
/** Snapshot directory path. */
DESTINATION("--dest", "path", "Path to the directory where the snapshot will be saved. If not specified, " +
- "the default configured snapshot directory will be used.");
+ "the default configured snapshot directory will be used."),
+
+ /** Incremental snapshot flag. */
+ INCREMENTAL("--incremental", null, "Create an incremental snapshot for previously created full snapshot. " +
+ "Full snapshot must be accessible via --dest and snapshot_name.");
/** Name. */
private final String name;
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
index 44f9e7d73b0..2d5b3c331fe 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
@@ -87,7 +87,10 @@ public abstract class SnapshotSubcommand extends AbstractCommand<Object> {
* @return General usage options.
*/
protected Map<String, String> generalUsageOptions() {
- return F.asMap(SNAPSHOT_NAME_ARG, "Snapshot name.");
+ return F.asMap(
+ SNAPSHOT_NAME_ARG,
+ "Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided."
+ );
}
/**
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index fe1be09f520..f092fb6c40e 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -3080,7 +3080,7 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
// Invalid command syntax check.
assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "create", snpName, "blah"));
- assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --sync, --dest.");
+ assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --sync, --dest, --incremental.");
assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "create", snpName, "--sync", "blah"));
assertContains(log, testOut.toString(), "Invalid argument: blah.");
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 5945805b076..08e23d9f022 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -38,10 +38,18 @@ public interface IgniteSnapshot {
* Create a consistent copy of all persistence cache groups from the whole cluster.
*
* @param name Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
- * @return Future which will be completed when a process ends.
+ * @return Future which will be completed when the process ends.
*/
public IgniteFuture<Void> createSnapshot(String name);
+ /**
+ * Create an incremental snapshot for a given full snapshot.
+ *
+ * @param fullSnapshot Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
+ * @return Future which will be completed when the process ends.
+ */
+ public IgniteFuture<Void> createIncrementalSnapshot(String fullSnapshot);
+
/**
* Cancel running snapshot operation. All intermediate results of cancelled snapshot operation will be deleted.
* If snapshot already created this command will have no effect.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 91abaa7faa1..e56c3dd0899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
@@ -77,7 +78,6 @@ import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer
import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
@@ -682,7 +682,7 @@ public class CdcMain implements Runnable {
.filter(File::exists)
// Cache group directory can contain several cache data files.
// See GridLocalConfigManager#cacheConfigurationFile(CacheConfiguration<?, ?>)
- .flatMap(cacheDir -> Arrays.stream(cacheDir.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME))))
+ .flatMap(cacheDir -> Arrays.stream(FilePageStoreManager.cacheDataFiles(cacheDir)))
.map(f -> {
try {
CdcCacheEvent evt = GridLocalConfigManager.readCacheData(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
index 6cc56335480..787013bbd74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -20,10 +20,9 @@ package org.apache.ignite.internal.processors.cache;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -62,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
+import org.jetbrains.annotations.Nullable;
import static java.nio.file.Files.newDirectoryStream;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
@@ -197,18 +197,26 @@ public class GridLocalConfigManager {
/**
* @param conf File with stored cache data.
+ * @param marshaller Marshaller.
+ * @param cfg Ignite configuration.
* @return Cache data.
* @throws IgniteCheckedException If failed.
*/
public static StoredCacheData readCacheData(
File conf,
- Marshaller marshaller,
- IgniteConfiguration cfg
+ @Nullable Marshaller marshaller,
+ @Nullable IgniteConfiguration cfg
) throws IgniteCheckedException {
- try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
+ try (InputStream stream = new BufferedInputStream(Files.newInputStream(conf.toPath()))) {
+ if (marshaller == null || cfg == null) {
+ try (ObjectInputStream ostream = new ObjectInputStream(stream)) {
+ return (StoredCacheData)ostream.readObject();
+ }
+ }
+
return marshaller.unmarshal(stream, U.resolveClassLoader(cfg));
}
- catch (IgniteCheckedException | IOException e) {
+ catch (IgniteCheckedException | IOException | ClassNotFoundException e) {
throw new IgniteCheckedException("An error occurred during cache configuration loading from file [file=" +
conf.getAbsolutePath() + "]", e);
}
@@ -221,7 +229,7 @@ public class GridLocalConfigManager {
*/
public void writeCacheData(StoredCacheData cacheData, File conf) throws IgniteCheckedException {
// Pre-existing file will be truncated upon stream open.
- try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(conf))) {
+ try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(conf.toPath()))) {
marshaller.marshal(cacheData, stream);
}
catch (IOException e) {
@@ -355,9 +363,18 @@ public class GridLocalConfigManager {
* @param lsnr Instance of listener to add.
*/
public void addConfigurationChangeListener(BiConsumer<String, File> lsnr) {
- assert chgLock.isWriteLockedByCurrentThread();
+ if (chgLock.isWriteLockedByCurrentThread())
+ lsnrs.add(lsnr);
+ else {
+ chgLock.writeLock().lock();
- lsnrs.add(lsnr);
+ try {
+ lsnrs.add(lsnr);
+ }
+ finally {
+ chgLock.writeLock().unlock();
+ }
+ }
}
/**
@@ -484,7 +501,7 @@ public class GridLocalConfigManager {
* @param ccfg Cache configuration.
* @return Cache configuration file with respect to {@link CacheConfiguration#getGroupName} value.
*/
- private File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
+ public File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
File cacheWorkDir = cacheWorkDir(ccfg);
return ccfg.getGroupName() == null ? new File(cacheWorkDir, CACHE_DATA_FILENAME) :
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index cf6f23515ca..c3a6a252cc1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -943,6 +943,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir);
}
+ /**
+ * @param root Root directory.
+ * @return Array of cache data files.
+ */
+ public static File[] cacheDataFiles(File root) {
+ return root.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME));
+ }
+
/** {@inheritDoc} */
@Override public boolean hasIndexStore(int grpId) {
return !grpsWithoutIdx.contains(grpId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
index cb9486a96d7..7ccc2f3944c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -121,6 +121,13 @@ abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
return reqId;
}
+ /**
+ * @return Set of cache groups included into snapshot operation.
+ */
+ public Set<Integer> affectedCacheGroups() {
+ return parts.keySet();
+ }
+
/**
* Initiates snapshot task.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e27d6161d53..cd894261b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -66,6 +66,7 @@ import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -106,6 +107,7 @@ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheType;
@@ -308,6 +310,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Total snapshot files count which receiver should expect to receive. */
private static final String SNP_PARTITIONS_CNT = "partsCnt";
+ /** Incremental snapshots directory name. */
+ public static final String INC_SNP_DIR = "increments";
+
+ /** Pattern for incremental snapshot directory names. */
+ public static final Pattern INC_SNP_NAME_PATTERN = U.fixedLengthNumberNamePattern(null);
+
/**
* Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
* It is important to have only one buffer per thread (instead of creating each buffer per
@@ -506,7 +514,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
String.class,
"The error message of last started cluster snapshot request which fail with an error. " +
"This value will be empty if last snapshot request has been completed successfully.");
- mreg.register("LocalSnapshotNames", this::localSnapshotNames, List.class,
+ mreg.register("LocalSnapshotNames", () -> localSnapshotNames(null), List.class,
"The list of names of all snapshots currently saved on the local node with respect to " +
"the configured via IgniteConfiguration snapshot working path.");
mreg.register("LastRequestId", () -> Optional.ofNullable(lastSeenSnpFut.rqId).map(UUID::toString).orElse(""),
@@ -572,7 +580,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
SNAPSHOT_SYS_VIEW,
SNAPSHOT_SYS_VIEW_DESC,
new SnapshotViewWalker(),
- () -> F.flatCollections(F.transform(localSnapshotNames(), name -> readSnapshotMetadatas(name, null))),
+ () -> F.flatCollections(F.transform(localSnapshotNames(null), name -> readSnapshotMetadatas(name, null))),
this::snapshotViewSupplier);
}
@@ -704,6 +712,38 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return snpPath == null ? new File(locSnpDir, snpName) : new File(snpPath, snpName);
}
+ /**
+ * Returns path to specific incremental snapshot.
+ * For example, {@code "work/snapshots/mybackup/increments/node01/0001"}.
+ *
+ * @param snpName Snapshot name.
+ * @param snpPath Snapshot directory path.
+ * @param incIdx Increment index.
+ * @return Local snapshot directory where snapshot files are located.
+ */
+ public File incrementalSnapshotLocalDir(String snpName, @Nullable String snpPath, int incIdx) {
+ return Paths.get(
+ incrementalSnapshotsLocalRootDir(snpName, snpPath).getAbsolutePath(),
+ U.fixedLengthNumberName(incIdx, null)
+ ).toFile();
+ }
+
+ /**
+ * Returns root folder for incremental snapshot.
+ * For example, {@code "work/snapshots/mybackup/increments/node01"}.
+ *
+ * @param snpName Snapshot name.
+ * @param snpPath Snapshot directory path.
+ * @return Local snapshot directory where snapshot files are located.
+ */
+ public File incrementalSnapshotsLocalRootDir(String snpName, @Nullable String snpPath) {
+ return Paths.get(
+ snapshotLocalDir(snpName, snpPath).getAbsolutePath(),
+ INC_SNP_DIR,
+ pdsSettings.folderName()
+ ).toFile();
+ }
+
/**
* @param snpName Snapshot name.
* @return Local snapshot directory for snapshot with given name.
@@ -773,6 +813,110 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
"on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
}
+ if (req.incremental()) {
+ SnapshotMetadata meta = readSnapshotMetadata(new File(
+ snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+ snapshotMetaFileName(cctx.localNode().consistentId().toString())
+ ));
+
+ try {
+ checkIncrementalCanBeCreated(req.snapshotName(), req.snapshotPath(), meta);
+ }
+ catch (IgniteCheckedException | IOException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return initLocalIncrementalSnapshot(req, meta);
+ }
+ else
+ return initLocalFullSnapshot(req, grpIds, withMetaStorage);
+ }
+
+ /**
+ * @param req Request on snapshot creation.
+ * @param meta Full snapshot metadata.
+ * @return Future which will be completed when a snapshot has been started.
+ */
+ private IgniteInternalFuture<SnapshotOperationResponse> initLocalIncrementalSnapshot(
+ SnapshotOperationRequest req,
+ SnapshotMetadata meta
+ ) {
+ File incSnpDir = incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex());
+
+ IgniteInternalFuture<SnapshotOperationResponse> task0 = registerTask(req.snapshotName(), new IncrementalSnapshotFutureTask(
+ cctx,
+ req.operationalNodeId(),
+ req.requestId(),
+ meta,
+ req.snapshotPath(),
+ req.incrementIndex(),
+ tmpWorkDir,
+ ioFactory
+ )).chain(fut -> {
+ if (fut.error() != null)
+ throw F.wrap(fut.error());
+
+ assert incSnpDir.exists() : "Incremental snapshot directory must exists";
+
+ IncrementalSnapshotMetadata incMeta = new IncrementalSnapshotMetadata(
+ req.requestId(),
+ req.snapshotName(),
+ req.incrementIndex(),
+ cctx.localNode().consistentId().toString(),
+ pdsSettings.folderName(),
+ null /* WAL Pointer for CUT record goes here. */
+ );
+
+ writeSnapshotMetafile(
+ new File(incSnpDir, incrementalSnapshotMetaFileName(req.incrementIndex())),
+ incMeta
+ );
+
+ return new SnapshotOperationResponse();
+ });
+
+ if (task0.isDone())
+ return task0;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Incremental snapshot operation submited for execution" +
+ "[snpName=" + req.snapshotName() + ", incIdx=" + req.incrementIndex());
+ }
+
+ clusterSnpReq = req;
+
+ cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+ SnapshotOperationRequest snpReq = clusterSnpReq;
+
+ AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
+
+ if (task == null)
+ return;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Incremental snapshot operation started" +
+ "[snpName=" + req.snapshotName() + ", incIdx=" + req.incrementIndex());
+ }
+
+ writeSnapshotDirectoryToMetastorage(incSnpDir);
+
+ task.start();
+ });
+
+ return task0;
+ }
+
+ /**
+ * @param req Request
+ * @param grpIds Groups.
+ * @param withMetaStorage Flag to include metastorage.
+ * @return Create snapshot future.
+ */
+ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
+ SnapshotOperationRequest req,
+ List<Integer> grpIds,
+ boolean withMetaStorage
+ ) {
Map<Integer, Set<Integer>> parts = new HashMap<>();
// Prepare collection of pairs group and appropriate cache partition to be snapshot.
@@ -815,11 +959,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
File snpDir = snapshotLocalDir(req.snapshotName(), req.snapshotPath());
- File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()));
-
- if (smf.exists())
- throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
-
snpDir.mkdirs();
SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result();
@@ -836,31 +975,48 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
cctx.gridConfig().getEncryptionSpi().masterKeyDigest()
);
- try (OutputStream out = Files.newOutputStream(smf.toPath())) {
- byte[] bytes = U.marshal(marsh, meta);
- int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
-
- for (int off = 0; off < bytes.length; off += blockSize) {
- int len = Math.min(blockSize, bytes.length - off);
-
- transferRateLimiter.acquire(len);
-
- out.write(bytes, off, len);
- }
- }
-
- log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
+ writeSnapshotMetafile(
+ new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())),
+ meta
+ );
SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpDir);
return new SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
}
- catch (IOException | IgniteCheckedException e) {
+ catch (IgniteCheckedException e) {
throw F.wrap(e);
}
});
}
+ /**
+ * @param smf File to write to.
+ * @param meta Snapshot meta information.
+ */
+ private <M extends Serializable> void writeSnapshotMetafile(File smf, M meta) {
+ if (smf.exists())
+ throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
+
+ try (OutputStream out = Files.newOutputStream(smf.toPath())) {
+ byte[] bytes = U.marshal(marsh, meta);
+ int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
+
+ for (int off = 0; off < bytes.length; off += blockSize) {
+ int len = Math.min(blockSize, bytes.length - off);
+
+ transferRateLimiter.acquire(len);
+
+ out.write(bytes, off, len);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
+ }
+
/**
* @param id Request id.
* @param res Results.
@@ -987,7 +1143,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
if (req.error() != null) {
snpReq.error(req.error());
- deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName());
+ if (req.incremental())
+ U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex()));
+ else
+ deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName());
}
removeLastMetaStorageKey();
@@ -1109,7 +1268,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/**
* @return List of all known snapshots on the local node.
*/
- public List<String> localSnapshotNames() {
+ public List<String> localSnapshotNames(@Nullable String snpPath) {
if (cctx.kernalContext().clientNode())
throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
@@ -1117,12 +1276,41 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return Collections.emptyList();
synchronized (snpOpMux) {
- return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+ File[] dirs = (snpPath == null ? locSnpDir : new File(snpPath)).listFiles(File::isDirectory);
+
+ if (dirs == null)
+ return Collections.emptyList();
+
+ return Arrays.stream(dirs)
.map(File::getName)
.collect(Collectors.toList());
}
}
+ /**
+ * @param snpName Full snapshot name.
+ * @param snpPath Snapshot path.
+ * @return Maximum existing incremental snapshot index.
+ */
+ private int maxLocalIncrementSnapshot(String snpName, @Nullable String snpPath) {
+ if (cctx.kernalContext().clientNode())
+ throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+ synchronized (snpOpMux) {
+ File[] incDirs = incrementalSnapshotsLocalRootDir(snpName, snpPath).listFiles(File::isDirectory);
+
+ if (incDirs == null)
+ return 0;
+
+ return Arrays.stream(incDirs)
+ .map(File::getName)
+ .filter(name -> INC_SNP_NAME_PATTERN.matcher(name).matches())
+ .mapToInt(Integer::parseInt)
+ .max()
+ .orElse(0);
+ }
+ }
+
/** {@inheritDoc} */
@Override public IgniteFuture<Void> cancelSnapshot(String name) {
return new IgniteFutureImpl<>(cancelSnapshot0(name).chain(f -> null));
@@ -1505,7 +1693,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** {@inheritDoc} */
@Override public IgniteFuture<Void> createSnapshot(String name) {
- return createSnapshot(name, null);
+ return createSnapshot(name, null, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> createIncrementalSnapshot(String name) {
+ return createSnapshot(name, null, true);
}
/**
@@ -1515,7 +1708,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
* @param snpPath Snapshot directory path.
* @return Future which will be completed when a process ends.
*/
- public IgniteFutureImpl<Void> createSnapshot(String name, @Nullable String snpPath) {
+ public IgniteFutureImpl<Void> createSnapshot(String name, @Nullable String snpPath, boolean incremental) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
@@ -1546,7 +1739,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return new IgniteSnapshotFutureImpl(cctx.kernalContext().closure()
.callAsyncNoFailover(BALANCE,
- new CreateSnapshotCallable(name),
+ new CreateSnapshotCallable(name, incremental),
Collections.singletonList(crd),
false,
0,
@@ -1554,6 +1747,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
}
ClusterSnapshotFuture snpFut0;
+ int incIdx = -1;
synchronized (snpOpMux) {
if (clusterSnpFut != null && !clusterSnpFut.isDone()) {
@@ -1565,10 +1759,20 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
if (clusterSnpReq != null)
throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.");
- if (localSnapshotNames().contains(name)) {
- throw new IgniteException(
- "Create snapshot request has been rejected. Snapshot with given name already exists on local node."
- );
+ boolean snpExists = localSnapshotNames(snpPath).contains(name);
+
+ if (!incremental && snpExists) {
+ throw new IgniteException("Create snapshot request has been rejected. " +
+ "Snapshot with given name already exists on local node.");
+ }
+
+ if (incremental) {
+ if (!snpExists) {
+ throw new IgniteException("Create incremental snapshot request has been rejected. " +
+ "Base snapshot with given name doesn't exist on local node.");
+ }
+
+ incIdx = maxLocalIncrementSnapshot(name, snpPath) + 1;
}
if (isRestoring()) {
@@ -1602,10 +1806,21 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
Set<UUID> bltNodeIds =
new HashSet<>(F.viewReadOnly(srvNodes, F.node2id(), (node) -> CU.baselineNode(node, clusterState)));
- startSnpProc.start(snpFut0.rqId,
- new SnapshotOperationRequest(snpFut0.rqId, cctx.localNodeId(), name, snpPath, grps, bltNodeIds));
-
- String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
+ startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest(
+ snpFut0.rqId,
+ cctx.localNodeId(),
+ name,
+ snpPath,
+ grps,
+ bltNodeIds,
+ incremental,
+ incIdx
+ ));
+
+ String msg =
+ "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps +
+ (incremental ? "" : (", incremental=true, index=" + incIdx)) +
+ ']';
recordSnapshotEvent(name, msg, EVT_CLUSTER_SNAPSHOT_STARTED);
@@ -1668,7 +1883,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
String snpDirName = snpName == null ? (String)metaStorage.read(SNP_RUNNING_DIR_KEY) : null;
- File snpDir = snpName != null ? snapshotLocalDir(snpName, null) : snpDirName != null ? new File(snpDirName) : null;
+ File snpDir = snpName != null
+ ? snapshotLocalDir(snpName, null)
+ : snpDirName != null
+ ? new File(snpDirName)
+ : null;
if (snpDir == null)
return;
@@ -1678,7 +1897,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
for (File tmp : snapshotTmpDir().listFiles())
U.delete(tmp);
- deleteSnapshot(snpDir, pdsSettings.folderName());
+ if (INC_SNP_NAME_PATTERN.matcher(snpDir.getName()).matches() && snpDir.getAbsolutePath().contains(INC_SNP_DIR))
+ U.delete(snpDir);
+ else
+ deleteSnapshot(snpDir, pdsSettings.folderName());
if (log.isInfoEnabled()) {
log.info("Previous attempt to create snapshot fail due to the local node crash. All resources " +
@@ -1703,6 +1925,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
SnapshotOperationRequest snpReq = clusterSnpReq;
+ if (snpReq.incremental())
+ return;
+
AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
if (task == null)
@@ -1774,7 +1999,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
for (AbstractSnapshotFutureTask<?> sctx : F.view(locSnpTasks.values(), t -> t instanceof SnapshotFutureTask)) {
Set<Integer> retain = new HashSet<>(grps);
- retain.retainAll(((SnapshotFutureTask)sctx).affectedCacheGroups());
+ retain.retainAll(sctx.affectedCacheGroups());
if (!retain.isEmpty()) {
sctx.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
@@ -1791,6 +2016,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
}
+ /**
+ * @param incIdx Increment index.
+ * @return Snapshot metadata file name.
+ */
+ public static String incrementalSnapshotMetaFileName(int incIdx) {
+ return U.fixedLengthNumberName(incIdx, SNAPSHOT_METAFILE_EXT);
+ }
+
/**
* @param snpDir The full path to the snapshot files.
* @param folderName The node folder name, usually it's the same as the U.maskForFileName(consistentId).
@@ -2015,6 +2248,24 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
rqId);
}
+ /** @param snpLocDir Snapshot local directory. */
+ public void writeSnapshotDirectoryToMetastorage(File snpLocDir) {
+ cctx.database().checkpointReadLock();
+
+ try {
+ assert metaStorage != null && metaStorage.read(SNP_RUNNING_DIR_KEY) == null :
+ "The previous snapshot hasn't been completed correctly";
+
+ metaStorage.write(SNP_RUNNING_DIR_KEY, snpLocDir.getAbsolutePath());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ cctx.database().checkpointReadUnlock();
+ }
+ }
+
/** Snapshot finished successfully or already restored. Key can be removed. */
private void removeLastMetaStorageKey() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
@@ -2178,6 +2429,102 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
return new IgniteFutureImpl<>(cctx.kernalContext().task().execute(taskCls, snpName));
}
+ /**
+ * Checks that incremental snapshot can be created for given full snapshot and current cluster state.
+ *
+ * @param name Full snapshot name.
+ * @param snpPath Snapshot path.
+ * @param meta Full snapshot metadata.
+ */
+ private void checkIncrementalCanBeCreated(
+ String name,
+ @Nullable String snpPath,
+ SnapshotMetadata meta
+ ) throws IgniteCheckedException, IOException {
+ File snpDir = snapshotLocalDir(name, snpPath);
+
+ Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
+ .stream()
+ .map(node -> node.consistentId().toString())
+ .collect(Collectors.toSet());
+
+ for (String consId : meta.baselineNodes()) {
+ if (!aliveNodesConsIds.contains(consId)) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "Node from full snapshot offline [consistentId=" + consId + ']');
+ }
+ }
+
+ File rootSnpCachesDir = new File(snpDir, databaseRelativePath(meta.folderName()));
+
+ for (int grpId : meta.cacheGroupIds()) {
+ if (grpId == METASTORAGE_CACHE_ID)
+ continue;
+
+ CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grpId);
+
+ if (gctx == null) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "Cache group destroyed [groupId=" + grpId + ']');
+ }
+
+ if (gctx.config().isEncryptionEnabled()) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "Encrypted cache groups not supported [groupId=" + grpId + ']');
+ }
+
+ List<File> snpCacheDir =
+ cacheDirectories(rootSnpCachesDir, grpName -> gctx.cacheOrGroupName().equals(grpName));
+
+ if (snpCacheDir.isEmpty()) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "Cache group directory not found [groupId=" + grpId + ']');
+ }
+
+ assert snpCacheDir.size() == 1 : "Single snapshot cache directory must be found";
+
+ for (File snpDataFile : FilePageStoreManager.cacheDataFiles(snpCacheDir.get(0))) {
+ StoredCacheData snpCacheData = GridLocalConfigManager.readCacheData(
+ snpDataFile,
+ MarshallerUtils.jdkMarshaller(cctx.kernalContext().igniteInstanceName()),
+ cctx.kernalContext().config()
+ );
+
+ byte[] snpCacheDataBytes = Files.readAllBytes(snpDataFile.toPath());
+
+ File nodeDataFile = new File(snpDataFile.getAbsolutePath().replace(
+ rootSnpCachesDir.getAbsolutePath(),
+ pdsSettings.persistentStoreNodePath().getAbsolutePath()
+ ));
+
+ if (!nodeDataFile.exists()) {
+ throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+ "Cache destroyed [cacheId=" + snpCacheData.cacheId() +
+ ", cacheName=" + snpCacheData.config().getName() + ']');
+ }
+
+ byte[] nodeCacheDataBytes = Files.readAllBytes(nodeDataFile.toPath());
+
+ if (!Arrays.equals(snpCacheDataBytes, nodeCacheDataBytes)) {
+ throw new IgniteCheckedException(
+ cacheChangedException(snpCacheData.cacheId(), snpCacheData.config().getName())
+ );
+ }
+ }
+ }
+ }
+
+ /**
+ * Throw cache changed exception.
+ *
+ * @param cacheId Cache id.
+ * @param name Cache name.
+ */
+ public static String cacheChangedException(int cacheId, String name) {
+ return "Create incremental snapshot request has been rejected. " +
+ "Cache changed [cacheId=" + cacheId + ", cacheName=" + name + ']';
+ }
+
/**
* @param meta Snapshot metadata.
* @return Snapshot view.
@@ -3138,22 +3485,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
"[snpName=" + snpLocDir.getName() + ", absPath=" + dbDir.getAbsolutePath() + ']');
}
- cctx.database().checkpointReadLock();
+ writeSnapshotDirectoryToMetastorage(snpLocDir);
try {
- assert metaStorage != null && metaStorage.read(SNP_RUNNING_DIR_KEY) == null :
- "The previous snapshot hasn't been completed correctly";
-
- metaStorage.write(SNP_RUNNING_DIR_KEY, snpLocDir.getAbsolutePath());
-
U.ensureDirectory(dbDir, "snapshot work directory for a local snapshot sender", log);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
- finally {
- cctx.database().checkpointReadUnlock();
- }
}
/** {@inheritDoc} */
@@ -3434,6 +3773,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Snapshot name. */
private final String snpName;
+ /** Incremental flag. */
+ private final boolean incremental;
+
/** Auto-injected grid instance. */
@IgniteInstanceResource
private transient IgniteEx ignite;
@@ -3441,13 +3783,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/**
* @param snpName Snapshot name.
*/
- public CreateSnapshotCallable(String snpName) {
+ public CreateSnapshotCallable(String snpName, boolean incremental) {
this.snpName = snpName;
+ this.incremental = incremental;
}
/** {@inheritDoc} */
@Override public Void call() throws Exception {
- ignite.snapshot().createSnapshot(snpName).get();
+ if (incremental)
+ ignite.snapshot().createIncrementalSnapshot(snpName).get();
+ else
+ ignite.snapshot().createSnapshot(snpName).get();
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
new file mode 100644
index 00000000000..aa9e2208c05
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class IncrementalSnapshotFutureTask
+ extends AbstractSnapshotFutureTask<IncrementalSnapshotFutureTaskResult>
+ implements BiConsumer<String, File> {
+ /** Index of incremental snapshot. */
+ private final int incIdx;
+
+ /** Snapshot path. */
+ private final @Nullable String snpPath;
+
+ /** Metadata of the full snapshot. */
+ private final Set<Integer> affectedCacheGrps;
+
+ /** */
+ public IncrementalSnapshotFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ UUID reqNodeId,
+ SnapshotMetadata meta,
+ String snpPath,
+ int incIdx,
+ File tmpWorkDir,
+ FileIOFactory ioFactory
+ ) {
+ super(
+ cctx,
+ srcNodeId,
+ reqNodeId,
+ meta.snapshotName(),
+ tmpWorkDir,
+ ioFactory,
+ new SnapshotSender(
+ cctx.logger(IncrementalSnapshotFutureTask.class),
+ cctx.kernalContext().pools().getSnapshotExecutorService()
+ ) {
+ @Override protected void init(int partsCnt) {
+ // No-op.
+ }
+
+ @Override protected void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+ // No-op.
+ }
+
+ @Override protected void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+ // No-op.
+ }
+ },
+ null
+ );
+
+ this.incIdx = incIdx;
+ this.snpPath = snpPath;
+ this.affectedCacheGrps = new HashSet<>(meta.cacheGroupIds());
+
+ cctx.cache().configManager().addConfigurationChangeListener(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Integer> affectedCacheGroups() {
+ return affectedCacheGrps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean start() {
+ try {
+ File incSnpDir = cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, incIdx);
+
+ if (!incSnpDir.mkdirs()) {
+ onDone(new IgniteException("Can't create snapshot directory[dir=" + incSnpDir.getAbsolutePath() + ']'));
+
+ return false;
+ }
+
+ onDone(new IncrementalSnapshotFutureTaskResult());
+
+ return true;
+ }
+ finally {
+ cctx.cache().configManager().removeConfigurationChangeListener(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void acceptException(Throwable th) {
+ cctx.cache().configManager().removeConfigurationChangeListener(this);
+
+ onDone(th);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(String name, File file) {
+ onDone(new IgniteException(IgniteSnapshotManager.cacheChangedException(CU.cacheId(name), name)));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java
new file mode 100644
index 00000000000..2ff37e6d3f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+/** */
+class IncrementalSnapshotFutureTaskResult {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
new file mode 100644
index 00000000000..c262b070d82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Incremental snapshot metadata file.
+ *
+ * @see IgniteSnapshotManager#createIncrementalSnapshot(String)
+ */
+public class IncrementalSnapshotMetadata implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Unique snapshot request id. */
+ private final UUID rqId;
+
+ /** Snapshot name. */
+ private final String snpName;
+
+ /** Increment index. */
+ private final int incIdx;
+
+ /** Consistent id of a node to which this metadata relates. */
+ private final String consId;
+
+ /**
+ * Directory related to the current consistent node id on which partition files are stored.
+ * For some of the cases, consId doesn't equal the directory name.
+ */
+ private final String folderName;
+
+ /** WAL pointer to consistent cut record. */
+ private final WALPointer cutPtr;
+
+ /**
+ * @param rqId Unique request id.
+ * @param snpName Snapshot name.
+ * @param consId Consistent id of a node to which this metadata relates.
+ * @param folderName Directory name which stores the data files.
+ * @param cutPtr Pointer to consistent cut record.
+ */
+ public IncrementalSnapshotMetadata(
+ UUID rqId,
+ String snpName,
+ int incIdx,
+ String consId,
+ String folderName,
+ WALPointer cutPtr
+ ) {
+ this.rqId = rqId;
+ this.snpName = snpName;
+ this.incIdx = incIdx;
+ this.consId = consId;
+ this.folderName = folderName;
+ this.cutPtr = cutPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IncrementalSnapshotMetadata.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index aa5326aa9af..4bacbb349f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -189,13 +189,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe
this.locBuff = locBuff;
}
- /**
- * @return Set of cache groups included into snapshot operation.
- */
- public Set<Integer> affectedCacheGroups() {
- return parts.keySet();
- }
-
/**
* @param th An exception which occurred during snapshot processing.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
index 713ab2e46c0..7bea6bcb971 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
@@ -50,7 +50,15 @@ public class SnapshotMXBeanImpl implements SnapshotMXBean {
/** {@inheritDoc} */
@Override public void createSnapshot(String snpName, String snpPath) {
- IgniteFuture<Void> fut = mgr.createSnapshot(snpName, F.isEmpty(snpPath) ? null : snpPath);
+ IgniteFuture<Void> fut = mgr.createSnapshot(snpName, F.isEmpty(snpPath) ? null : snpPath, false);
+
+ if (fut.isDone())
+ fut.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void createIncrementalSnapshot(String fullSnapshot, String fullSnapshotPath) {
+ IgniteFuture<Void> fut = mgr.createSnapshot(fullSnapshot, F.isEmpty(fullSnapshotPath) ? null : fullSnapshotPath, true);
if (fut.isDone())
fut.get();
@@ -88,6 +96,8 @@ public class SnapshotMXBeanImpl implements SnapshotMXBean {
if (req != null) {
return "Create snapshot operation is in progress [name=" + req.snapshotName() +
+ ", incremental=" + req.incremental() +
+ (req.incremental() ? (", incrementalIndex=" + req.incrementIndex()) : "") +
", id=" + req.requestId() + ']';
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index e1f8e4e2e6c..10ab16aa82b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -85,7 +85,8 @@ public class SnapshotMetadata implements Serializable {
@Nullable private final byte[] masterKeyDigest;
/**
- * F@param snpName Snapshot name.
+ * @param rqId Unique request id.
+ * @param snpName Snapshot name.
* @param consId Consistent id of a node to which this metadata relates.
* @param folderName Directory name which stores the data files.
* @param pageSize Page size of stored snapshot data.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 14c72f661ea..23a55a6251c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -64,6 +64,12 @@ public class SnapshotOperationRequest implements Serializable {
/** Operation start time. */
private final long startTime;
+ /** If {@code true} then incremental snapshot requested. */
+ private final boolean incremental;
+
+ /** Index of incremental snapshot. */
+ private final int incIdx;
+
/**
* @param reqId Request ID.
* @param opNodeId Operational node ID.
@@ -71,6 +77,8 @@ public class SnapshotOperationRequest implements Serializable {
* @param snpPath Snapshot directory path.
* @param grps List of cache group names.
* @param nodes Baseline node IDs that must be alive to complete the operation.
+ * @param incremental {@code True} if incremental snapshot requested.
+ * @param incIdx Incremental snapshot index.
*/
public SnapshotOperationRequest(
UUID reqId,
@@ -78,7 +86,9 @@ public class SnapshotOperationRequest implements Serializable {
String snpName,
String snpPath,
@Nullable Collection<String> grps,
- Set<UUID> nodes
+ Set<UUID> nodes,
+ boolean incremental,
+ int incIdx
) {
this.reqId = reqId;
this.opNodeId = opNodeId;
@@ -86,6 +96,8 @@ public class SnapshotOperationRequest implements Serializable {
this.grps = grps;
this.nodes = nodes;
this.snpPath = snpPath;
+ this.incremental = incremental;
+ this.incIdx = incIdx;
startTime = U.currentTimeMillis();
}
@@ -145,6 +157,16 @@ public class SnapshotOperationRequest implements Serializable {
this.err = err;
}
+ /** @return {@code True} if incremental snapshot requested. */
+ public boolean incremental() {
+ return incremental;
+ }
+
+ /** @return Incremental index. */
+ public int incrementIndex() {
+ return incIdx;
+ }
+
/** @return Start time. */
public long startTime() {
return startTime;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index d41fdda28ca..08cac31a386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -332,7 +332,15 @@ public class SnapshotRestoreProcess {
Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
SnapshotOperationRequest req = new SnapshotOperationRequest(
- fut0.rqId, F.first(dataNodes), snpName, snpPath, cacheGrpNames, new HashSet<>(bltNodes));
+ fut0.rqId,
+ F.first(dataNodes),
+ snpName,
+ snpPath,
+ cacheGrpNames,
+ new HashSet<>(bltNodes),
+ false,
+ -1
+ );
prepareRestoreProc.start(req.requestId(), req);
});
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index eeaab94b87d..2f34b575bbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
-import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.READ;
@@ -36,9 +36,6 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
/** file extension of WAL segment. */
private static final String WAL_SEGMENT_FILE_EXT = ".wal";
- /** Length of WAL segment file name. */
- private static final int WAL_SEGMENT_FILE_NAME_LENGTH = 16;
-
/** File represented by this class. */
protected final File file;
@@ -67,7 +64,7 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
assert fileName.contains(WAL_SEGMENT_FILE_EXT);
- this.idx = idx == null ? Long.parseLong(fileName.substring(0, WAL_SEGMENT_FILE_NAME_LENGTH)) : idx;
+ this.idx = idx == null ? U.fixedLengthFileNumber(fileName) : idx;
}
/**
@@ -77,16 +74,7 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
* @return Segment file name.
*/
public static String fileName(long idx) {
- SB b = new SB();
-
- String segmentStr = Long.toString(idx);
-
- for (int i = segmentStr.length(); i < WAL_SEGMENT_FILE_NAME_LENGTH; i++)
- b.a('0');
-
- b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT);
-
- return b.toString();
+ return U.fixedLengthNumberName(idx, WAL_SEGMENT_FILE_EXT);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 868871288e6..51a1b88b105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -173,10 +173,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
private static final byte[] FILL_BUF = new byte[1024 * 1024];
/** Pattern for segment file names. */
- public static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
+ public static final Pattern WAL_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal");
/** Pattern for WAL temp files - these files will be cleared at startup. */
- public static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
+ public static final Pattern WAL_TEMP_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal.tmp");
/** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
public static final FileFilter WAL_SEGMENT_FILE_FILTER = file -> !file.isDirectory() &&
@@ -187,7 +187,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
/** */
- public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
+ public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip");
/** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = file -> !file.isDirectory() &&
@@ -195,7 +195,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
/** */
- private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
+ private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip.tmp");
/** */
private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = file -> !file.isDirectory() &&
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ec8764393c0..d2101fb2545 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -401,6 +401,9 @@ public abstract class IgniteUtils {
/** Alphanumeric with underscore regexp pattern. */
private static final Pattern ALPHANUMERIC_UNDERSCORE_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
+ /** Length of numbered file name. */
+ public static final int NUMBER_FILE_NAME_LENGTH = 16;
+
/** Project home directory. */
private static volatile GridTuple<String> ggHome;
@@ -3364,6 +3367,50 @@ public abstract class IgniteUtils {
return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
}
+ /**
+ * Generates file name from index.
+ *
+ * @param num Number to generate file name.
+ * @param ext Optional extension
+ * @return File name.
+ */
+ public static String fixedLengthNumberName(long num, @Nullable String ext) {
+ SB b = new SB();
+
+ String segmentStr = Long.toString(num);
+
+ for (int i = segmentStr.length(); i < NUMBER_FILE_NAME_LENGTH; i++)
+ b.a('0');
+
+ b.a(segmentStr);
+
+ if (ext != null)
+ b.a(ext);
+
+ return b.toString();
+ }
+
+ /**
+ * @param fileName File name.
+ * @return Number of this file.
+ */
+ public static long fixedLengthFileNumber(String fileName) {
+ return Long.parseLong(fileName.substring(0, NUMBER_FILE_NAME_LENGTH));
+ }
+
+ /**
+ * @param ext Optional extension.
+ * @return Pattern to match numbered file name with the specific extension.
+ */
+ public static Pattern fixedLengthNumberNamePattern(@Nullable String ext) {
+ String pattern = "\\d{" + NUMBER_FILE_NAME_LENGTH + "}";
+
+ if (ext != null)
+ pattern += ext.replaceAll("\\.", "\\\\\\.");
+
+ return Pattern.compile(pattern);
+ }
+
/**
* Verifier always returns successful result for any host.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 0b4d4863597..8fb3c4d8a78 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3012,7 +3012,7 @@ public class GridFunc {
V v2 = m2.get(e.getKey());
if (v1 == v2)
- return true;
+ continue;
if (v1 == null || v2 == null)
return false;
@@ -3027,7 +3027,7 @@ public class GridFunc {
return false;
}
else {
- if (!eq(v1, v2))
+ if (!(v1.getClass().isArray() ? arrayEq(v1, v2) : eq(v1, v2)))
return false;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
index 2214a50affb..df34e2487dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
@@ -52,8 +52,11 @@ public class VisorSnapshotCreateTask extends VisorSnapshotOneNodeTask<VisorSnaps
/** {@inheritDoc} */
@Override protected String run(VisorSnapshotCreateTaskArg arg) throws IgniteException {
- IgniteFutureImpl<Void> fut =
- ignite.context().cache().context().snapshotMgr().createSnapshot(arg.snapshotName(), arg.snapshotPath());
+ IgniteFutureImpl<Void> fut = ignite.context().cache().context().snapshotMgr().createSnapshot(
+ arg.snapshotName(),
+ arg.snapshotPath(),
+ arg.incremental()
+ );
IgniteSnapshotManager.ClusterSnapshotFuture snpFut =
fut.internalFuture() instanceof IgniteSnapshotManager.ClusterSnapshotFuture ?
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
index a828cf9ec9d..eadfab88551 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
@@ -32,6 +32,9 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
/** Synchronous execution flag. */
private boolean sync;
+ /** Incremental snapshot flag. */
+ private boolean inc;
+
/** Default constructor. */
public VisorSnapshotCreateTaskArg() {
// No-op.
@@ -41,11 +44,13 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param sync Synchronous execution flag.
+ * @param inc Incremental snapshot flag.
*/
- public VisorSnapshotCreateTaskArg(String snpName, String snpPath, boolean sync) {
+ public VisorSnapshotCreateTaskArg(String snpName, String snpPath, boolean sync, boolean inc) {
super(snpName, snpPath);
this.sync = sync;
+ this.inc = inc;
}
/** @return Synchronous execution flag. */
@@ -53,11 +58,17 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
return sync;
}
+ /** @return Incremental snapshot flag. */
+ public boolean incremental() {
+ return inc;
+ }
+
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
super.writeExternalData(out);
out.writeBoolean(sync);
+ out.writeBoolean(inc);
}
/** {@inheritDoc} */
@@ -65,6 +76,7 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
super.readExternalData(ver, in);
sync = in.readBoolean();
+ inc = in.readBoolean();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
index 86c07c4740a..bb5778df955 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
@@ -56,7 +56,7 @@ public class VisorSnapshotRestoreTaskArg extends VisorSnapshotCreateTaskArg {
VisorSnapshotRestoreTaskAction action,
@Nullable Collection<String> grpNames
) {
- super(snpName, snpPath, sync);
+ super(snpName, snpPath, sync, false);
this.action = action;
this.grpNames = grpNames;
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
index d8b06eaccd9..479268f0264 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
@@ -40,6 +40,21 @@ public interface SnapshotMXBean {
String snpPath
);
+ /**
+ * Create the cluster-wide incremental snapshot for the given base snapshot.
+ *
+ * @param fullSnapshot Full snapshot name to attach incremental snapshot to.
+ * @param fullSnapshotPath Full snapshot directory path.
+ * @see IgniteSnapshot#createSnapshot(String)
+ * @see IgniteSnapshot#createIncrementalSnapshot(String)
+ */
+ public void createIncrementalSnapshot(
+ @MXBeanParameter(name = "fullSnapshot", description = "Snapshot name.")
+ String fullSnapshot,
+ @MXBeanParameter(name = "fullSnapshotPath", description = "Optional snapshot directory path.")
+ String fullSnapshotPath
+ );
+
/**
* Cancel previously started snapshot operation on the node initiator.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 9aa661a6acb..3623553ee4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -103,6 +103,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_SNAPSHOT_TMP_DIR;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotMetaFileName;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -646,6 +647,15 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
return snpFutTask;
}
+ /** Checks incremental snapshot exists. */
+ protected boolean checkIncremental(IgniteEx node, String snpName, String snpPath, int incIdx) {
+ File incSnpDir = snp(node).incrementalSnapshotLocalDir(snpName, snpPath, incIdx);
+
+ return incSnpDir.exists()
+ && incSnpDir.isDirectory()
+ && new File(incSnpDir, incrementalSnapshotMetaFileName(incIdx)).exists();
+ }
+
/**
* @param ignite Ignite instance to resolve discovery spi to.
* @return BlockingCustomMessageDiscoverySpi instance.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
index a3b4600b5b0..13edc33a16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
@@ -394,7 +394,7 @@ public class IgniteClusterSnapshotHandlerTest extends IgniteClusterSnapshotResto
IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
- snpMgr.createSnapshot(snpName, snpDir.getAbsolutePath()).get(TIMEOUT);
+ snpMgr.createSnapshot(snpName, snpDir.getAbsolutePath(), false).get(TIMEOUT);
ignite.destroyCache(DEFAULT_CACHE_NAME);
awaitPartitionMapExchange();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index 0ec6648f45d..030c006c818 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -138,7 +138,9 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
- ignite.context().cache().context().snapshotMgr().createSnapshot(SNAPSHOT_NAME, snpDir.toString()).get(TIMEOUT);
+ ignite.context().cache().context().snapshotMgr()
+ .createSnapshot(SNAPSHOT_NAME, snpDir.toString(), false)
+ .get(TIMEOUT);
// Check snapshot.
IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, snpDir.getAbsolutePath()).get(TIMEOUT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index cffa3156c4d..f9317a47808 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -101,6 +101,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeFalse;
/**
* Cluster-wide snapshot test.
@@ -466,7 +467,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
- spi.waitBlocked(10_000L);
+ spi.waitBlocked(TIMEOUT);
// Creating of new caches should not be blocked.
ignite.getOrCreateCache(dfltCacheCfg.setName("default2"))
@@ -502,7 +503,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
- spi.waitBlocked(10_000L);
+ spi.waitBlocked(TIMEOUT);
// Not baseline node joins successfully.
String grid4Dir = folderName(startGrid(4));
@@ -527,21 +528,34 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
- IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+ for (boolean inc : new boolean[] {false, true}) {
+ IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
- BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
- spi.block((msg) -> msg instanceof FullMessage);
+ if (inc) {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
- IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+ }
- spi.waitBlocked(10_000L);
+ BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+ spi.block((msg) -> msg instanceof FullMessage);
- ignite.close();
+ IgniteFuture<Void> fut = inc
+ ? ignite.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME)
+ : ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
- assertThrowsAnyCause(log,
- fut::get,
- NodeStoppingException.class,
- SNP_NODE_STOPPING_ERR_MSG);
+ spi.waitBlocked(TIMEOUT);
+
+ ignite.close();
+
+ assertThrowsAnyCause(log,
+ fut::get,
+ NodeStoppingException.class,
+ SNP_NODE_STOPPING_ERR_MSG);
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
}
/** @throws Exception If fails. */
@@ -611,7 +625,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
assertTrue("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
!searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent());
- List<String> allSnapshots = snp(ignite).localSnapshotNames();
+ List<String> allSnapshots = snp(ignite).localSnapshotNames(null);
assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots,
allSnapshots.isEmpty());
@@ -668,7 +682,10 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
awaitPartitionMapExchange();
- assertTrue("Snapshot directory must be empty", grid2.context().cache().context().snapshotMgr().localSnapshotNames().isEmpty());
+ assertTrue(
+ "Snapshot directory must be empty",
+ grid2.context().cache().context().snapshotMgr().localSnapshotNames(null).isEmpty()
+ );
ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
.get();
@@ -747,7 +764,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
ignite.context().cache().context().snapshotMgr()
- .createSnapshot(SNAPSHOT_NAME, cfgPath ? null : snpDir.getAbsolutePath()).get();
+ .createSnapshot(SNAPSHOT_NAME, cfgPath ? null : snpDir.getAbsolutePath(), false).get();
stopAllGrids();
@@ -773,7 +790,9 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
GridKernalContext kctx = grid(idx).context();
assertThrowsAnyCause(log,
- () -> kctx.cache().context().snapshotMgr().createSnapshot(SNAPSHOT_NAME, invalidPath).get(TIMEOUT),
+ () -> kctx.cache().context().snapshotMgr()
+ .createSnapshot(SNAPSHOT_NAME, invalidPath, false)
+ .get(TIMEOUT),
IgniteCheckedException.class,
invalidPath);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
index 837038b31c8..867c006be4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
@@ -75,6 +75,15 @@ public class IgniteSnapshotMXBeanTest extends AbstractSnapshotSelfTest {
assertTrue("Waiting for snapshot operation failed.",
GridTestUtils.waitForCondition(() -> (long)getMetric("LastSnapshotEndTime", snpMBean) > 0, TIMEOUT));
+ if (!encryption) {
+ mxBean.createIncrementalSnapshot(SNAPSHOT_NAME, "");
+
+ assertTrue(
+ "Waiting for incremental snapshot failed",
+ GridTestUtils.waitForCondition(() -> checkIncremental(ignite, SNAPSHOT_NAME, null, 1), TIMEOUT)
+ );
+ }
+
stopAllGrids();
IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
new file mode 100644
index 00000000000..340955a3ae6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assume.assumeFalse;
+
+/**
+ * Basic tests for incremental snapshots.
+ */
+public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
+ /** */
+ public static final int GRID_CNT = 3;
+
+ /** */
+ public static final String OTHER_CACHE = "other-cache";
+
+ /** */
+ public static final String GROUPED_CACHE = "my-grouped-cache2";
+
+ /** */
+ @Test
+ public void testCreation() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx srv = startGridsWithCache(
+ GRID_CNT,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ File snpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+ assertTrue("Target directory is not empty: " + snpDir, F.isEmpty(snpDir.list()));
+
+ IgniteEx cli = startClientGrid(
+ GRID_CNT,
+ (UnaryOperator<IgniteConfiguration>)
+ cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+ );
+
+ File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+ for (boolean client : new boolean[] {false, true}) {
+ IgniteSnapshotManager snpCreate = snp(client ? cli : srv);
+
+ for (File snpPath : new File[] {null, exSnpDir}) {
+ if (client && snpPath != null) // Set snapshot path not supported for snapshot from client nodes.
+ continue;
+
+ String snpName = SNAPSHOT_NAME + "_" + client + "_" + (snpPath == null ? "" : snpPath.getName());
+
+ if (snpPath == null)
+ snpCreate.createSnapshot(snpName).get(TIMEOUT);
+ else
+ snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), false).get(TIMEOUT);
+
+ for (int incIdx = 1; incIdx < 3; incIdx++) {
+ if (snpPath == null)
+ snpCreate.createIncrementalSnapshot(snpName).get(TIMEOUT);
+ else
+ snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), true).get(TIMEOUT);
+
+ for (int gridIdx = 0; gridIdx < GRID_CNT; gridIdx++) {
+ assertTrue("Incremental snapshot must exists on node " + gridIdx, checkIncremental(
+ grid(gridIdx),
+ snpName,
+ snpPath == null ? null : snpPath.getAbsolutePath(),
+ incIdx
+ ));
+ }
+ }
+ }
+ }
+ }
+
+ /** */
+ @Test
+ public void testFailForUnknownBaseSnapshot() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx ign = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ assertThrowsWithCause(
+ () -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
+ IgniteException.class
+ );
+
+ snp(ign).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ assertThrowsWithCause(
+ () -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
+ IgniteException.class
+ );
+ }
+
+ /** */
+ @Test
+ public void testIncrementalSnapshotFailsOnTopologyChange() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx srv = startGridsWithCache(
+ GRID_CNT,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ IgniteSnapshotManager snpCreate = snp(srv);
+
+ snpCreate.createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ String consId = grid(1).context().discovery().localNode().consistentId().toString();
+
+ // Stop some node.
+ stopGrid(1);
+
+ assertThrows(
+ null,
+ () -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class,
+ "Create incremental snapshot request has been rejected. " +
+ "Node from full snapshot offline [consistentId=" + consId + ']'
+ );
+ }
+
+ /** */
+ @Test
+ public void testIncrementalSnapshotFailsOnCacheDestroy() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ checkFailWhenCacheDestroyed(OTHER_CACHE, "Create incremental snapshot request has been rejected. " +
+ "Cache group destroyed [groupId=" + CU.cacheId(OTHER_CACHE) + ']');
+ }
+
+ /** */
+ @Test
+ public void testIncrementalSnapshotFailsOnGroupedCacheDestroy() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ checkFailWhenCacheDestroyed(GROUPED_CACHE, "Create incremental snapshot request has been rejected. " +
+ "Cache destroyed [cacheId=" + CU.cacheId(GROUPED_CACHE) + ", cacheName=" + GROUPED_CACHE + ']');
+ }
+
+ /** */
+ @Test
+ public void testIncrementalSnapshotFailsOnCacheChange() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ CacheConfiguration<Integer, Account> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ IgniteEx srv = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key), ccfg);
+
+ snp(srv).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ GridLocalConfigManager locCfgMgr = srv.context().cache().configManager();
+
+ File ccfgFile = locCfgMgr.cacheConfigurationFile(ccfg);
+
+ StoredCacheData cacheData = locCfgMgr.readCacheData(ccfgFile);
+
+ assertNotNull(cacheData);
+
+ cacheData.queryEntities(Collections.singletonList(new QueryEntity(String.class, Account.class)));
+
+ locCfgMgr.writeCacheData(cacheData, ccfgFile);
+
+ assertThrows(
+ null,
+ () -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class,
+ "Cache changed [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", cacheName=" + DEFAULT_CACHE_NAME + ']'
+ );
+ }
+
+ /** */
+ @Test
+ public void testIncrementalSnapshotFailOnDirtyDir() throws Exception {
+ assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+ IgniteEx srv = startGridsWithCache(
+ GRID_CNT,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ );
+
+ snp(srv).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ assertTrue(snp(srv).incrementalSnapshotsLocalRootDir(SNAPSHOT_NAME, null).mkdirs());
+ assertTrue(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).createNewFile());
+
+ assertThrows(
+ null,
+ () -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class,
+ "Can't create snapshot directory"
+ );
+
+ for (int i = 0; i < GRID_CNT; i++)
+ assertFalse(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).exists());
+ }
+
+ /** */
+ private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) throws Exception {
+ IgniteEx srv = startGridsWithCache(
+ 1,
+ CACHE_KEYS_RANGE,
+ key -> new Account(key, key),
+ new CacheConfiguration<>(DEFAULT_CACHE_NAME),
+ new CacheConfiguration<>(OTHER_CACHE),
+ new CacheConfiguration<Integer, Object>("my-grouped-cache1").setGroupName("mygroup"),
+ new CacheConfiguration<Integer, Object>(GROUPED_CACHE).setGroupName("mygroup")
+ );
+
+ IgniteSnapshotManager snpCreate = snp(srv);
+
+ snpCreate.createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ srv.destroyCache(cache2rvm);
+
+ assertThrows(
+ null,
+ () -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+ IgniteException.class,
+ errMsg
+ );
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java
new file mode 100644
index 00000000000..d25cb4e97bb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.util.TreeMap;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** */
+public class GridFuncSelfTest {
+ /** */
+ @Test
+ public void testMapEqNotOrdered() {
+ String str = "mystring";
+
+ TreeMap<String, Object> m1 = new TreeMap<>();
+
+ TreeMap<String, Object> m2 = new TreeMap<>();
+
+ m1.put("1", str);
+ m2.put("1", str);
+
+ m1.put("2", "2");
+ m2.put("3", "3");
+
+ assertFalse(F.eqNotOrdered(m1, m2));
+
+ m1.remove("2");
+ m2.remove("3");
+
+ m1.put("arr", new byte[] {1, 2, 3});
+ m2.put("arr", new byte[] {1, 2, 3});
+
+ assertTrue(F.eqNotOrdered(m1, m2));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index 7618ff25521..83689482659 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.PlainSnapshotTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -46,7 +47,8 @@ import org.junit.runners.Suite;
IgniteSnapshotRestoreFromRemoteTest.class,
PlainSnapshotTest.class,
EncryptedSnapshotTest.class,
- IgniteClusterSnapshotWalRecordTest.class
+ IgniteClusterSnapshotWalRecordTest.class,
+ IncrementalSnapshotTest.class
})
public class IgniteSnapshotTestSuite {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 5126a479aea..c003db0ea2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.IgniteDevOnlyLogTest;
import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest;
import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
import org.apache.ignite.internal.util.IgniteUtilsUnitTest;
+import org.apache.ignite.internal.util.lang.GridFuncSelfTest;
import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest;
import org.apache.ignite.internal.util.nio.GridNioSelfTest;
import org.apache.ignite.internal.util.nio.GridNioServerTest;
@@ -109,6 +110,7 @@ import org.junit.runners.Suite;
GridTransientTest.class,
IgniteDevOnlyLogTest.class,
GridConcurrentMultiPairQueueTest.class,
+ GridFuncSelfTest.class,
// Sensitive toString.
IncludeSensitiveAtomicTest.class,
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index c1b37a48c3c..ef334798d3b 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -140,12 +140,13 @@ This utility can do the following commands:
snapshot_name - Snapshot name.
Create cluster snapshot:
- control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync]
+ control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync] [--incremental]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--dest path - Path to the directory where the snapshot will be saved. If not specified, the default configured snapshot directory will be used.
--sync - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
+ --incremental - Create an incremental snapshot for previously created full snapshot. Full snapshot must be accessible via --dest and snapshot_name.
Cancel running snapshot operation:
control.(sh|bat) --snapshot cancel --id id|--name name
@@ -158,14 +159,14 @@ This utility can do the following commands:
control.(sh|bat) --snapshot check snapshot_name [--src path]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--src path - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
Restore snapshot:
control.(sh|bat) --snapshot restore snapshot_name [--groups group1,...groupN] [--src path] [--sync]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--groups group1,...groupN - Cache group names.
--src path - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
--sync - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
@@ -174,13 +175,13 @@ This utility can do the following commands:
control.(sh|bat) --snapshot restore snapshot_name --status
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
Cancel snapshot restore operation (Command deprecated. Use '--snapshot cancel' instead):
control.(sh|bat) --snapshot restore snapshot_name --cancel
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
Get the status of the current snapshot operation:
control.(sh|bat) --snapshot status
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index c1b37a48c3c..ef334798d3b 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -140,12 +140,13 @@ This utility can do the following commands:
snapshot_name - Snapshot name.
Create cluster snapshot:
- control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync]
+ control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync] [--incremental]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--dest path - Path to the directory where the snapshot will be saved. If not specified, the default configured snapshot directory will be used.
--sync - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
+ --incremental - Create an incremental snapshot for previously created full snapshot. Full snapshot must be accessible via --dest and snapshot_name.
Cancel running snapshot operation:
control.(sh|bat) --snapshot cancel --id id|--name name
@@ -158,14 +159,14 @@ This utility can do the following commands:
control.(sh|bat) --snapshot check snapshot_name [--src path]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--src path - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
Restore snapshot:
control.(sh|bat) --snapshot restore snapshot_name [--groups group1,...groupN] [--src path] [--sync]
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
--groups group1,...groupN - Cache group names.
--src path - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
--sync - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
@@ -174,13 +175,13 @@ This utility can do the following commands:
control.(sh|bat) --snapshot restore snapshot_name --status
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
Cancel snapshot restore operation (Command deprecated. Use '--snapshot cancel' instead):
control.(sh|bat) --snapshot restore snapshot_name --cancel
Parameters:
- snapshot_name - Snapshot name.
+ snapshot_name - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
Get the status of the current snapshot operation:
control.(sh|bat) --snapshot status